Apache Spark · ~20 mins

Handling skewed joins in Apache Spark - Practice Problems & Coding Challenges

Challenge - 5 Problems
🎖️
Skewed Join Master
Get all challenges correct to earn this badge!
Test your skills under time pressure!
Predict Output
intermediate
2:00 remaining
Output of a skewed join without optimization
Given two Spark DataFrames df1 and df2 where df2 has a highly skewed key, what will be the output count of the join df1.join(df2, "key")?
Apache Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data1 = [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')]
data2 = [(1, 'X')] * 1000 + [(2, 'Y'), (3, 'Z'), (4, 'W')]
df1 = spark.createDataFrame(data1, ['key', 'val1'])
df2 = spark.createDataFrame(data2, ['key', 'val2'])

joined = df1.join(df2, 'key')
print(joined.count())
A. 1003
B. 4
C. 1000
D. 4000
💡 Hint
Think about how many rows match for each key in the join.
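As a refresher for questions like this one, the row count of an inner join can be worked out key by key: multiply how often each key appears on each side, then sum over all keys. A minimal plain-Python sketch (using hypothetical data, not the challenge's):

```python
from collections import Counter

def inner_join_count(keys1, keys2):
    """Row count of an inner join: for each key, multiply how many
    times it appears on each side, then sum over all keys."""
    c1, c2 = Counter(keys1), Counter(keys2)
    return sum(n * c2[k] for k, n in c1.items())

# Hypothetical data: one skewed key on the right side.
left = [10, 20, 30]
right = [10] * 6 + [20, 30]
print(inner_join_count(left, right))  # 1*6 + 1*1 + 1*1 = 8
```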
🧠 Conceptual
intermediate
1:30 remaining
Reason for skewed join performance issues
Why do skewed joins cause performance problems in distributed data processing frameworks like Spark?
A. Because the skewed key causes some tasks to process much more data, leading to imbalance.
B. Because skewed joins always cause network failures.
C. Because skewed joins reduce the total number of partitions.
D. Because skewed joins require more memory on all nodes equally.
💡 Hint
Consider how data is distributed across tasks.
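To build intuition for this hint: a shuffle routes every row with the same key to the same partition (and therefore the same task). A minimal plain-Python simulation (not Spark itself; the data and partition count are made up for illustration) shows how one skewed key overloads a single partition:

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Count rows per partition when rows are routed by hash(key) % n,
    which is how a shuffle co-locates equal keys on one task."""
    sizes = Counter(hash(k) % num_partitions for k in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]

keys = [1] * 1000 + [2, 3, 4]  # key 1 is heavily skewed
sizes = partition_sizes(keys, num_partitions=4)
print(max(sizes))  # the task owning key 1 gets all 1000 skewed rows
```

However many partitions you add, all rows for the skewed key still land on one task, which is why option C-style fixes (just repartitioning) don't help.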
🚀 Application
advanced
2:30 remaining
Choosing a technique to handle skewed join
You have two large DataFrames with a highly skewed join key. Which technique is best to reduce the impact of skew on join performance?
A. Filter out the skewed keys before joining.
B. Broadcast the smaller DataFrame to all nodes.
C. Increase the number of shuffle partitions without changing data.
D. Use salting by adding a random prefix to the skewed key in both DataFrames before joining.
💡 Hint
Think about how to distribute skewed keys evenly.
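For intuition on spreading keys evenly: in the standard salting scheme, the smaller side is replicated once per salt value, each skewed-side row gets a single random salt, and the join runs on (key, salt). Each original pair still matches exactly once, but the skewed key's rows are spread over `num_salts` buckets. A plain-Python sketch of the counting (illustrative data, not Spark):

```python
import random
from collections import Counter

def salted_join_count(small_keys, big_keys, num_salts=5):
    """Join cardinality under salting: replicate the small side across
    all salts, give each big-side row one random salt, join on (key, salt)."""
    rng = random.Random(0)  # fixed seed so the sketch is reproducible
    # Small side: one replica per salt value.
    small = Counter((k, s) for k in small_keys for s in range(num_salts))
    # Big (skewed) side: each row lands in exactly one salt bucket.
    big = Counter((k, rng.randrange(num_salts)) for k in big_keys)
    return sum(n * big[ks] for ks, n in small.items())

small = [1, 2, 3]
big = [1] * 1000 + [2, 3]
print(salted_join_count(small, big))  # same as the unsalted join: 1002
```

The result count is unchanged; only the distribution of work across buckets improves.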
🔧 Debug
advanced
2:00 remaining
Identify the error in skewed join salting code
What error will this PySpark code raise?
from pyspark.sql.functions import lit, rand, concat

salted_df1 = df1.withColumn('salt', (rand() * 10).cast('int'))
salted_df1 = salted_df1.withColumn('salted_key', concat(df1['key'].cast('string'), salted_df1['salt'].cast('string')))
salted_df2 = df2.withColumn('salt', (rand() * 10).cast('int'))
salted_df2 = salted_df2.withColumn('salted_key', concat(df2['key'].cast('string'), salted_df2['salt'].cast('string')))

joined = salted_df1.join(salted_df2, 'salted_key')
A. No error, code runs fine
B. SyntaxError due to missing import
C. TypeError due to concat of int and string columns without casting
D. AnalysisException due to missing join column
💡 Hint
Check the data types used in concat function.
Data Output
expert
3:00 remaining
Result count after salting skewed join
Given df1 with keys [1,2,3] and df2 where key=1 has 100 rows and keys 2,3 have 1 row each, after applying salting with 5 salt values (0 to 4) on key=1 only, what is the count of rows in the salted join?
Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col

spark = SparkSession.builder.getOrCreate()

data1 = [(1, 'A'), (2, 'B'), (3, 'C')]
data2 = [(1, f'X{i}') for i in range(100)] + [(2, 'Y'), (3, 'Z')]

df1 = spark.createDataFrame(data1, ['key', 'val1'])
df2 = spark.createDataFrame(data2, ['key', 'val2'])

# Salting df1 only for key=1
salt_values = [0,1,2,3,4]
salted_df1 = df1.filter(col('key') != 1)
salted_ones = df1.filter(col('key') == 1).crossJoin(spark.createDataFrame([(x,) for x in salt_values], ['salt']))
salted_ones = salted_ones.withColumn('salted_key', concat(col('key').cast('string'), col('salt').cast('string')))
salted_df1 = salted_df1.withColumn('salted_key', col('key').cast('string'))
salted_df1 = salted_df1.union(salted_ones.drop('salt'))  # drop 'salt' so both sides have matching columns

# Salting df2 for key=1
salted_df2 = df2.filter(col('key') != 1).withColumn('salted_key', col('key').cast('string'))
salted_ones_df2 = df2.filter(col('key') == 1).crossJoin(spark.createDataFrame([(x,) for x in salt_values], ['salt']))
salted_ones_df2 = salted_ones_df2.withColumn('salted_key', concat(col('key').cast('string'), col('salt').cast('string')))
salted_df2 = salted_df2.union(salted_ones_df2.drop('salt'))  # drop 'salt' so both sides have matching columns

joined = salted_df1.join(salted_df2, 'salted_key')
print(joined.count())
A. 100
B. 500
C. 105
D. 502
💡 Hint
Calculate how many rows join for salted and non-salted keys.
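Beyond manual salting, note that Spark 3.0+ can mitigate skewed sort-merge joins automatically via Adaptive Query Execution, which splits oversized partitions at runtime. A minimal configuration sketch (assumes an existing `spark` session; values shown are the documented defaults):

```python
# Assumes Spark 3.0+; AQE detects and splits skewed join partitions at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition is treated as skewed if it is larger than this factor times
# the median partition size AND larger than the byte threshold below.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```

With AQE enabled, the manual salting shown in these challenges is often unnecessary, though salting remains useful on older Spark versions or for joins AQE cannot rewrite.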