Unit testing Spark transformations lets you verify that your data transformations behave correctly on small, controlled inputs before running full-scale jobs. It saves time and catches mistakes early.
Unit Testing Transformations in Apache Spark
Introduction
When you want to check if a data filter keeps only the right rows.
When you want to verify that a new column is calculated correctly.
When you want to make sure joining two datasets works as expected.
When you want to test your data cleaning steps on small samples.
When you want to catch errors early in your Spark code.
Syntax
```python
def test_transformation(spark):
    input_df = spark.createDataFrame([...], schema)
    result_df = your_transformation_function(input_df)
    expected_df = spark.createDataFrame([...], schema)
    assert result_df.collect() == expected_df.collect()
```
Use small sample data to keep tests fast.
Compare results using collect() or DataFrame equality helpers.
Examples
This test checks if filtering rows with id > 1 works correctly.
```python
def test_filter(spark):
    data = [(1, 'a'), (2, 'b'), (3, 'c')]
    df = spark.createDataFrame(data, ['id', 'value'])
    filtered = df.filter(df.id > 1)
    expected = spark.createDataFrame([(2, 'b'), (3, 'c')], ['id', 'value'])
    assert filtered.collect() == expected.collect()
```
This test checks if a new column 'double' is correctly added by multiplying 'num' by 2.
```python
def test_add_column(spark):
    data = [(1,), (2,), (3,)]
    df = spark.createDataFrame(data, ['num'])
    result = df.withColumn('double', df.num * 2)
    expected = spark.createDataFrame([(1, 2), (2, 4), (3, 6)], ['num', 'double'])
    assert result.collect() == expected.collect()
```
Sample Program
This program creates a small DataFrame, applies a transformation to uppercase a column, and checks if the result matches the expected DataFrame. It prints 'Test passed!' if the test succeeds.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper

def test_uppercase_column():
    spark = SparkSession.builder.master('local').appName('UnitTest').getOrCreate()
    data = [('alice',), ('bob',), ('carol',)]
    df = spark.createDataFrame(data, ['name'])
    # Transformation: make names uppercase
    result = df.withColumn('name_upper', upper(df.name))
    expected_data = [('alice', 'ALICE'), ('bob', 'BOB'), ('carol', 'CAROL')]
    expected = spark.createDataFrame(expected_data, ['name', 'name_upper'])
    assert result.collect() == expected.collect()
    print('Test passed!')
    spark.stop()

if __name__ == '__main__':
    test_uppercase_column()
```
Output
Test passed!
Important Notes
Use SparkSession with master='local' for fast local testing.
Comparing DataFrames with collect() pulls every row to the driver, so it is practical only for small datasets.
For bigger tests, consider using testing libraries like 'pytest' and Spark testing utilities.
Summary
Unit tests check small parts of your Spark code to catch errors early.
Use small sample data and compare actual vs expected results.
Run tests often to keep your data transformations correct.