0
0
Apache Sparkdata~5 mins

Windowed aggregations in Apache Spark

Choose your learning style9 modes available
Introduction

Windowed aggregations help us calculate values like sums or averages over a moving group of rows. This lets us see trends or comparisons within parts of our data.

To find a running total of sales for each day in a month.
To compare each student's score with the average score in their class.
To calculate the average temperature over the last 7 days for each city.
To rank products by sales within each category.
To find the difference between each row's value and the previous row's value.
Syntax
Apache Spark
from pyspark.sql import Window
from pyspark.sql.functions import sum, avg, rank

windowSpec = Window.partitionBy('group_column').orderBy('order_column').rowsBetween(start, end)

# Example aggregation
df.withColumn('running_sum', sum('value_column').over(windowSpec))

partitionBy groups data into windows.

orderBy defines the order inside each group.

Examples
This calculates the total sales up to the current date for each category.
Apache Spark
from pyspark.sql import Window
from pyspark.sql.functions import sum

windowSpec = Window.partitionBy('category').orderBy('date').rowsBetween(Window.unboundedPreceding, 0)
df.withColumn('cumulative_sales', sum('sales').over(windowSpec))
This finds the average temperature over the last 7 days for each city.
Apache Spark
from pyspark.sql import Window
from pyspark.sql.functions import avg

windowSpec = Window.partitionBy('city').orderBy('date').rowsBetween(-6, 0)
df.withColumn('weekly_avg_temp', avg('temperature').over(windowSpec))
This ranks employees by salary within each department.
Apache Spark
from pyspark.sql import Window
from pyspark.sql.functions import rank, desc

windowSpec = Window.partitionBy('department').orderBy(desc('salary'))
df.withColumn('salary_rank', rank().over(windowSpec))
Sample Program

This program creates a small sales dataset. It then calculates the running total of sales for each category by date using windowed aggregation.

Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import sum, avg

spark = SparkSession.builder.appName('WindowedAggregationsExample').getOrCreate()

# Sample data
data = [
    ('A', '2024-01-01', 100),
    ('A', '2024-01-02', 150),
    ('A', '2024-01-03', 200),
    ('B', '2024-01-01', 80),
    ('B', '2024-01-02', 120),
    ('B', '2024-01-03', 160)
]

columns = ['category', 'date', 'sales']
df = spark.createDataFrame(data, columns)

# Define window: group by category, order by date, from start to current row
windowSpec = Window.partitionBy('category').orderBy('date').rowsBetween(Window.unboundedPreceding, 0)

# Calculate cumulative sales
result_df = df.withColumn('cumulative_sales', sum('sales').over(windowSpec))

result_df.show()
OutputSuccess
Important Notes

Window functions do not reduce the number of rows; they add new columns with aggregated values.

Ordering inside the window is important for cumulative or running calculations.

Use rowsBetween to control the range of rows included in the window.

Summary

Windowed aggregations let you calculate values over groups of rows without losing detail.

They are useful for running totals, moving averages, rankings, and comparisons.

Use partitionBy to group, orderBy to sort, and rowsBetween to set the window range.