Apache Spark · How-To · Beginner · 3 min read

How to Use Broadcast Join in Spark with PySpark

In PySpark, use the broadcast() function from pyspark.sql.functions to mark a small DataFrame for a broadcast join. This tells Spark to ship a copy of the small DataFrame to every worker node, which speeds up the join with a large DataFrame by avoiding a shuffle.
📝

Syntax

The basic syntax to use broadcast join in PySpark is:

  • from pyspark.sql.functions import broadcast: Import the broadcast function.
  • df_large.join(broadcast(df_small), on='key'): Join the large DataFrame with the broadcasted small DataFrame on a key column.

This forces Spark to send the small DataFrame to all executors, avoiding a shuffle and improving join speed.

python
from pyspark.sql.functions import broadcast

# Join large DataFrame with broadcasted small DataFrame
df_large.join(broadcast(df_small), on='key')
💻

Example

This example shows how to join a large DataFrame with a small DataFrame using broadcast join to improve performance.

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName('BroadcastJoinExample').getOrCreate()

# Create large DataFrame
data_large = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie'), (4, 'David')]
df_large = spark.createDataFrame(data_large, ['id', 'name'])

# Create small DataFrame
data_small = [(1, 'NY'), (2, 'LA')]
df_small = spark.createDataFrame(data_small, ['id', 'city'])

# Perform broadcast join
result = df_large.join(broadcast(df_small), on='id', how='left')

result.show()
Output
+---+-------+----+
| id|   name|city|
+---+-------+----+
|  1|  Alice|  NY|
|  3|Charlie|null|
|  2|    Bob|  LA|
|  4|  David|null|
+---+-------+----+
⚠️

Common Pitfalls

Common mistakes when using broadcast join include:

  • Broadcasting a large DataFrame, which can cause memory errors.
  • Forgetting to import the broadcast function before use.
  • Using broadcast join when both DataFrames are large, which may degrade performance.

Always ensure the DataFrame you broadcast is small enough to fit in memory on each worker.

python
from pyspark.sql.functions import broadcast

# Wrong: broadcasting a large DataFrame
# result = df_large.join(broadcast(df_large), on='id')  # Avoid this

# Right: broadcast only the small DataFrame
result = df_large.join(broadcast(df_small), on='id')
📊

Quick Reference

  • broadcast(): Marks a DataFrame to be broadcast to all worker nodes.
  • join(): Joins two DataFrames on a key column.
  • Small DataFrame: The DataFrame to broadcast; should fit in memory.
  • Large DataFrame: The DataFrame to join with the broadcasted one.
  • Avoid broadcasting large DataFrames: Broadcasting large data causes memory issues and slows down processing.
✅

Key Takeaways

  • Use broadcast() to send small DataFrames to all Spark workers for faster joins.
  • Only broadcast small DataFrames to avoid memory errors.
  • Import broadcast from pyspark.sql.functions before using it.
  • Broadcast join avoids shuffle and speeds up join operations.
  • Check DataFrame sizes before deciding to broadcast.