How to Use Windowing in Kafka Streams for Time-Based Processing
Use windowedBy() in Kafka Streams to group records into time windows such as tumbling, hopping, or session windows. This lets you aggregate or analyze data over fixed, overlapping, or activity-based time periods.
Syntax
Windowing in Kafka Streams is applied using the windowedBy() method on a KGroupedStream. You specify the window type and its parameters:
- Tumbling Window: Fixed-size, non-overlapping windows.
- Hopping Window: Fixed-size windows that can overlap, defined by size and advance interval.
- Session Window: Windows based on activity gaps, closing after inactivity.
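To make the difference between tumbling and hopping windows concrete, here is a minimal plain-Java model of how a record's timestamp maps to windows. It is a sketch of the semantics, not Kafka Streams code. It assumes epoch-aligned windows that start at multiples of the advance interval and cover [start, start + size), which is how TimeWindows aligns windows. In Kafka Streams the hopping case would be defined as TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)).advanceBy(Duration.ofMinutes(5)). The names WindowAssignment and windowStartsFor are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-window assignment (hypothetical helper, not Kafka Streams source).
// Windows are aligned to the epoch: each starts at a multiple of advanceMs
// and covers the half-open interval [start, start + sizeMs).
final class WindowAssignment {

    /** Returns the start time of every window that contains timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still reaches timestampMs.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long t = 7 * minute; // a record 7 minutes after the epoch

        // Tumbling: advance == size, so exactly one window matches.
        System.out.println(windowStartsFor(t, 5 * minute, 5 * minute)); // [300000]

        // Hopping: 10-minute windows advancing every 5 minutes overlap, so two match.
        System.out.println(windowStartsFor(t, 10 * minute, 5 * minute)); // [0, 300000]
    }
}
```

A tumbling window is simply the hopping case with the advance equal to the size, which is why only one window is returned for it.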
Example syntax:
```java
// Assumes `stream` is an existing KStream<String, String>
KGroupedStream<String, String> groupedStream = stream.groupByKey();

// Tumbling window of 5 minutes
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

KTable<Windowed<String>, Long> windowedCounts = groupedStream
        .windowedBy(tumblingWindow)
        .count();
```
Example
This example shows how to count events per 1-minute tumbling window using Kafka Streams in Java. It groups records by key, applies a tumbling window, and counts the number of records in each window.
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;
import java.util.Properties;

public class WindowingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        KGroupedStream<String, String> groupedStream = stream.groupByKey();

        // Tumbling window of 1 minute with no grace period
        TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1));

        // Count records per key per window, kept in a named window store
        KTable<Windowed<String>, Long> windowedCounts = groupedStream
                .windowedBy(tumblingWindow)
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store"));

        // Print each update: the key, the window start, and the current count
        windowedCounts.toStream().foreach((windowedKey, count) ->
                System.out.println("Key: " + windowedKey.key()
                        + ", Window start: " + windowedKey.window().startTime()
                        + ", Count: " + count));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```
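Output
Sample output (actual keys and counts depend on your input data; because count() produces a KTable that emits an update whenever a window's count changes, subject to record caching, you may also see intermediate counts for the same window):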
Key: user1, Window start: 2024-06-01T12:00:00Z, Count: 5
Key: user2, Window start: 2024-06-01T12:00:00Z, Count: 3
Key: user1, Window start: 2024-06-01T12:01:00Z, Count: 2
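Conceptually, count() over a tumbling window keeps one counter per key per window. The plain-Java sketch below models that on a few hypothetical events, using 1-minute windows as in the example. The Event record, the TumblingCount class, and the "key@windowStart" labels are illustrative only, not Kafka APIs, and the sketch assumes non-negative, epoch-aligned timestamps. It prints only the final counts, whereas the real KTable reports updates as they happen.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a tumbling-window count (hypothetical, not Kafka Streams).
final class TumblingCount {

    record Event(String key, long timestampMs) {}

    /** Counts events per key per tumbling window, labeled "key@windowStart". */
    static Map<String, Long> countPerWindow(Iterable<Event> events, long sizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            // Epoch-aligned window start; assumes non-negative timestamps.
            long windowStart = e.timestampMs() - (e.timestampMs() % sizeMs);
            counts.merge(e.key() + "@" + windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Event> events = List.of(
                new Event("user1", 5_000L),
                new Event("user1", 30_000L),
                new Event("user2", 45_000L),
                new Event("user1", minute + 10_000L));
        System.out.println(countPerWindow(events, minute));
        // {user1@0=2, user1@60000=1, user2@0=1}
    }
}
```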
Common Pitfalls
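- Not setting a grace period: with TimeWindows.ofSizeWithNoGrace(), a record that arrives after its window has closed (stream time has already passed the window end) is dropped. If your data can arrive out of order, use TimeWindows.ofSizeAndGrace() to keep windows open a little longer.
- Using the wrong window type: tumbling windows do not overlap; hopping windows do. Choose based on your use case.
- Incorrect serialization: windowed keys (Windowed<K>) need special serdes, such as those provided by WindowedSerdes, when you write windowed results to a topic.
- Not naming state stores: windowed aggregations are always backed by a state store, but you need Materialized.as(...) to give it a stable name, for example to query windowed results through interactive queries.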
```java
// Risky with out-of-order data: no grace period, so records arriving after the window closes are dropped
groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))).count();

// Better when events can arrive late: a 1-minute grace period still counts records up to 1 minute late
groupedStream.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))).count();
```
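The grace-period rule can be modeled in a few lines of plain Java. This sketch assumes a single key, tumbling windows, and that stream time is the largest record timestamp seen so far; a record is dropped once its window end plus the grace period is no longer ahead of stream time. GracePeriodModel and accepted() are hypothetical names, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of grace-period handling for tumbling windows (hypothetical, not Kafka's code).
final class GracePeriodModel {

    /** Returns the timestamps that are accepted, in arrival order. */
    static List<Long> accepted(long[] arrivalOrderTimestamps, long sizeMs, long graceMs) {
        List<Long> kept = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : arrivalOrderTimestamps) {
            streamTime = Math.max(streamTime, ts);        // stream time only moves forward
            long windowEnd = ts - (ts % sizeMs) + sizeMs; // end of this record's window
            if (windowEnd + graceMs > streamTime) {       // window still open: accept
                kept.add(ts);
            }                                             // otherwise the record is dropped
        }
        return kept;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Hypothetical arrival order: the record at 4m30s shows up after the one at 5m30s.
        long[] arrivals = {1 * minute, 5 * minute + 30_000L, 4 * minute + 30_000L};
        System.out.println(accepted(arrivals, 5 * minute, 0));          // [60000, 330000]
        System.out.println(accepted(arrivals, 5 * minute, 1 * minute)); // [60000, 330000, 270000]
    }
}
```

With no grace, the record at 4m30s arrives after stream time has reached 5m30s, so its [0, 5m) window is already closed and the record is dropped. A 1-minute grace keeps that window open until stream time reaches 6m.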
Quick Reference
| Window Type | Description | Key Method | Parameters |
|---|---|---|---|
| Tumbling Window | Fixed-size, non-overlapping windows | TimeWindows.ofSizeWithNoGrace() or TimeWindows.ofSizeAndGrace() | Duration windowSize (plus Duration gracePeriod for ofSizeAndGrace()) |
| Hopping Window | Fixed-size, overlapping windows | TimeWindows.ofSizeWithNoGrace(...).advanceBy() | Duration windowSize, Duration advance |
| Session Window | Windows based on inactivity gaps | SessionWindows.ofInactivityGapWithNoGrace() (replaces the deprecated SessionWindows.with()) | Duration inactivityGap |
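Session windows are the least intuitive of the three, so here is a plain-Java model for a single key. It assumes timestamps arrive in order (Kafka Streams also merges sessions for out-of-order records, which this sketch does not handle) and starts a new session whenever the gap since the previous record exceeds the inactivity gap. In Kafka Streams the corresponding definition would be SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)); SessionModel and sessions() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session windows for one key (hypothetical, not Kafka Streams' implementation).
// Assumes sorted timestamps; a gap larger than inactivityGapMs starts a new session.
final class SessionModel {

    /** Returns sessions as {start, end} pairs, where end is the last record's timestamp. */
    static List<long[]> sessions(long[] sortedTimestamps, long inactivityGapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] <= inactivityGapMs) {
                last[1] = ts;                    // within the gap: extend the current session
            } else {
                result.add(new long[] {ts, ts}); // gap exceeded: open a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Hypothetical clicks at 0, 2, 3, and 10 minutes with a 5-minute inactivity gap.
        long[] clicks = {0, 2 * minute, 3 * minute, 10 * minute};
        for (long[] s : sessions(clicks, 5 * minute)) {
            System.out.println("session [" + s[0] + ", " + s[1] + "]");
        }
        // session [0, 180000]
        // session [600000, 600000]
    }
}
```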
Key Takeaways
Use windowedBy() on grouped streams to apply windowing in Kafka Streams.
Choose the right window type: tumbling for fixed intervals, hopping for overlapping, session for activity-based.
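Configure a grace period with TimeWindows.ofSizeAndGrace() when records can arrive out of order; otherwise late records are dropped once their window closes.
Name state stores with Materialized.as() when you need to query windowed aggregation results.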
Use appropriate serdes for windowed keys to avoid serialization issues.