Watermarking helps us handle data that arrives late in streaming. It tells the system how long to keep waiting for late records before finalizing results.
Watermarking for late data in Apache Spark
Introduction
Watermarking is useful in situations such as:
Processing live data streams, such as sensor readings, that may arrive late.
Analyzing user activity logs that can be delayed by network issues.
Aggregating events into time windows when some events arrive after a window closes.
Balancing the wait for late data against producing timely results.
Syntax
Apache Spark
streamingDF.withWatermark("eventTime", "delayThreshold")
eventTime is the name of the column containing event timestamps.
delayThreshold is the maximum expected lateness, given as an interval string, e.g., "10 minutes".
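To build intuition for what the delay threshold means, here is a pure-Python sketch (not Spark code, and a simplification of Spark's internals): the engine tracks the maximum event time seen so far, and the watermark is that maximum minus the delay threshold, so it never moves backwards when an out-of-order event arrives.

```python
from datetime import datetime, timedelta

def advance_watermark(event_times, delay):
    """Track the watermark: max event time seen so far minus the delay threshold."""
    max_seen = None
    watermarks = []
    for t in event_times:
        max_seen = t if max_seen is None else max(max_seen, t)
        watermarks.append(max_seen - delay)
    return watermarks

events = [
    datetime(2024, 6, 1, 10, 0),
    datetime(2024, 6, 1, 10, 12),
    datetime(2024, 6, 1, 10, 7),   # out of order; watermark does not move backwards
]
wm = advance_watermark(events, timedelta(minutes=10))
print(wm[-1])  # 2024-06-01 10:02:00 (10:12 max seen, minus 10 minutes)
```

Note the third, out-of-order event leaves the watermark at 10:02; only a new maximum event time advances it.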
Examples
Waits 5 minutes for late data based on the 'timestamp' column.
Apache Spark
streamingDF.withWatermark("timestamp", "5 minutes")
Waits 1 hour for late data based on the 'event_time' column.
Apache Spark
streamingDF.withWatermark("event_time", "1 hour")
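The delay threshold in the examples above is an interval string such as "5 minutes" or "1 hour". As a rough illustration of how such a string maps to a duration (this is a hypothetical helper, not Spark's actual interval parser):

```python
from datetime import timedelta

# Hypothetical mapping for simple interval strings like "5 minutes" or "1 hour".
UNITS = {
    "second": "seconds", "seconds": "seconds",
    "minute": "minutes", "minutes": "minutes",
    "hour": "hours", "hours": "hours",
    "day": "days", "days": "days",
}

def parse_delay(threshold):
    """Turn an interval string like '5 minutes' into a timedelta."""
    amount, unit = threshold.split()
    return timedelta(**{UNITS[unit]: int(amount)})

print(parse_delay("5 minutes"))  # 0:05:00
print(parse_delay("1 hour"))     # 1:00:00
```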
Sample Program
This example adds a 5-minute watermark on the eventTime column, groups events into 5-minute windows, and counts events per window. In a real streaming query, events whose timestamps fall more than 5 minutes behind the latest event time seen would be dropped from the aggregation. The example uses a static DataFrame for illustration, so the watermark has no visible effect here.
Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("WatermarkExample").getOrCreate()

# Sample data simulating events with timestamps
input_data = [
    ("2024-06-01 10:00:00", "event1"),
    ("2024-06-01 10:02:00", "event2"),
    ("2024-06-01 10:07:00", "event3"),  # Late event
]

# Create DataFrame with schema
schema = "eventTime STRING, event STRING"
staticDF = spark.createDataFrame(input_data, schema)

# Convert eventTime to timestamp type (watermarks require a timestamp column)
streamingDF = staticDF.withColumn("eventTime", col("eventTime").cast("timestamp"))

# Simulate streaming by using staticDF as streaming source (for example only)
# In real use, streamingDF would come from a streaming source like Kafka
# via spark.readStream, and results would be written with writeStream

# Apply watermark of 5 minutes on eventTime
watermarkedDF = streamingDF.withWatermark("eventTime", "5 minutes")

# Group events by 5-minute windows
aggDF = watermarkedDF.groupBy(window(col("eventTime"), "5 minutes")).count()

# Show the aggregation result
aggDF.show(truncate=False)

spark.stop()
Output
The aggregation produces two 5-minute windows: [2024-06-01 10:00:00, 2024-06-01 10:05:00) with a count of 2, and [2024-06-01 10:05:00, 2024-06-01 10:10:00) with a count of 1.
Important Notes
Watermarking requires an event-time column of timestamp type; on a static DataFrame, withWatermark is a no-op.
Data whose event time is older than the current watermark (the latest event time seen minus the delay threshold) can be dropped from aggregations.
Choosing the delay threshold is a trade-off: a larger value captures more late data, a smaller value produces results sooner.
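The dropping rule above can be sketched in pure Python. This is a simplification (Spark actually finalizes whole windows against the watermark rather than filtering individual rows), but it shows which events survive a 10-minute threshold:

```python
from datetime import datetime, timedelta

def filter_late(events, delay):
    """Drop events whose event time is older than the current watermark
    (maximum event time seen so far minus the delay threshold)."""
    max_seen = None
    kept = []
    for t, name in events:
        max_seen = t if max_seen is None else max(max_seen, t)
        watermark = max_seen - delay
        if t >= watermark:
            kept.append(name)
    return kept

events = [
    (datetime(2024, 6, 1, 10, 0), "a"),
    (datetime(2024, 6, 1, 10, 20), "b"),
    (datetime(2024, 6, 1, 10, 5), "c"),   # 15 minutes behind the max: dropped
    (datetime(2024, 6, 1, 10, 12), "d"),  # 8 minutes behind the max: kept
]
print(filter_late(events, timedelta(minutes=10)))  # ['a', 'b', 'd']
```

Event "c" arrives more than 10 minutes behind the latest event time seen (10:20), so it falls below the watermark (10:10) and is discarded, while "d" is late but still inside the threshold.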
Summary
Watermarking helps handle late data in streaming by setting a wait time.
Apply it to an event-time column before grouping or aggregating streaming data.
Late data beyond the watermark is ignored to keep results timely.