
Window functions in Apache Spark - Practice Problems & Coding Challenges

Challenge - 5 Problems
🎖️ Window Function Master: get all challenges correct to earn this badge!
Test your skills under time pressure!
Problem 1 · Predict Output · intermediate · 2:00
Output of rank() window function
What is the output of the rank() column after running this Spark code?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

spark = SparkSession.builder.getOrCreate()
data = [("A", 100), ("B", 200), ("A", 200), ("B", 100), ("A", 100)]
df = spark.createDataFrame(data, ["category", "value"])
windowSpec = Window.partitionBy("category").orderBy("value")
df = df.withColumn("rank", rank().over(windowSpec))
df.orderBy("category", "value").select("category", "value", "rank").collect()
A. [Row(category='A', value=100, rank=1), Row(category='A', value=100, rank=2), Row(category='A', value=200, rank=3), Row(category='B', value=100, rank=1), Row(category='B', value=200, rank=2)]
B. [Row(category='A', value=100, rank=1), Row(category='A', value=100, rank=1), Row(category='A', value=200, rank=2), Row(category='B', value=100, rank=1), Row(category='B', value=200, rank=2)]
C. [Row(category='A', value=100, rank=1), Row(category='A', value=100, rank=1), Row(category='A', value=200, rank=3), Row(category='B', value=100, rank=1), Row(category='B', value=200, rank=2)]
D. [Row(category='A', value=100, rank=1), Row(category='A', value=100, rank=2), Row(category='A', value=200, rank=2), Row(category='B', value=100, rank=1), Row(category='B', value=200, rank=3)]
💡 Hint: Remember that rank() assigns the same rank to ties and skips ranks after ties.
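To see why ties create gaps, here is a plain-Python sketch of rank() semantics on a small sample (the helper name sql_rank is made up for illustration; this is not how Spark computes it):

```python
# Plain-Python sketch of SQL rank(): tied values share a rank, and the
# next distinct value's rank skips ahead by the number of tied rows.
def sql_rank(values):
    ranks = []
    for i, v in enumerate(sorted(values)):
        if ranks and v == ranks[-1][0]:
            ranks.append((v, ranks[-1][1]))  # tie: reuse previous rank
        else:
            ranks.append((v, i + 1))         # rank = 1-based position
    return [r for _, r in ranks]

print(sql_rank([5, 5, 7, 7, 9]))  # [1, 1, 3, 3, 5]
```

Compare dense_rank(), which would give [1, 1, 2, 2, 3] with no gaps after ties.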
Problem 2 · Data Output · intermediate · 2:00
Count rows per group using window function
What is the output of the count column after running this Spark code?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import count

spark = SparkSession.builder.getOrCreate()
data = [("X", 1), ("X", 2), ("Y", 3), ("Y", 4), ("Y", 5)]
df = spark.createDataFrame(data, ["group", "value"])
windowSpec = Window.partitionBy("group")
df = df.withColumn("count", count("value").over(windowSpec))
df.orderBy("group", "value").select("group", "value", "count").collect()
A. [Row(group='X', value=1, count=2), Row(group='X', value=2, count=1), Row(group='Y', value=3, count=3), Row(group='Y', value=4, count=2), Row(group='Y', value=5, count=1)]
B. [Row(group='X', value=1, count=1), Row(group='X', value=2, count=2), Row(group='Y', value=3, count=1), Row(group='Y', value=4, count=2), Row(group='Y', value=5, count=3)]
C. [Row(group='X', value=1, count=5), Row(group='X', value=2, count=5), Row(group='Y', value=3, count=5), Row(group='Y', value=4, count=5), Row(group='Y', value=5, count=5)]
D. [Row(group='X', value=1, count=2), Row(group='X', value=2, count=2), Row(group='Y', value=3, count=3), Row(group='Y', value=4, count=3), Row(group='Y', value=5, count=3)]
💡 Hint: count() over a partition without an orderBy counts all rows in that group for each row.
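A plain-Python sketch of what count(...).over(Window.partitionBy(...)) does when no orderBy is given (window_count is a hypothetical helper, not PySpark API):

```python
from collections import Counter

# With partitionBy but no orderBy, the window frame is the entire
# partition, so every row receives its group's total row count.
def window_count(rows):
    sizes = Counter(group for group, _ in rows)
    return [(group, value, sizes[group]) for group, value in rows]

print(window_count([("X", 1), ("X", 2), ("Y", 3)]))
# [('X', 1, 2), ('X', 2, 2), ('Y', 3, 1)]
```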
Problem 3 · Visualization · advanced · 2:30
Visualize cumulative sum with window function
Which option shows the correct cumulative sum of value within each category when using this Spark code?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

spark = SparkSession.builder.getOrCreate()
data = [("A", 10), ("A", 20), ("B", 5), ("B", 15), ("A", 30)]
df = spark.createDataFrame(data, ["category", "value"])
windowSpec = Window.partitionBy("category").orderBy("value").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cumulative_sum", sum("value").over(windowSpec))
df.orderBy("category", "value").select("category", "value", "cumulative_sum").collect()
A. [Row(category='A', value=10, cumulative_sum=10), Row(category='A', value=20, cumulative_sum=20), Row(category='A', value=30, cumulative_sum=30), Row(category='B', value=5, cumulative_sum=5), Row(category='B', value=15, cumulative_sum=15)]
B. [Row(category='A', value=10, cumulative_sum=10), Row(category='A', value=20, cumulative_sum=30), Row(category='A', value=30, cumulative_sum=60), Row(category='B', value=5, cumulative_sum=5), Row(category='B', value=15, cumulative_sum=20)]
C. [Row(category='A', value=10, cumulative_sum=10), Row(category='A', value=20, cumulative_sum=30), Row(category='A', value=30, cumulative_sum=50), Row(category='B', value=5, cumulative_sum=5), Row(category='B', value=15, cumulative_sum=20)]
D. [Row(category='A', value=10, cumulative_sum=10), Row(category='A', value=20, cumulative_sum=30), Row(category='A', value=30, cumulative_sum=60), Row(category='B', value=5, cumulative_sum=5), Row(category='B', value=15, cumulative_sum=15)]
💡 Hint: A cumulative sum adds all previous values, including the current one, in order.
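The frame rowsBetween(Window.unboundedPreceding, Window.currentRow) is a running total within each ordered partition; in plain Python that is just itertools.accumulate (a sketch of the semantics, not Spark's implementation):

```python
from itertools import accumulate

# Running total over one ordered partition: each row sums everything
# from the start of the partition up to and including itself.
values = [1, 2, 3]
print(list(accumulate(values)))  # [1, 3, 6]
```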
Problem 4 · 🔧 Debug · advanced · 2:00
Identify error in window function usage
What error does this Spark code raise?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

spark = SparkSession.builder.getOrCreate()
data = [(1, 100), (2, 200), (3, 300)]
df = spark.createDataFrame(data, ["id", "value"])
windowSpec = Window.orderBy("id")
df = df.withColumn("prev_value", lag("value", 2).over(windowSpec))
df.show()
A. No error; output shows null for the first two rows in prev_value
B. AnalysisException: Window function lag requires an OVER clause with partitioning
C. TypeError: lag() missing required positional argument 'offset'
D. RuntimeError: Window specification must include partitionBy clause
💡 Hint: lag() can be used with only orderBy in the window specification.
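A plain-Python sketch of lag(col, offset) semantics over an ordered window (lag_list is a made-up helper name): rows with no row offset positions earlier yield None, mirroring Spark's null; using orderBy without partitionBy raises no error, though Spark logs a performance warning about moving all data to a single partition.

```python
# Sketch of lag(col, offset): each row looks `offset` rows back within
# the ordered frame; rows near the start get None (Spark's null).
def lag_list(values, offset):
    return [values[i - offset] if i >= offset else None
            for i in range(len(values))]

print(lag_list([1, 2, 3, 4], 2))  # [None, None, 1, 2]
```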
Problem 5 · 🚀 Application · expert · 3:00
Calculate moving average with sliding window
Given this Spark code, what is the output of the moving_avg column?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.getOrCreate()
data = [("2023-01-01", 10), ("2023-01-02", 20), ("2023-01-03", 30), ("2023-01-04", 40), ("2023-01-05", 50)]
df = spark.createDataFrame(data, ["date", "value"])
windowSpec = Window.orderBy("date").rowsBetween(-2, 0)
df = df.withColumn("moving_avg", avg("value").over(windowSpec))
df.orderBy("date").select("date", "value", "moving_avg").collect()
A. [Row(date='2023-01-01', value=10, moving_avg=10.0), Row(date='2023-01-02', value=20, moving_avg=15.0), Row(date='2023-01-03', value=30, moving_avg=20.0), Row(date='2023-01-04', value=40, moving_avg=30.0), Row(date='2023-01-05', value=50, moving_avg=40.0)]
B. [Row(date='2023-01-01', value=10, moving_avg=10.0), Row(date='2023-01-02', value=20, moving_avg=15.0), Row(date='2023-01-03', value=30, moving_avg=20.0), Row(date='2023-01-04', value=40, moving_avg=30.0), Row(date='2023-01-05', value=50, moving_avg=45.0)]
C. [Row(date='2023-01-01', value=10, moving_avg=10.0), Row(date='2023-01-02', value=20, moving_avg=15.0), Row(date='2023-01-03', value=30, moving_avg=20.0), Row(date='2023-01-04', value=40, moving_avg=35.0), Row(date='2023-01-05', value=50, moving_avg=40.0)]
D. [Row(date='2023-01-01', value=10, moving_avg=10.0), Row(date='2023-01-02', value=20, moving_avg=20.0), Row(date='2023-01-03', value=30, moving_avg=30.0), Row(date='2023-01-04', value=40, moving_avg=40.0), Row(date='2023-01-05', value=50, moving_avg=50.0)]
💡 Hint: A moving average with rowsBetween(-2, 0) averages the current row and the two previous rows.
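The frame rowsBetween(-2, 0) can be sketched in plain Python (moving_avg is a hypothetical helper; note that early rows simply have a shorter frame rather than producing null):

```python
# Sketch of avg("value").over(rowsBetween(-2, 0)): average of the
# current row and up to two preceding rows in the ordered frame.
def moving_avg(values, preceding=2):
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + 1]
        out.append(sum(frame) / len(frame))
    return out

print(moving_avg([2, 4, 6, 8]))  # [2.0, 3.0, 4.0, 6.0]
```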