Consider two streaming DataFrames df1 and df2 with event-time columns eventTime1 and eventTime2, each with a 10-minute watermark. They are joined on a key column with the condition that eventTime2 falls between eventTime1 and eventTime1 + 5 minutes. What is the output count after processing the following batches?
Batch 1:
df1: (key=1, eventTime1=10:00), (key=2, eventTime1=10:05)
df2: (key=1, eventTime2=10:03), (key=2, eventTime2=10:20)
Batch 2:
df1: (key=1, eventTime1=10:15)
df2: (key=1, eventTime2=10:10)
from pyspark.sql.functions import expr

# Assume df1 and df2 are streaming DataFrames with schemas
# (key int, eventTime1 timestamp) and (key int, eventTime2 timestamp).
df1_with_watermark = df1.withWatermark('eventTime1', '10 minutes')
df2_with_watermark = df2.withWatermark('eventTime2', '10 minutes')

joined = df1_with_watermark.alias("df1").join(
    df2_with_watermark.alias("df2"),
    expr("df1.key = df2.key AND "
         "df2.eventTime2 BETWEEN df1.eventTime1 "
         "AND df1.eventTime1 + interval 5 minutes"),
    'inner'
)

# A streaming DataFrame does not support count() directly; materialize the
# join through a sink (e.g. the memory sink) and count the collected rows.
query = (joined.writeStream
         .format('memory')
         .queryName('joined_results')
         .outputMode('append')
         .start())
# After the batches are processed:
# result_count = spark.sql('SELECT COUNT(*) FROM joined_results').collect()[0][0]
Remember that watermarks drop late data and the join condition requires event times within 5 minutes.
In batch 1, only key=1 matches: 10:03 falls inside [10:00, 10:05]. Key=2 does not match because 10:20 falls outside [10:05, 10:10]. In batch 2, the df2 row at 10:10 matches neither the new df1 row at 10:15 (10:10 precedes the [10:15, 10:20] window) nor the buffered df1 row at 10:00 (10:10 is past the [10:00, 10:05] window), and the new df1 row at 10:15 matches no buffered df2 row either. So the total match count is 1.
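The reasoning above can be checked with a plain-Python simulation of the interval condition (illustrative helpers, not Spark API; watermark-based dropping is ignored here because no row falls behind the watermark in these batches):

```python
from datetime import datetime, timedelta

def t(hhmm):
    # Parse "HH:MM" into a datetime on an arbitrary fixed date.
    return datetime(2024, 1, 1, *map(int, hhmm.split(":")))

# All rows from both batches: (key, event time)
df1_rows = [(1, t("10:00")), (2, t("10:05")), (1, t("10:15"))]
df2_rows = [(1, t("10:03")), (2, t("10:20")), (1, t("10:10"))]

# Join condition: same key, and eventTime2 BETWEEN eventTime1
# AND eventTime1 + interval 5 minutes.
matches = [
    (k1, e1, e2)
    for k1, e1 in df1_rows
    for k2, e2 in df2_rows
    if k1 == k2 and e1 <= e2 <= e1 + timedelta(minutes=5)
]

print(len(matches))  # 1 -- only (key=1, 10:00) pairs with (key=1, 10:03)
```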
You have two streaming DataFrames orders and customers. You perform a left outer join on customerId. The customers stream has a watermark of 1 hour on updateTime. If a late customer update arrives after the watermark, will it appear in the join output?
Given the following data:
Orders batch:
(orderId=101, customerId=1, orderTime=12:00)
(orderId=102, customerId=2, orderTime=12:05)
Customers batch:
(customerId=1, updateTime=11:00, name='Alice')
(customerId=2, updateTime=10:00, name='Bob')
The watermark drops rows whose event time falls behind the current watermark threshold (the maximum event time seen minus the configured delay).
The watermark threshold is the maximum event time observed on the stream minus the configured delay, not the wall-clock time. Assuming customer updates with event times near 12:00 have already been seen, the threshold sits near 11:00, so the customerId=2 row with updateTime=10:00 is behind the watermark and is dropped without joining. The customerId=1 row at updateTime=11:00 is at or ahead of the threshold, so it is kept and joins successfully.
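The drop decision can be sketched in plain Python (not Spark API; the 12:00 maximum observed event time is an assumption for illustration): a row is late when its event time falls behind the maximum event time seen minus the watermark delay.

```python
from datetime import datetime, timedelta

def is_dropped(event_time, max_event_time_seen, delay):
    # A row is considered too late when its event time falls behind
    # the watermark threshold (max event time seen minus the delay).
    return event_time < max_event_time_seen - delay

max_seen = datetime(2024, 1, 1, 12, 0)   # assume updates up to 12:00 observed
delay = timedelta(hours=1)               # withWatermark('updateTime', '1 hour')

alice = datetime(2024, 1, 1, 11, 0)      # customerId=1, updateTime=11:00
bob = datetime(2024, 1, 1, 10, 0)        # customerId=2, updateTime=10:00

print(is_dropped(alice, max_seen, delay))  # False -- kept, joins
print(is_dropped(bob, max_seen, delay))    # True  -- dropped as late
```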
Given the following Spark Structured Streaming code snippet, what problem does it have?
joined = df1.join(df2, on='key', how='inner')
query = joined.writeStream.format('console').start()
query.awaitTermination()

Assume both df1 and df2 are streaming DataFrames without watermarks.
Watermarks let Spark bound and clean up join state; without them, buffered rows are kept indefinitely.
Spark requires watermarks on both streams, plus an event-time join condition, for stream-stream outer joins; without them it raises an AnalysisException when the query starts. For an inner join like this one, watermarks are technically optional: the query runs, but Spark must buffer every input row on both sides indefinitely, so state grows without bound. Either way, watermarks on both streams are what allow Spark to manage and clean up join state.
You run two streaming joins: one with watermarks on both streams and one without watermarks. Which graph best represents the state size over time?
Watermarks help Spark remove old state data.
Without watermarks, Spark cannot decide when a buffered row can no longer match future input, so it keeps everything and state size grows without bound, roughly linearly with input volume. With watermarks, rows older than the watermark are evicted, so state size stays roughly flat.
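One way to observe this in practice is to track the join operator's buffered-row count across batches from the query's progress reports (query.lastProgress in PySpark). The helper below just reads the stateOperators section of one progress dictionary; the sample numbers are invented for illustration, and the exact set of fields varies by Spark version:

```python
def total_state_rows(progress):
    # Sum buffered state rows across all stateful operators reported
    # in a StreamingQueryProgress dictionary.
    return sum(op["numRowsTotal"] for op in progress.get("stateOperators", []))

# Simplified shape of a progress report, e.g. from query.lastProgress:
sample_progress = {
    "batchId": 42,
    "stateOperators": [
        {"numRowsTotal": 1200, "numRowsUpdated": 30},
    ],
}

print(total_state_rows(sample_progress))  # 1200
```

Plotting numRowsTotal per batch shows the two curves from the question: an ever-climbing line without watermarks, and a plateau with them.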
You need to join two streams clicks and impressions on userId with event-time columns clickTime and impressionTime. You want to join clicks with impressions that happened within 10 minutes before or after the click. Late data can arrive up to 30 minutes late. Which approach correctly implements this with Spark Structured Streaming?
Watermarks must cover maximum allowed lateness. Join condition must cover the time window.
To handle data arriving up to 30 minutes late, set a 30-minute watermark on both clickTime and impressionTime. The join condition then restricts impressionTime to the range clickTime - 10 minutes through clickTime + 10 minutes. Together these produce the correct matches while letting Spark evict buffered rows once the watermark guarantees they can no longer join.
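A sketch of this approach, assuming streaming DataFrames `clicks` and `impressions` with the columns described above (the Spark-side calls are shown in comments; the runnable part is a plain-Python equivalent of the BETWEEN clause):

```python
from datetime import datetime, timedelta

# Spark-side setup (sketch):
#   clicks_wm = clicks.withWatermark("clickTime", "30 minutes")
#   imps_wm = impressions.withWatermark("impressionTime", "30 minutes")
#   joined = clicks_wm.join(imps_wm, expr(JOIN_CONDITION), "inner")
JOIN_CONDITION = (
    "clicks.userId = impressions.userId AND "
    "impressionTime BETWEEN clickTime - interval 10 minutes "
    "AND clickTime + interval 10 minutes"
)

def within_window(click_time, impression_time, window=timedelta(minutes=10)):
    # Plain-Python equivalent of the BETWEEN clause above: the impression
    # must fall within +/- 10 minutes of the click.
    return abs(impression_time - click_time) <= window

click = datetime(2024, 1, 1, 12, 0)
print(within_window(click, datetime(2024, 1, 1, 12, 7)))   # True
print(within_window(click, datetime(2024, 1, 1, 11, 55)))  # True
print(within_window(click, datetime(2024, 1, 1, 12, 15)))  # False
```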