UDFs (User Defined Functions) in Apache Spark

UDFs let you add your own custom logic to process data in Spark. They help when the built-in functions are not enough.
from pyspark.sql.functions import udf
from pyspark.sql.types import <ReturnType>

# Define a Python function
def my_function(value):
    # custom logic
    return modified_value

# Register the function as a UDF with return type
my_udf = udf(my_function, <ReturnType>())

# Use the UDF in DataFrame operations
df = df.withColumn('new_column', my_udf(df['existing_column']))
Replace <ReturnType> with the Spark SQL type your function returns, such as IntegerType or StringType from pyspark.sql.types.
UDFs work row by row on DataFrame columns.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def add_one(x):
    return x + 1 if x is not None else None

add_one_udf = udf(add_one, IntegerType())

# Edge case: empty DataFrame
empty_df = spark.createDataFrame([], 'value INT')
empty_df = empty_df.withColumn('plus_one', add_one_udf(empty_df['value']))
empty_df.show()
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def greet(name):
    if name:
        return 'Hello ' + name
    else:
        return 'Hello Stranger'

greet_udf = udf(greet, StringType())

# DataFrame with one row
one_row_df = spark.createDataFrame([('Alice',)], ['name'])
one_row_df = one_row_df.withColumn('greeting', greet_udf(one_row_df['name']))
one_row_df.show()
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mark_end(text):
    if text:
        return text + ' END'
    else:
        return None

mark_end_udf = udf(mark_end, StringType())

# DataFrame with multiple rows including null
multi_df = spark.createDataFrame([('start',), (None,), ('middle',)], ['text'])
multi_df = multi_df.withColumn('marked', mark_end_udf(multi_df['text']))
multi_df.show()
This program creates a Spark DataFrame with numbers, defines a UDF to double each number, and applies it. It prints the DataFrame before and after applying the UDF.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Create Spark session
spark = SparkSession.builder.appName('UDF Example').getOrCreate()

# Define a Python function to double a number
def double_value(number):
    if number is None:
        return None
    return number * 2

# Register the function as a UDF
double_value_udf = udf(double_value, IntegerType())

# Create a DataFrame with sample data
data = [(1,), (5,), (None,), (10,)]
df = spark.createDataFrame(data, ['number'])
print('Original DataFrame:')
df.show()

# Use the UDF to create a new column with doubled values
df = df.withColumn('doubled', double_value_udf(df['number']))
print('DataFrame after applying UDF:')
df.show()

# Stop Spark session
spark.stop()
Time complexity: a UDF runs once per row, so applying it is O(n), where n is the number of rows.
Space complexity: O(1) extra space per row; the main cost is the per-call overhead of moving data between the JVM and the Python worker.
Common mistake: forgetting to handle null values inside the UDF causes errors.
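For illustration, a minimal sketch of that failure mode, assuming an active SparkSession as in the full program above; the names unsafe_upper, safe_upper, and the text column are ours, not from the examples:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Unsafe: text.upper() raises a TypeError on None, which surfaces
# as an error when the action (show) actually runs the UDF
unsafe_upper = udf(lambda text: text.upper(), StringType())

# Safe: check for None before calling the method
safe_upper = udf(lambda text: text.upper() if text is not None else None, StringType())

df = spark.createDataFrame([('ok',), (None,)], ['text'])
df.withColumn('upper', safe_upper(df['text'])).show()      # works, null passes through
# df.withColumn('upper', unsafe_upper(df['text'])).show()  # fails on the None row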
Use UDFs only when built-in Spark functions cannot express the operation you need; built-ins are generally faster because Spark can optimize them and they avoid Python overhead.
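As a sketch of that tradeoff, the doubling UDF from the program above can be replaced by a plain column expression, which Spark's optimizer understands and which never leaves the JVM (nulls propagate automatically):

from pyspark.sql.functions import col

# Same result as double_value_udf, but computed as a native column expression
df = spark.createDataFrame([(1,), (5,), (None,), (10,)], ['number'])
df.withColumn('doubled', col('number') * 2).show()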
UDFs let you add custom row-wise logic to Spark DataFrames.
Always specify the return type when registering a UDF (a decorator-style sketch follows below).
Handle nulls carefully inside your UDF to avoid errors.
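For completeness, a minimal sketch of the decorator form of registration, which takes the return type explicitly; it assumes an active SparkSession, and the function name shout and the sample data are ours:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def shout(text):
    # Null-safe: pass None through unchanged
    return text.upper() + '!' if text is not None else None

df = spark.createDataFrame([('hi',), (None,)], ['text'])
df.withColumn('shouted', shout(df['text'])).show()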