Apache Spark · How-To · Beginner · 3 min read

How to Use lag and lead Functions in PySpark

In PySpark, use the lag and lead functions from pyspark.sql.functions with a window specification to access previous or next rows in a DataFrame. These functions help compare values across rows by shifting data backward or forward within a partition.

📝 Syntax

The lag and lead functions involve four main parts:

  • Column: The column to shift.
  • Offset: How many rows to move backward (lag) or forward (lead); defaults to 1.
  • Default value: Optional value to return when no row exists at the offset.
  • Window: Defines how rows are grouped and ordered for shifting.

Example syntax:

lag(column, offset=1, default=None).over(windowSpec)
lead(column, offset=1, default=None).over(windowSpec)
python
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

# Shift rows within each group_column partition, ordered by order_column
windowSpec = Window.partitionBy('group_column').orderBy('order_column')

lag_col = lag('value_column', 1).over(windowSpec)    # previous row's value
lead_col = lead('value_column', 1).over(windowSpec)  # next row's value
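
The offset and default parameters can be combined in one call. As a minimal sketch, assuming the same windowSpec and value_column as above (lag_two is an illustrative name):

python
# Look two rows back instead of one; return -1 when no such row exists
lag_two = lag('value_column', 2, -1).over(windowSpec)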

💻 Example

This example shows how to add columns with the previous and next values of a column within each group.

python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

spark = SparkSession.builder.master('local[*]').appName('LagLeadExample').getOrCreate()

data = [
    ('A', 1, 100),
    ('A', 2, 200),
    ('A', 3, 300),
    ('B', 1, 400),
    ('B', 2, 500)
]

columns = ['group', 'order', 'value']
df = spark.createDataFrame(data, columns)

windowSpec = Window.partitionBy('group').orderBy('order')

df_with_lag_lead = df.withColumn('prev_value', lag('value', 1).over(windowSpec)) \
                     .withColumn('next_value', lead('value', 1).over(windowSpec))

df_with_lag_lead.show()
Output
+-----+-----+-----+----------+----------+
|group|order|value|prev_value|next_value|
+-----+-----+-----+----------+----------+
|    A|    1|  100|      null|       200|
|    A|    2|  200|       100|       300|
|    A|    3|  300|       200|      null|
|    B|    1|  400|      null|       500|
|    B|    2|  500|       400|      null|
+-----+-----+-----+----------+----------+
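
A common next step is to compare each row with its neighbor. As a short follow-on sketch (df_delta and delta are illustrative names, not part of the original example), using df_with_lag_lead from above:

python
from pyspark.sql.functions import col

# Subtract the previous row's value from the current one;
# each group's first row yields null because prev_value is null there
df_delta = df_with_lag_lead.withColumn('delta', col('value') - col('prev_value'))
df_delta.show()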

⚠️ Common Pitfalls

  • Not defining a Window causes errors or unexpected results, because lag and lead need a window to know how rows are ordered.
  • Using lag or lead without partitionBy can mix data from different groups.
  • Forgetting to handle the null values returned when no previous or next row exists (see the coalesce sketch below).
python
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Wrong: no window passed to over()
# df.withColumn('prev', lag('value').over())  # raises an error

# Right: define a window with both partitioning and ordering
windowSpec = Window.partitionBy('group').orderBy('order')
df.withColumn('prev', lag('value', 1, 0).over(windowSpec))  # default 0 instead of null
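
If the shifted column was created without a default, the nulls can also be handled after the fact. A minimal sketch, assuming the df and windowSpec above; coalesce and lit come from pyspark.sql.functions:

python
from pyspark.sql.functions import coalesce, lag, lit

# Replace the null produced on each group's first row with 0 after the fact
df.withColumn('prev', coalesce(lag('value').over(windowSpec), lit(0)))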

📊 Quick Reference

| Function | Purpose | Parameters | Returns |
| --- | --- | --- | --- |
| lag(column, offset=1, default=None) | Access previous row value | column: column name; offset: rows back; default: fill value | Value from previous row, or default |
| lead(column, offset=1, default=None) | Access next row value | column: column name; offset: rows forward; default: fill value | Value from next row, or default |
| Window.partitionBy(cols).orderBy(cols) | Define groups and order for lag/lead | cols: columns to partition/order by | Window specification for over() |

✅ Key Takeaways

  • Always use a Window specification with lag and lead to define row ordering and grouping.
  • lag accesses previous rows; lead accesses next rows within the window.
  • Set offset and default to control how far to look and what to return when no row exists.
  • Handle the nulls returned by lag/lead when no previous or next row is available.
  • Use partitionBy to avoid mixing data from different groups when shifting rows.