
Watermarking for late data in Apache Spark

Introduction

Watermarking lets a streaming query handle data that arrives late. It tells Spark how long to wait for late events, measured in event time, before it finalizes a result and discards the state kept for it.

Watermarking is useful:
When processing live data streams, such as sensor readings, that might arrive late.
When analyzing user activity logs that can be delayed by network issues.
When aggregating events by time window while some events arrive after their window has closed.
When you want to balance waiting for late data against producing timely results.
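The waiting rule can be sketched in plain Python. This is a simplified model rather than Spark code: it assumes the watermark is the latest event time seen so far minus the delay threshold, and that an event older than the watermark is a candidate for being dropped. The helper names compute_watermark and is_too_late are made up for this illustration. Spark itself updates the watermark between micro-batches and applies it per window.

```python
from datetime import datetime, timedelta

def compute_watermark(max_event_time, delay):
    """Watermark = latest event time seen so far minus the delay threshold."""
    return max_event_time - delay

def is_too_late(event_time, watermark):
    """An event older than the watermark is a candidate for being dropped."""
    return event_time < watermark

# Hypothetical values: a 10-minute threshold and a latest event at 10:30.
delay = timedelta(minutes=10)
max_seen = datetime(2024, 6, 1, 10, 30)
watermark = compute_watermark(max_seen, delay)  # 10:20

# An event stamped 10:25 is still inside the allowed delay.
on_time_result = is_too_late(datetime(2024, 6, 1, 10, 25), watermark)
# An event stamped 10:15 is older than the watermark.
late_result = is_too_late(datetime(2024, 6, 1, 10, 15), watermark)

print(watermark, on_time_result, late_result)
```

The watermark moves forward only when a newer event time is seen, so a longer threshold keeps older events eligible for longer.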
Syntax
Apache Spark
streamingDF.withWatermark("eventTime", "delayThreshold")

eventTime is the column with event timestamps.

delayThreshold is how long to wait for late data, given as an interval string such as "10 minutes".

Examples
Waits 5 minutes for late data based on the 'timestamp' column.
Apache Spark
streamingDF.withWatermark("timestamp", "5 minutes")
Waits 1 hour for late data based on the 'event_time' column.
Apache Spark
streamingDF.withWatermark("event_time", "1 hour")
Sample Program

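This example adds a 5-minute watermark on the eventTime column, groups events into 5-minute windows, and counts them. To keep it simple it runs on a static (batch) DataFrame, where Spark ignores the watermark. On a real streaming source, events whose event time falls more than 5 minutes behind the latest event time seen may be dropped from the aggregation.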

Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("WatermarkExample").getOrCreate()

# Sample events with timestamps (static data standing in for a stream)
input_data = [
    ("2024-06-01 10:00:00", "event1"),
    ("2024-06-01 10:02:00", "event2"),
    ("2024-06-01 10:07:00", "event3"),  # Falls in the next window (10:05-10:10)
]

# Create DataFrame with schema
schema = "eventTime STRING, event STRING"
staticDF = spark.createDataFrame(input_data, schema)

# Convert eventTime to timestamp type
streamingDF = staticDF.withColumn("eventTime", col("eventTime").cast("timestamp"))

# This is a static (batch) DataFrame used for illustration only;
# Spark accepts withWatermark on batch data but treats it as a no-op.
# In real use, streamingDF would come from a streaming source such as Kafka.

# Apply watermark of 5 minutes on eventTime
watermarkedDF = streamingDF.withWatermark("eventTime", "5 minutes")

# Group events by 5-minute windows
aggDF = watermarkedDF.groupBy(window(col("eventTime"), "5 minutes")).count()

# Show the aggregation result
aggDF.show(truncate=False)

spark.stop()
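
As a cross-check on the grouping, the window assignment for the three sample events can be worked out in plain Python. This sketch assumes 5-minute tumbling windows aligned to the Unix epoch. The helper tumbling_window is made up for this illustration and is not a Spark API.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumbling_window(event_time, size):
    """Return the (start, end) of the tumbling window containing event_time."""
    offset = (event_time - EPOCH) % size  # time elapsed since the window started
    start = event_time - offset
    return start, start + size

size = timedelta(minutes=5)
events = [
    datetime(2024, 6, 1, 10, 0),
    datetime(2024, 6, 1, 10, 2),
    datetime(2024, 6, 1, 10, 7),
]

# Count how many events land in each window.
counts = Counter(tumbling_window(t, size) for t in events)

for (start, end), n in sorted(counts.items()):
    print(start.time(), end.time(), n)
```

Under these assumptions, the first two events share the 10:00 to 10:05 window and the third falls alone in the 10:05 to 10:10 window.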
Important Notes

Watermarking only works on an event time column of timestamp type, and the aggregation must use that same column.

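Events older than the watermark (the latest event time seen minus the delay threshold) may be dropped from aggregations. Spark guarantees that data arriving within the threshold is processed, but it does not guarantee that data arriving later is dropped.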

Choosing the right delay threshold balances accuracy and latency.
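
The trade-off behind the delay threshold can be illustrated with a small plain-Python simulation. It is a sketch under simplifying assumptions: the watermark is recomputed after every event (Spark updates it between micro-batches), and every event older than the watermark is dropped. The function simulate and the arrival times are invented for this example.

```python
from datetime import datetime, timedelta

def simulate(arrivals, delay):
    """Split events into accepted and dropped under a simple watermark model.

    arrivals: event times listed in the order the events arrive.
    """
    max_seen = None
    accepted, dropped = [], []
    for t in arrivals:
        if max_seen is not None and t < max_seen - delay:
            dropped.append(t)   # older than the current watermark
        else:
            accepted.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
    return accepted, dropped

def at(hour, minute):
    return datetime(2024, 6, 1, hour, minute)

# Out-of-order arrivals: 10:04 and 10:01 show up after newer events.
arrivals = [at(10, 0), at(10, 10), at(10, 4), at(10, 12), at(10, 1)]

dropped_by_delay = {
    minutes: len(simulate(arrivals, timedelta(minutes=minutes))[1])
    for minutes in (5, 10, 15)
}
print(dropped_by_delay)
```

In this model a longer threshold drops fewer late events, at the cost of keeping windows open (and state in memory) for longer before results are final.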

Summary

Watermarking helps handle late data in streaming by setting a wait time.

Use with event time columns to group or aggregate streaming data.

Late data beyond the watermark may be dropped, which keeps results timely and state bounded.