Apache Spark · ~20 mins

Structured Streaming basics in Apache Spark - Practice Problems & Coding Challenges

Challenge - 5 Problems
Predict Output
intermediate
Output of a simple streaming aggregation
What will be the output of the following Structured Streaming code snippet after processing the input data?
Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName('Test').getOrCreate()

# Simulated streaming input
input_data = spark.readStream.format('rate').option('rowsPerSecond', 1).load()

# Aggregate count of rows per 10 second window
agg = input_data.groupBy(window(col('timestamp'), '10 seconds')).count()

query = agg.writeStream.format('console').outputMode('complete').start()

query.processAllAvailable()
query.stop()
A. A syntax error due to a missing import for the window function.
B. A runtime error, because the 'rate' source cannot be used with window aggregation.
C. Empty output, because no data is processed in batch mode.
D. A table printed to the console showing counts of rows grouped by 10-second windows, with counts increasing over time.
💡 Hint
Think about how the 'rate' source generates data and how window aggregation works in streaming.
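The windowing the hint refers to can be sketched in plain Python, with no Spark required: each event's timestamp falls into exactly one fixed-width tumbling window, and counts accumulate per window. This is an illustrative analogy of what `window(col('timestamp'), '10 seconds')` computes, not Spark's implementation; all names here are made up.

```python
from collections import Counter

def window_start(ts_seconds, width=10):
    # Tumbling windows: each timestamp belongs to exactly one
    # [start, start + width) bucket, like window(col, '10 seconds').
    return (ts_seconds // width) * width

# The 'rate' source emits one row per second with an increasing timestamp;
# simulate 25 seconds of events.
timestamps = range(25)

counts = Counter(window_start(ts) for ts in timestamps)
# Windows [0,10) and [10,20) each hold 10 rows; [20,30) holds 5 so far.
print(sorted(counts.items()))  # [(0, 10), (10, 10), (20, 5)]
```

As more rows arrive, the count of the latest window grows, which is why a console sink in 'complete' mode shows counts increasing over time.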
Data Output
intermediate
Number of rows in streaming output with append mode
Given a streaming DataFrame reading from a socket source with the following code, how many rows will be output after processing 3 lines of input if the output mode is 'append'?
Apache Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SocketTest').getOrCreate()

lines = spark.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()

query = lines.writeStream.format('console').outputMode('append').start()

# Assume 3 lines are sent to the socket
query.processAllAvailable()
query.stop()
A. 3 rows, one for each line received from the socket.
B. 0 rows, because append mode does not output any data.
C. 1 row, summarizing all 3 lines.
D. An error, because the socket source does not support append mode.
💡 Hint
Append mode outputs only new rows as they arrive.
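Append semantics can be sketched with a pure-Python analogy (an assumption-level sketch, not Spark's engine): each micro-batch emits only the rows that arrived in that batch, never re-emitting earlier ones.

```python
def append_mode(batches):
    """Yield what append mode would print per micro-batch:
    only the rows new in that batch, with no history."""
    for batch in batches:
        yield list(batch)

# Three lines arrive over two micro-batches.
batches = [["hello", "world"], ["spark"]]
emitted = [row for batch in append_mode(batches) for row in batch]
print(len(emitted))  # 3 rows total: one per input line
```

Because the socket query has no aggregation, every input line is itself a new result row, so 3 input lines yield 3 output rows.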
🔧 Debug
advanced
Identify the error in this streaming query
What error will this Structured Streaming code produce when executed?
Apache Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ErrorTest').getOrCreate()

input_stream = spark.readStream.format('jdbc').option('header', 'true').load('/path/to/folder')

query = input_stream.writeStream.format('console').outputMode('append').start()

query.processAllAvailable()
query.stop()
A. No error; the query runs successfully and outputs data.
B. RuntimeException due to a missing checkpoint location.
C. AnalysisException, because the 'jdbc' format does not support streaming reads.
D. FileNotFoundError, because the folder path does not exist.
💡 Hint
Check if the 'jdbc' format supports streaming input.
🧠 Conceptual
advanced
Understanding output modes in Structured Streaming
Which output mode should be used to update only the rows that have changed in a streaming aggregation query?
A. Complete mode
B. Update mode
C. Append mode
D. Overwrite mode
💡 Hint
Think about which mode outputs only changed rows without re-outputting all data.
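The difference between re-emitting everything and emitting only changed rows can be simulated in plain Python for a streaming word count. This is an illustrative analogy under simplifying assumptions, not Spark's implementation.

```python
from collections import Counter

def run_aggregation(batches, mode):
    """Simulate a streaming word count under two output modes."""
    totals = Counter()
    outputs = []
    for batch in batches:
        changed = Counter(batch)
        totals.update(changed)
        if mode == "complete":
            # Re-emit the entire result table every trigger.
            outputs.append(dict(totals))
        elif mode == "update":
            # Emit only the rows whose aggregate changed this trigger.
            outputs.append({k: totals[k] for k in changed})
    return outputs

batches = [["a", "b"], ["a"]]
print(run_aggregation(batches, "complete"))  # [{'a': 1, 'b': 1}, {'a': 2, 'b': 1}]
print(run_aggregation(batches, "update"))    # [{'a': 1, 'b': 1}, {'a': 2}]
```

In the second trigger, only the count for "a" changed, so update mode emits just that row while complete mode re-emits the whole table.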
🚀 Application
expert
Predict the output schema of a streaming aggregation
Given the following streaming aggregation code, what will be the schema of the output DataFrame?
Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName('SchemaTest').getOrCreate()

input_stream = spark.readStream.format('rate').load()

agg = input_stream.groupBy(window(col('timestamp'), '5 minutes'), (col('value') % 10).alias('mod_value')).count()

# What is the schema of 'agg'?
A. StructType with fields: window (struct with start and end timestamps), mod_value (long), count (long)
B. StructType with fields: timestamp, value, count
C. StructType with fields: window (string), value (string), count (integer)
D. StructType with fields: timestamp (timestamp), count (integer)
💡 Hint
Consider the groupBy keys and aggregation columns.