How to Use lag and lead Functions in PySpark
In PySpark, use the lag and lead functions from pyspark.sql.functions with a window specification to access previous or next rows in a DataFrame. These functions help compare values across rows by shifting data backward or forward within a partition.
Syntax
The lag and lead functions require four main parts:
- Column: The column to shift.
- Offset: How many rows to move backward (lag) or forward (lead); defaults to 1.
- Default value: Value to use if no row exists at the offset; optional, defaults to null.
- Window: Defines how to group and order rows for shifting, applied with over().
Example syntax:

lag(column, offset=1, default=None).over(windowSpec)
lead(column, offset=1, default=None).over(windowSpec)

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

# Define how rows are grouped and ordered for the shift
windowSpec = Window.partitionBy('group_column').orderBy('order_column')

# Column expressions for the previous and next values
lag_col = lag('value_column', 1).over(windowSpec)
lead_col = lead('value_column', 1).over(windowSpec)
```
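These expressions are only column definitions; they produce values once attached to a DataFrame, typically with withColumn. A minimal sketch, building on the windowSpec, lag_col, and lead_col defined above and assuming a hypothetical DataFrame df that contains the three columns named there:

```python
# df is a hypothetical DataFrame with group_column, order_column, value_column
df_shifted = (
    df.withColumn('prev_value', lag_col)   # value from one row earlier in the window
      .withColumn('next_value', lead_col)  # value from one row later in the window
)
```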
Example
This example shows how to add columns with the previous and next values of a column within each group.
```python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

spark = SparkSession.builder.master('local[*]').appName('LagLeadExample').getOrCreate()

data = [
    ('A', 1, 100),
    ('A', 2, 200),
    ('A', 3, 300),
    ('B', 1, 400),
    ('B', 2, 500)
]
columns = ['group', 'order', 'value']
df = spark.createDataFrame(data, columns)

# Partition by 'group' so rows never shift across groups; order by 'order'
windowSpec = Window.partitionBy('group').orderBy('order')

df_with_lag_lead = df.withColumn('prev_value', lag('value', 1).over(windowSpec)) \
                     .withColumn('next_value', lead('value', 1).over(windowSpec))
df_with_lag_lead.show()
```
Output
+-----+-----+-----+----------+----------+
|group|order|value|prev_value|next_value|
+-----+-----+-----+----------+----------+
| A| 1| 100| null| 200|
| A| 2| 200| 100| 300|
| A| 3| 300| 200| null|
| B| 1| 400| null| 500|
| B| 2| 500| 400| null|
+-----+-----+-----+----------+----------+
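A common follow-up is to compare each row with the previous one, which is the kind of cross-row comparison described at the top of this section. A short sketch continuing from df_with_lag_lead above; the change column name is illustrative, not from the original:

```python
from pyspark.sql.functions import col

# Row-over-row change within each group; the first row of each group
# has no previous value, so the difference is null there
df_diff = df_with_lag_lead.withColumn('change', col('value') - col('prev_value'))
df_diff.show()
```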
Common Pitfalls
- Not defining a Window causes errors or unexpected results, because lag and lead need a window to know how to order rows.
- Using lag or lead without partitioning may mix data from different groups.
- Forgetting to handle null values returned when no previous or next row exists.
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Wrong: no window defined
# df.withColumn('prev', lag('value').over())  # This will raise an error

# Right: define a window with both partitioning and ordering
windowSpec = Window.partitionBy('group').orderBy('order')
df.withColumn('prev', lag('value', 1, 0).over(windowSpec))  # default 0 instead of null
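```

For the third pitfall, besides passing a default as the third argument, nulls can also be cleaned up after the fact. A minimal sketch using coalesce, assuming the df_with_lag_lead DataFrame from the example above; the fill value 0 is an arbitrary choice:

```python
from pyspark.sql.functions import coalesce, col, lit

# Replace nulls produced by lag with a sentinel after the fact;
# coalesce returns the first non-null value among its arguments
df_filled = df_with_lag_lead.withColumn(
    'prev_value', coalesce(col('prev_value'), lit(0))
)
```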
Quick Reference
| Function | Purpose | Parameters | Returns |
|---|---|---|---|
| lag(column, offset=1, default=None) | Access previous row value | column: column name, offset: rows back, default: fill value | Value from previous row or default |
| lead(column, offset=1, default=None) | Access next row value | column: column name, offset: rows forward, default: fill value | Value from next row or default |
| Window.partitionBy(cols).orderBy(cols) | Define groups and order for lag/lead | cols: columns to partition/order by | Window specification for over() |
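The offset is not limited to 1. A short sketch reusing df and windowSpec from the example above, looking two rows back and falling back to -1 where fewer than two earlier rows exist; the column name two_back is illustrative:

```python
from pyspark.sql.functions import lag

# Look two rows back within each group; -1 where no such row exists
df_two_back = df.withColumn('two_back', lag('value', 2, -1).over(windowSpec))
```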
Key Takeaways
- Always use a Window specification with lag and lead to define row order and grouping.
- lag accesses previous rows; lead accesses next rows within the window.
- Specify offset and default values to control how far to look back or ahead and what to return when no row exists.
- Handle nulls returned by lag/lead when no previous or next row is available.
- Use partitionBy to avoid mixing data from different groups when shifting rows.