Finding the Top Daily Spender with PySpark

Window functions, aggregations, and joins — a practical walkthrough
Published March 2026 · 5 min read
Introduction
A common analytics question is: "Who spent the most each day?" It sounds simple, but answering it cleanly in a distributed system like Apache Spark takes a deliberate combination of aggregation, window functions, and joins. In this post, we’ll walk through a PySpark pipeline that does exactly that: identifying the highest-spending individual on each day between January 1 and March 1, 2025.
The Problem
We have two tables:
• purchases — containing columns for person_id, purchase_date, and purchase_amount
• people — containing columns for id and first_name
Our goal: for every day between January 1, 2025 and March 1, 2025, find the person who spent the most and return their name, the date, and the amount.
Step-by-Step Walkthrough
Step 1: Filter to the Date Range
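First, we narrow our dataset to only the purchases that fall within the target window. This is a straightforward filter operation that reduces the data volume for all downstream steps. Both bounds are inclusive, which assumes purchase_date is a date column; with a timestamp column, the upper bound would be interpreted as midnight on March 1 and later purchases that day would be excluded.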
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Filter purchases to the target date range
df_purchase_window = purchases.filter(
    (F.col("purchase_date") >= "2025-01-01") &
    (F.col("purchase_date") <= "2025-03-01")
)
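Why this matters: Filtering early is a best practice in Spark. When the data source supports it (Parquet and ORC, for example), Spark can push the predicate down into the scan and skip blocks of data whose statistics rule them out. If the table is also partitioned by purchase_date, partition pruning lets Spark skip irrelevant partitions entirely, which can cut I/O substantially.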
Step 2: Aggregate Daily Spend per Person
A person might make multiple purchases in a single day. We need to sum those up to get a single total per person per day.
# Compute each person's total spend per day
df_daily_spend = (
    df_purchase_window
    .groupBy("person_id", "purchase_date")
    .agg(F.sum("purchase_amount").alias("top_daily_spend"))
)
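The groupBy produces one row per person per day, with their total spend captured in the top_daily_spend column. Despite the name, at this stage the column holds every person's daily total; it only represents the top spend once the ranking filter in Step 3 has been applied.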
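As a plain-Python illustration of what this aggregation computes (not of how Spark executes it), the same result can be built with one accumulator per (person_id, purchase_date) key. The rows here are made up.

```python
from collections import defaultdict

# Hypothetical rows: (person_id, purchase_date, purchase_amount)
rows = [
    (1, "2025-01-01", 20.0),
    (1, "2025-01-01", 15.0),
    (2, "2025-01-01", 30.0),
    (2, "2025-01-02", 12.5),
]

# One running sum per (person_id, purchase_date) key,
# mirroring groupBy("person_id", "purchase_date").agg(sum(...)).
daily_spend = defaultdict(float)
for person_id, purchase_date, amount in rows:
    daily_spend[(person_id, purchase_date)] += amount

for key, total in sorted(daily_spend.items()):
    print(key, total)
```

Four input rows collapse to three output rows because person 1 bought twice on the same day.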
Step 3: Rank with a Window Function
This is where things get interesting. We use Spark’s Window API to rank spenders within each day, then keep only the top-ranked row.
from pyspark.sql import Window

# Rank people by daily spend for each purchase date
w = Window.partitionBy("purchase_date").orderBy(F.col("top_daily_spend").desc())

df_top_spenders_per_day = (
    df_daily_spend
    .withColumn("daily_rank", F.rank().over(w))
    .filter(F.col("daily_rank") == 1)
)
A note on rank() vs. row_number(): We use rank() intentionally here. If two people spend the same amount on the same day, rank() assigns them both rank 1, so neither is silently dropped. By contrast, row_number() would arbitrarily pick one and discard the other — a subtle data-loss bug that’s easy to miss.
Step 4: Join for Human-Readable Names
IDs are great for machines, but reports are for people. We join against the people table to swap the ID for a name, then select and order the final output.
# Join to people table to get names
df_result = (
    df_top_spenders_per_day
    .join(
        people,
        df_top_spenders_per_day["person_id"] == people["id"],
        "inner"
    )
    .select(
        people["first_name"].alias("person_name"),
        df_top_spenders_per_day["purchase_date"],
        df_top_spenders_per_day["top_daily_spend"]
    )
    .orderBy("purchase_date", "person_name")
)

# Validate
df_result.show(truncate=False)
Using explicit DataFrame column references (e.g., people["first_name"]) instead of bare string column names avoids ambiguity errors that crop up when both sides of a join share column names.
Key Takeaways
1. Filter early. Reducing data volume at the start pays dividends in every subsequent step.
2. Choose your ranking function carefully. rank() preserves ties; row_number() doesn’t. Know the difference.
3. Use explicit column references in joins. It prevents ambiguous column errors and makes your code self-documenting.
4. Window functions are your friend. They let you compute rankings, running totals, and comparisons within groups without collapsing your DataFrame.
Wrapping Up
This pipeline is a compact example of a pattern that shows up constantly in real-world data engineering: filter, aggregate, rank, and enrich. Once you internalize these building blocks, you can compose them to answer increasingly complex analytical questions — all while keeping your Spark jobs efficient and your code readable.



