0
0
Apache Sparkdata~5 mins

Unit testing Spark transformations in Apache Spark

Choose your learning style9 modes available
Introduction

Unit testing Spark transformations helps you check if your data changes work correctly before running big jobs. It saves time and avoids mistakes.

When you want to check if a data filter keeps only the right rows.
When you want to verify that a new column is calculated correctly.
When you want to make sure joining two datasets works as expected.
When you want to test your data cleaning steps on small samples.
When you want to catch errors early in your Spark code.
Syntax
Apache Spark
def test_transformation(spark):
    input_df = spark.createDataFrame([...], schema)
    result_df = your_transformation_function(input_df)
    expected_df = spark.createDataFrame([...], schema)
    assert result_df.collect() == expected_df.collect()

Use small sample data to keep tests fast.

Compare results using collect() or DataFrame equality helpers.

Examples
This test checks if filtering rows with id > 1 works correctly.
Apache Spark
def test_filter(spark):
    data = [(1, 'a'), (2, 'b'), (3, 'c')]
    df = spark.createDataFrame(data, ['id', 'value'])
    filtered = df.filter(df.id > 1)
    expected = spark.createDataFrame([(2, 'b'), (3, 'c')], ['id', 'value'])
    assert filtered.collect() == expected.collect()
This test checks if a new column 'double' is correctly added by multiplying 'num' by 2.
Apache Spark
def test_add_column(spark):
    data = [(1,), (2,), (3,)]
    df = spark.createDataFrame(data, ['num'])
    result = df.withColumn('double', df.num * 2)
    expected = spark.createDataFrame([(1, 2), (2, 4), (3, 6)], ['num', 'double'])
    assert result.collect() == expected.collect()
Sample Program

This program creates a small DataFrame, applies a transformation to uppercase a column, and checks if the result matches the expected DataFrame. It prints 'Test passed!' if the test succeeds.

Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper

def test_uppercase_column():
    spark = SparkSession.builder.master('local').appName('UnitTest').getOrCreate()
    data = [('alice',), ('bob',), ('carol',)]
    df = spark.createDataFrame(data, ['name'])
    # Transformation: make names uppercase
    result = df.withColumn('name_upper', upper(df.name))
    expected_data = [('alice', 'ALICE'), ('bob', 'BOB'), ('carol', 'CAROL')]
    expected = spark.createDataFrame(expected_data, ['name', 'name_upper'])
    assert result.collect() == expected.collect()
    print('Test passed!')
    spark.stop()

if __name__ == '__main__':
    test_uppercase_column()
OutputSuccess
Important Notes

Use SparkSession with master='local' for fast local testing.

Comparing DataFrames with collect() works for small data only.

For bigger tests, consider using testing libraries like 'pytest' and Spark testing utilities.

Summary

Unit tests check small parts of your Spark code to catch errors early.

Use small sample data and compare actual vs expected results.

Run tests often to keep your data transformations correct.