Mastering PySpark Joins & Window Functions


A Server Monitoring Dashboard Example


When working with large-scale infrastructure, a common real-world need is building a monitoring dashboard — something that shows you the health of your servers at a glance. In this post, we'll walk through a practical PySpark example that combines joins, window functions, and aggregations to build exactly that.

We'll work with two tables: one for servers and one for health checks. Our goal is to produce a single summary showing each server's latest check status and its total number of critical alerts.


Step 1: Set Up the Data

First, we create our two source DataFrames. df_servers holds basic server metadata, and df_health_checks logs each check's outcome and date. The dates are stored as ISO-8601 strings (YYYY-MM-DD), which sort chronologically even as plain text.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Server table
server_data = [
    (1, "Web App Alpha"),
    (2, "Database Cluster"),
    (3, "Cache Layer"),
    (4, "Auth Gateway"),
    (5, "Search Index"),
    (6, "Log Aggregator"),
    (7, "API Proxy"),
    (8, "Scheduler Daemon"),
]
server_columns = ["server_id", "server_name"]
df_servers = spark.createDataFrame(server_data, server_columns)

# Health Check table
health_check_data = [
    (501, 1, "Healthy",  "2025-08-01"),
    (502, 1, "Critical", "2025-08-03"),
    (503, 1, "Critical", "2025-08-05"),
    (504, 1, "Healthy",  "2025-08-07"),
    (505, 2, "Critical", "2025-08-02"),
    (506, 2, "Critical", "2025-08-04"),
    (507, 2, "Critical", "2025-08-06"),

    (508, 3, "Healthy",  "2025-08-01"),
    (509, 3, "Healthy",  "2025-08-03"),
    (510, 3, "Critical", "2025-08-05"),
    (511, 3, "Healthy",  "2025-08-06"),
    (512, 3, "Critical", "2025-08-08"),

    (513, 5, "Healthy",  "2025-08-04"),

    (514, 6, "Critical", "2025-08-03"),

    (515, 7, "Healthy",  "2025-08-01"),
    (516, 7, "Healthy",  "2025-08-02"),
    (517, 7, "Healthy",  "2025-08-04"),
    (518, 7, "Healthy",  "2025-08-06"),
]
health_check_columns = ["check_id", "server_id", "status", "check_date"]
df_health_checks = spark.createDataFrame(health_check_data, health_check_columns)

Notice that servers 4 and 8 have no health checks at all — this is intentional. It lets us demonstrate how left joins handle missing data gracefully.


Step 2: Aggregate Critical Alert Counts

We filter for only "Critical" statuses and group by server to get an alert tally:

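df_critical_counts = (
    df_health_checks
    # Case-insensitive match, so "critical" and "Critical" both count.
    .where(F.lower(F.col("status")) == "critical")
    # Group by server only: also grouping by the raw status would split
    # one server into several rows if the casing ever varied.
    .groupBy("server_id")
    .agg(F.count("*").alias("critical_count"))
)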

This gives us one row per server that has ever had a critical alert, along with the count.


Step 3: Find the Latest Health Check per Server

This is where window functions shine. We partition by server_id, order by check_date descending, and assign a row number. Row 1 is always the most recent check:

w = Window.partitionBy("server_id").orderBy(F.col("check_date").desc())

df_latest_checks = (
    df_health_checks
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
)

Why row_number() instead of max()? Because we don't just want the latest date — we want the entire row (including the status) for that date. Window functions let us do that elegantly.


Step 4: Join Everything Together

Now we bring it all together with two left joins — one for the latest check, one for the critical count:

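# Keep only the columns each join needs. Both lookups derive from
# df_health_checks, so joining on the column name (rather than on
# df["server_id"] expressions) avoids ambiguous column references.
df_latest_status = df_latest_checks.select(
    "server_id", F.col("status").alias("last_check_status"))
df_critical_only = df_critical_counts.select("server_id", "critical_count")

df_server_dashboard = (
    df_servers
    .join(df_latest_status, on="server_id", how="left")
    .join(df_critical_only, on="server_id", how="left")
    .select(
        "server_id",
        "server_name",
        # Servers with no checks have a null status after the left join.
        F.coalesce(F.col("last_check_status"),
                   F.lit("N/A")).alias("last_check_status"),
        # Servers with no critical alerts have a null count.
        F.coalesce(F.col("critical_count"),
                   F.lit(0)).alias("critical_count"),
    )
    .orderBy("server_id")
)

df_server_dashboard.show(truncate=False)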

The coalesce calls are crucial: servers with no checks get "N/A" instead of null, and servers with zero critical alerts get 0 instead of null. This makes the output clean and dashboard-ready.


Output
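As a sanity check on what the final DataFrame should contain, the same logic can be traced in plain Python over the sample data. This is a cross-check sketch rather than part of the Spark pipeline, and the helper name expected_dashboard is hypothetical.

```python
from collections import Counter

servers = {
    1: "Web App Alpha", 2: "Database Cluster", 3: "Cache Layer",
    4: "Auth Gateway", 5: "Search Index", 6: "Log Aggregator",
    7: "API Proxy", 8: "Scheduler Daemon",
}

# (server_id, status, check_date) triples copied from the health check table.
checks = [
    (1, "Healthy", "2025-08-01"), (1, "Critical", "2025-08-03"),
    (1, "Critical", "2025-08-05"), (1, "Healthy", "2025-08-07"),
    (2, "Critical", "2025-08-02"), (2, "Critical", "2025-08-04"),
    (2, "Critical", "2025-08-06"),
    (3, "Healthy", "2025-08-01"), (3, "Healthy", "2025-08-03"),
    (3, "Critical", "2025-08-05"), (3, "Healthy", "2025-08-06"),
    (3, "Critical", "2025-08-08"),
    (5, "Healthy", "2025-08-04"),
    (6, "Critical", "2025-08-03"),
    (7, "Healthy", "2025-08-01"), (7, "Healthy", "2025-08-02"),
    (7, "Healthy", "2025-08-04"), (7, "Healthy", "2025-08-06"),
]


def expected_dashboard(servers, checks):
    """Return (server_id, server_name, last_check_status, critical_count) rows."""
    # Count critical checks per server, ignoring case.
    critical = Counter(
        sid for sid, status, _ in checks if status.lower() == "critical"
    )
    # Track the most recent (date, status) per server; ISO dates compare as text.
    latest = {}
    for sid, status, date in checks:
        if sid not in latest or date > latest[sid][0]:
            latest[sid] = (date, status)
    return [
        (sid, name, latest[sid][1] if sid in latest else "N/A", critical.get(sid, 0))
        for sid, name in sorted(servers.items())
    ]


rows = expected_dashboard(servers, checks)
for row in rows:
    print(row)
# (1, 'Web App Alpha', 'Healthy', 2)
# (2, 'Database Cluster', 'Critical', 3)
# (3, 'Cache Layer', 'Critical', 2)
# (4, 'Auth Gateway', 'N/A', 0)
# (5, 'Search Index', 'Healthy', 0)
# (6, 'Log Aggregator', 'Critical', 1)
# (7, 'API Proxy', 'Healthy', 0)
# (8, 'Scheduler Daemon', 'N/A', 0)
```

Servers 4 and 8 show the defaults because they have no checks, while servers 5 and 7 have checks but no critical alerts.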


Key Takeaways

Here's a quick reference for the DataFrame names and what they represent:

DataFrame             Purpose
df_servers            Master list of all registered servers
df_health_checks      Raw health check log with status and date
df_critical_counts    Aggregated critical alert totals per server
df_latest_checks      Most recent check per server via window function
df_server_dashboard   Final joined output ready for display

The three PySpark techniques at work here each solve a distinct problem. Window functions (row_number) let you pick the most recent row per group without losing columns. Left joins ensure every server appears in the output, even those with no activity. And coalesce transforms nulls into meaningful defaults so your output is always presentation-ready.

This pattern — "latest record + aggregated stats + full entity list" — appears constantly in data engineering. Whether you're monitoring servers, tracking orders, or building user dashboards, the shape of the solution is the same. Master these three building blocks and you'll find yourself reaching for them again and again.