Absolutely! Let’s break it down and explain each PySpark window function with examples using your code and dataset. I’ll categorize the functions into three main types as you mentioned: Ranking, Analytic, and Aggregate functions. 👇
📦 Dataset Setup
simpleData = [
("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)
]
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.show(truncate=False)
🪟 Window Specifications
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank, percent_rank, cume_dist,
    ntile, lag, lead, avg, sum, min, max,
)
# Note: sum/min/max imported here shadow Python's built-ins of the same names.
# For functions that need ordering within department
windowSpec = Window.partitionBy("department").orderBy("salary")
# For aggregate functions per department
windowAgg = Window.partitionBy("department")
🔢 1. Ranking Functions
These return a ranking for each row within the partition.
➤ row_number()
Assigns a unique row number to each row within the department based on salary ordering.
.withColumn("row_number", row_number().over(windowSpec))
📌 Useful for removing duplicates or selecting top-N rows.
➤ rank()
Assigns a rank to each row; tied rows share the same rank, and a gap follows the tie.
.withColumn("rank", rank().over(windowSpec))
📝 Example: If two people have the same salary and are ranked 1st, the next rank will be 3.
➤ dense_rank()
Like rank(), but no gaps between ranks.
.withColumn("dense_rank", dense_rank().over(windowSpec))
➤ percent_rank()
Returns the relative rank of each row as a value between 0 and 1, computed as (rank - 1) / (partition row count - 1). Multiply by 100 to express it as a percentage.
.withColumn("percent_rank", percent_rank().over(windowSpec) * 100)
➤ cume_dist()
Gives the cumulative distribution: the fraction of rows in the partition whose value is less than or equal to the current row's value.
.withColumn("cumulative_distribution", cume_dist().over(windowSpec) * 100)
➤ ntile(n)
Divides the partition into n buckets and assigns a bucket number.
.withColumn("ntile", ntile(2).over(windowSpec))
📌 In this case, splits each department into 2 groups based on salary.
🔍 2. Analytic Functions
These return values from other rows in the window frame.
➤ lag()
Returns the previous row's salary within the partition (null for the first row).
.withColumn("lag", lag("salary", 1).over(windowSpec))
➤ lead()
Returns the next row's salary within the partition (null for the last row).
.withColumn("lead", lead("salary", 1).over(windowSpec))
📊 3. Aggregate Window Functions
These return aggregated values over partitions.
➤ avg(), sum(), min(), max()
Calculate average, sum, min, and max salary per department.
.withColumn("avg", avg("salary").over(windowAgg))
.withColumn("sum", sum("salary").over(windowAgg))
.withColumn("min", min("salary").over(windowAgg))
.withColumn("max", max("salary").over(windowAgg))
➤ Running Total (Cumulative Sum)
.withColumn("RunningSum", sum("salary").over(windowSpec))
📌 Running total of salary within each department, ordered by salary. Note: because the window has an orderBy, Spark's default frame is RANGE, so tied salaries share the same cumulative value.
📌 Final Combined Example
df \
.withColumn("row_number", row_number().over(windowSpec)) \
.withColumn("rank", rank().over(windowSpec)) \
.withColumn("dense_rank", dense_rank().over(windowSpec)) \
.withColumn("percent_rank", percent_rank().over(windowSpec) * 100) \
.withColumn("cumulative_distribution", cume_dist().over(windowSpec) * 100) \
.withColumn("ntile", ntile(2).over(windowSpec)) \
.withColumn("lag", lag("salary", 1).over(windowSpec)) \
.withColumn("lead", lead("salary", 1).over(windowSpec)) \
.withColumn("avg", avg("salary").over(windowAgg)) \
.withColumn("sum", sum("salary").over(windowAgg)) \
.withColumn("RunningSum", sum("salary").over(windowSpec)) \
.withColumn("min", min("salary").over(windowAgg)) \
.withColumn("max", max("salary").over(windowAgg)) \
.show()
💡 Summary Table
| Function | Type | Purpose |
|---|---|---|
| row_number() | Ranking | Unique row number per partition |
| rank() | Ranking | Rank with gaps after ties |
| dense_rank() | Ranking | Rank without gaps |
| percent_rank() | Ranking | Relative rank (0 to 1) |
| cume_dist() | Ranking | Cumulative distribution (0 to 1) |
| ntile(n) | Ranking | Divide partition into n buckets |
| lag() | Analytic | Previous row's value |
| lead() | Analytic | Next row's value |
| avg(), sum() | Aggregate | Average and sum per partition |
| min(), max() | Aggregate | Min and max per partition |
| sum() over ordered window | Aggregate | Running (cumulative) sum |