Windowed aggregations help us calculate values like sums or averages over a moving group of rows. This lets us see trends or comparisons within parts of our data.
Windowed aggregations in Apache Spark
Introduction
Windowed aggregations come up in many everyday tasks, for example:
To find a running total of sales for each day in a month.
To compare each student's score with the average score in their class.
To calculate the average temperature over the last 7 days for each city.
To rank products by sales within each category.
To find the difference between each row's value and the previous row's value.
Syntax
from pyspark.sql import Window
from pyspark.sql.functions import sum, avg, rank

windowSpec = (Window.partitionBy('group_column')
              .orderBy('order_column')
              .rowsBetween(start, end))

# Example aggregation
df.withColumn('running_sum', sum('value_column').over(windowSpec))
partitionBy groups data into windows.
orderBy defines the order inside each group.
rowsBetween sets which rows around the current row belong to the window.
Examples
This calculates the total sales up to the current date for each category.
from pyspark.sql import Window
from pyspark.sql.functions import sum

windowSpec = (Window.partitionBy('category')
              .orderBy('date')
              .rowsBetween(Window.unboundedPreceding, 0))

df.withColumn('cumulative_sales', sum('sales').over(windowSpec))
This finds the average temperature over the last 7 days for each city.
from pyspark.sql import Window
from pyspark.sql.functions import avg

windowSpec = (Window.partitionBy('city')
              .orderBy('date')
              .rowsBetween(-6, 0))

df.withColumn('weekly_avg_temp', avg('temperature').over(windowSpec))
This ranks employees by salary within each department.
from pyspark.sql import Window
from pyspark.sql.functions import rank, desc

windowSpec = Window.partitionBy('department').orderBy(desc('salary'))

df.withColumn('salary_rank', rank().over(windowSpec))
Sample Program
This program creates a small sales dataset. It then calculates the running total of sales for each category by date using windowed aggregation.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import sum

spark = SparkSession.builder.appName('WindowedAggregationsExample').getOrCreate()

# Sample data
data = [
    ('A', '2024-01-01', 100),
    ('A', '2024-01-02', 150),
    ('A', '2024-01-03', 200),
    ('B', '2024-01-01', 80),
    ('B', '2024-01-02', 120),
    ('B', '2024-01-03', 160)
]
columns = ['category', 'date', 'sales']
df = spark.createDataFrame(data, columns)

# Define window: group by category, order by date, from start to current row
windowSpec = (Window.partitionBy('category')
              .orderBy('date')
              .rowsBetween(Window.unboundedPreceding, 0))

# Calculate cumulative sales
result_df = df.withColumn('cumulative_sales', sum('sales').over(windowSpec))
result_df.show()
Output (row order across categories may vary):

+--------+----------+-----+----------------+
|category|      date|sales|cumulative_sales|
+--------+----------+-----+----------------+
|       A|2024-01-01|  100|             100|
|       A|2024-01-02|  150|             250|
|       A|2024-01-03|  200|             450|
|       B|2024-01-01|   80|              80|
|       B|2024-01-02|  120|             200|
|       B|2024-01-03|  160|             360|
+--------+----------+-----+----------------+
Important Notes
Window functions do not reduce the number of rows; they add new columns with aggregated values.
Ordering inside the window is important for cumulative or running calculations.
Use rowsBetween to control the range of rows included in the window.
Summary
Windowed aggregations let you calculate values over groups of rows without losing detail.
They are useful for running totals, moving averages, rankings, and comparisons.
Use partitionBy to group, orderBy to sort, and rowsBetween to set the window range.