How to use Window Functions in PySpark

Let's break down each PySpark window function with examples, using the dataset below. The functions fall into three main types: Ranking, Analytic, and Aggregate functions. 👇


📦 Dataset Setup

Python
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
)
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.show(truncate=False)

🪟 Window Specifications

Python
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# For functions that need ordering within department
windowSpec = Window.partitionBy("department").orderBy("salary")

# For aggregate functions per department
windowAgg = Window.partitionBy("department")

🔢 1. Ranking Functions

These return a ranking for each row within the partition.

row_number()

Assigns a unique row number to each row within the department based on salary ordering.

Python
.withColumn("row_number", row_number().over(windowSpec))

📌 Useful for removing duplicates or selecting top-N rows.
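
To make the top-N idea concrete, here is a plain-Python sketch of the semantics (not PySpark itself — in PySpark you would simply filter on the `row_number` column). The helper name `top_n_per_department` is illustrative only:

```python
from itertools import groupby
from operator import itemgetter

rows = [
    ("James", "Sales", 3000), ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100), ("Maria", "Finance", 3000),
    ("James", "Sales", 3000), ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100),
]

def top_n_per_department(rows, n):
    """Keep the n lowest-paid rows per department (salary ascending),
    tagging each kept row with its row number -- what
    row_number().over(windowSpec) enables in PySpark."""
    out = []
    keyed = sorted(rows, key=itemgetter(1, 2))  # sort by department, then salary
    for dept, grp in groupby(keyed, key=itemgetter(1)):
        for row_number, row in enumerate(grp, start=1):
            if row_number <= n:
                out.append(row + (row_number,))
    return out

print(top_n_per_department(rows, 2))  # 2 lowest-paid rows per department
```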


rank()

Gives ranks to rows with gaps for ties.

Python
.withColumn("rank", rank().over(windowSpec))

📝 Example: If two people tie for rank 1, the next rank is 3, not 2.


dense_rank()

Like rank(), but no gaps between ranks.

Python
.withColumn("dense_rank", dense_rank().over(windowSpec))
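
To see the gap/no-gap difference side by side, here is a plain-Python illustration (not PySpark) of both functions over the Sales salaries from the dataset above:

```python
# Ordered Sales salaries: two ties at 3000 and two at 4100
salaries = sorted([3000, 4600, 4100, 3000, 4100])  # [3000, 3000, 4100, 4100, 4600]

def rank_with_gaps(values):
    # rank(): ties share a rank; the next distinct value skips ahead
    return [values.index(v) + 1 for v in values]

def dense_rank_no_gaps(values):
    # dense_rank(): ties share a rank; ranks stay consecutive
    distinct = sorted(set(values))
    return [distinct.index(v) + 1 for v in values]

print(rank_with_gaps(salaries))      # [1, 1, 3, 3, 5]
print(dense_rank_no_gaps(salaries))  # [1, 1, 2, 2, 3]
```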

percent_rank()

Shows the relative rank as a fraction from 0 to 1. Multiply by 100 to see it as a percentage.

Python
.withColumn("percent_rank", percent_rank().over(windowSpec) * 100)

cume_dist()

Gives cumulative distribution — proportion of rows less than or equal to current row.

Python
.withColumn("cumulative_distribution", cume_dist().over(windowSpec) * 100)
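
The formulas behind these two functions are simple: percent_rank is (rank − 1) / (partition size − 1), and cume_dist is the fraction of rows with a value less than or equal to the current row's. A plain-Python sketch (not PySpark) over the ordered Sales salaries:

```python
salaries = [3000, 3000, 4100, 4100, 4600]  # already ordered within the partition

def percent_rank(values):
    # (rank - 1) / (partition size - 1), where rank has gaps for ties
    return [values.index(v) / (len(values) - 1) for v in values]

def cume_dist(values):
    # fraction of rows with a value <= the current row's value
    return [sum(1 for x in values if x <= v) / len(values) for v in values]

print(percent_rank(salaries))  # [0.0, 0.0, 0.5, 0.5, 1.0]
print(cume_dist(salaries))     # [0.4, 0.4, 0.8, 0.8, 1.0]
```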

ntile(n)

Divides the partition into n buckets and assigns a bucket number.

Python
.withColumn("ntile", ntile(2).over(windowSpec))

📌 In this case, splits each department into 2 groups based on salary.
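
When the partition doesn't divide evenly, the earlier buckets receive the extra rows (so bucket sizes differ by at most one). A plain-Python sketch of that bucketing rule (not PySpark):

```python
def ntile(values, n):
    # Split len(values) ordered rows into n buckets; earlier buckets
    # receive the extra rows when the split is uneven.
    size, extra = divmod(len(values), n)
    buckets = []
    for bucket in range(1, n + 1):
        count = size + (1 if bucket <= extra else 0)
        buckets.extend([bucket] * count)
    return buckets

# 5 Sales rows into 2 buckets: bucket 1 gets 3 rows, bucket 2 gets 2
print(ntile([3000, 3000, 4100, 4100, 4600], 2))  # [1, 1, 1, 2, 2]
```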


🔍 2. Analytic Functions

These return values from other rows in the window frame.

lag()

Gets the previous row’s salary.

Python
.withColumn("lag", lag("salary", 1).over(windowSpec))

lead()

Gets the next row’s salary.

Python
.withColumn("lead", lead("salary", 1).over(windowSpec))
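
Both functions return null when the offset points outside the partition (the first row has no previous row; the last has no next). A plain-Python sketch of the semantics (not PySpark):

```python
def lag(values, offset=1):
    # previous row's value within the ordered partition; None when absent
    return [values[i - offset] if i - offset >= 0 else None
            for i in range(len(values))]

def lead(values, offset=1):
    # next row's value within the ordered partition; None when absent
    return [values[i + offset] if i + offset < len(values) else None
            for i in range(len(values))]

salaries = [3000, 3000, 4100, 4100, 4600]
print(lag(salaries))   # [None, 3000, 3000, 4100, 4100]
print(lead(salaries))  # [3000, 4100, 4100, 4600, None]
```

A common use is computing the raise between consecutive rows, e.g. subtracting the `lag` column from `salary`.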

📊 3. Aggregate Window Functions

These return aggregated values over partitions.

avg(), sum(), min(), max()

Calculate average, sum, min, and max salary per department.

Python
.withColumn("avg", avg("salary").over(windowAgg))
.withColumn("sum", sum("salary").over(windowAgg))
.withColumn("min", min("salary").over(windowAgg))
.withColumn("max", max("salary").over(windowAgg))
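
Unlike `groupBy().agg()`, a window aggregate keeps every row and attaches the per-department figure to each one. A plain-Python sketch of that behavior (not PySpark), using the Finance rows:

```python
rows = [("Maria", "Finance", 3000), ("Scott", "Finance", 3300), ("Jen", "Finance", 3900)]

# Collect salaries per department (the "partition")
dept_salaries = {}
for _, dept, salary in rows:
    dept_salaries.setdefault(dept, []).append(salary)

# Every row survives, each carrying its department's average
with_avg = [
    (name, dept, salary, sum(dept_salaries[dept]) / len(dept_salaries[dept]))
    for name, dept, salary in rows
]
print(with_avg)  # each Finance row carries avg 3400.0
```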

➤ Running Total (Cumulative Sum)

Python
.withColumn("RunningSum", sum("salary").over(windowSpec))

📌 Running total salary for each department, ordered by salary.
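
One caveat worth knowing: when a window has an `orderBy` but no explicit frame, Spark defaults to `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so rows with tied salaries share the same running total. A plain-Python sketch of that frame (not PySpark), using the Sales salaries:

```python
salaries = [3000, 3000, 4100, 4100, 4600]  # ordered within the partition

def running_sum_range_frame(values):
    # RANGE frame: each row's total includes every row whose
    # value is <= its own, so ties get identical running totals
    return [sum(x for x in values if x <= v) for v in values]

print(running_sum_range_frame(salaries))  # [6000, 6000, 14200, 14200, 18800]
```

For a strict row-by-row total, add `.rowsBetween(Window.unboundedPreceding, Window.currentRow)` to the window spec.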


📌 Final Combined Example

Python
df \
.withColumn("row_number", row_number().over(windowSpec)) \
.withColumn("rank", rank().over(windowSpec)) \
.withColumn("dense_rank", dense_rank().over(windowSpec)) \
.withColumn("percent_rank", percent_rank().over(windowSpec) * 100) \
.withColumn("cumulative_distribution", cume_dist().over(windowSpec) * 100) \
.withColumn("ntile", ntile(2).over(windowSpec)) \
.withColumn("lag", lag("salary", 1).over(windowSpec)) \
.withColumn("lead", lead("salary", 1).over(windowSpec)) \
.withColumn("avg", avg("salary").over(windowAgg)) \
.withColumn("sum", sum("salary").over(windowAgg)) \
.withColumn("RunningSum", sum("salary").over(windowSpec)) \
.withColumn("min", min("salary").over(windowAgg)) \
.withColumn("max", max("salary").over(windowAgg)) \
.show()

💡 Summary Table

Function                 | Type      | Purpose
row_number()             | Ranking   | Unique row number per partition
rank()                   | Ranking   | Rank with gaps for ties
dense_rank()             | Ranking   | Rank without gaps
percent_rank()           | Ranking   | Relative rank (0 to 1)
cume_dist()              | Ranking   | Cumulative distribution
ntile(n)                 | Ranking   | Divide partition into n buckets
lag()                    | Analytic  | Previous row's value
lead()                   | Analytic  | Next row's value
avg(), sum()             | Aggregate | Average and sum per partition
min(), max()             | Aggregate | Min and max per partition
sum() over ordered window | Aggregate | Running (cumulative) sum
