How to Use Windowing in Kafka Streams for Time-Based Processing

Call windowedBy() on a grouped stream in Kafka Streams to bucket records into tumbling, hopping, or session windows. You can then aggregate each key over fixed, overlapping, or activity-based time periods.

Syntax

Windowing in Kafka Streams is applied using the windowedBy() method on a KGroupedStream. You specify the window type and its parameters:

  • Tumbling Window: Fixed-size, non-overlapping windows.
  • Hopping Window: Fixed-size windows that can overlap, defined by size and advance interval.
  • Session Window: Windows based on activity gaps, closing after inactivity.

Example syntax:

groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
java
KGroupedStream<String, String> groupedStream = stream.groupByKey();

// Tumbling window of 5 minutes
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

KTable<Windowed<String>, Long> windowedCounts = groupedStream
    .windowedBy(tumblingWindow)
    .count();
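
The difference between tumbling and hopping windows comes down to which windows a record's timestamp falls into. The following is a minimal plain-Java sketch of that assignment, assuming windows are aligned to the epoch with an inclusive start and an exclusive end. The class and method names are hypothetical, and this is an illustration of the semantics rather than Kafka Streams' own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka Streams API.
final class WindowAssignment {

    // Returns the start timestamps (ms) of every window that contains timestampMs.
    // A tumbling window is the special case where advanceMs == sizeMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long sevenMinutes = 7 * 60_000L;

        // Tumbling, 5-minute size: exactly one window.
        System.out.println(windowStartsFor(sevenMinutes, 300_000L, 300_000L));
        // prints [300000]

        // Hopping, 5-minute size advancing by 1 minute: five overlapping windows.
        System.out.println(windowStartsFor(sevenMinutes, 300_000L, 60_000L));
        // prints [180000, 240000, 300000, 360000, 420000]
    }
}
```

With a hopping window, a single record contributes to size / advance windows, so aggregation results and state store size grow accordingly.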

Example

This example shows how to count events per 1-minute tumbling window using Kafka Streams in Java. It groups records by key, applies a tumbling window, and counts the number of records in each window.

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;
import java.util.Properties;

public class WindowingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> stream = builder.stream("input-topic");

        KGroupedStream<String, String> groupedStream = stream.groupByKey();

        TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1));

        KTable<Windowed<String>, Long> windowedCounts = groupedStream
            .windowedBy(tumblingWindow)
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store"));

        windowedCounts.toStream().foreach((windowedKey, count) -> {
            System.out.println("Key: " + windowedKey.key() + ", Window start: " + windowedKey.window().startTime() + ", Count: " + count);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so the application always closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();
    }
}
Output
Key: user1, Window start: 2024-06-01T12:00:00Z, Count: 5
Key: user2, Window start: 2024-06-01T12:00:00Z, Count: 3
Key: user1, Window start: 2024-06-01T12:01:00Z, Count: 2

Common Pitfalls

  • Not setting a grace period: With ofSizeWithNoGrace(), a record that arrives after its window has ended is dropped. Use ofSizeAndGrace() when you expect out-of-order data.
  • Using wrong window type: Tumbling windows do not overlap; hopping windows do. Choose based on your use case.
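  • Incorrect serialization: Windowed keys need a windowed serde, such as one from WindowedSerdes, when results are written to a topic.
  • Leaving state stores unnamed: Kafka Streams creates a store for every windowed aggregation, but you need Materialized.as(...) to give it a stable name for interactive queries or to set its serdes.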
java
/* Wrong: No grace period, late events dropped */
groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))).count();

/* Right: Set grace period to accept late events */
groupedStream.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))).count();
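
To see why the grace period matters, it helps to spell out when a record counts as too late. The sketch below is plain Java under stated assumptions: stream time is the highest record timestamp seen so far, and a window stops accepting records once stream time reaches the window end plus the grace period. The class and method names are hypothetical and the check only approximates the behavior described above.

```java
// Hypothetical helper, not part of the Kafka Streams API.
final class GraceCheck {

    // Returns true if a record with recordTs would still be added to its tumbling window.
    static boolean accepted(long recordTs, long streamTimeMs, long sizeMs, long graceMs) {
        long windowStart = recordTs - (recordTs % sizeMs); // epoch-aligned tumbling window
        long windowEnd = windowStart + sizeMs;
        // Stream time never moves backwards; the record itself may advance it.
        long observed = Math.max(streamTimeMs, recordTs);
        // Assumption: the window is closed once observed time reaches end + grace.
        return windowEnd + graceMs > observed;
    }

    public static void main(String[] args) {
        long size = 300_000L;       // 5-minute window
        long recordTs = 270_000L;   // event time 4:30, belongs to window [0:00, 5:00)

        // Stream time 5:30, no grace: the window is already closed.
        System.out.println(accepted(recordTs, 330_000L, size, 0L));      // prints false
        // Stream time 5:30, 1-minute grace: the record is still counted.
        System.out.println(accepted(recordTs, 330_000L, size, 60_000L)); // prints true
        // Stream time 6:30, 1-minute grace: too late even with grace.
        System.out.println(accepted(recordTs, 390_000L, size, 60_000L)); // prints false
    }
}
```

A longer grace period accepts more late records but keeps windows open, and their state retained, for longer.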

Quick Reference

Window Type | Description | Key Method | Parameters
Tumbling Window | Fixed-size, non-overlapping windows | TimeWindows.ofSizeWithNoGrace() | Duration size
Hopping Window | Fixed-size, overlapping windows | TimeWindows.ofSizeWithNoGrace().advanceBy() | Duration size, Duration advance
Session Window | Windows based on inactivity gaps | SessionWindows.ofInactivityGapWithNoGrace() | Duration inactivityGap
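
Session windows have no fixed size, so their boundaries depend entirely on the data. As a rough illustration, the plain-Java sketch below groups the sorted timestamps of one key into sessions, assuming a new session starts only when the pause since the previous record exceeds the inactivity gap. The class and method names are hypothetical, and out-of-order records and grace periods are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka Streams API.
final class SessionSketch {

    // Returns one {start, end} pair per session for timestamps sorted in ascending order.
    static List<long[]> sessions(long[] sortedTimestamps, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!result.isEmpty() && ts - result.get(result.size() - 1)[1] <= gapMs) {
                // Within the inactivity gap: extend the current session.
                result.get(result.size() - 1)[1] = ts;
            } else {
                // Gap exceeded (or first record): open a new session.
                result.add(new long[] {ts, ts});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three records in the first three minutes, then two more around minute ten.
        long[] timestamps = {0L, 60_000L, 120_000L, 600_000L, 630_000L};
        for (long[] s : sessions(timestamps, 300_000L)) { // 5-minute inactivity gap
            System.out.println("session [" + s[0] + ", " + s[1] + "]");
        }
        // prints session [0, 120000] and session [600000, 630000]
    }
}
```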

Key Takeaways

Use windowedBy() on grouped streams to apply windowing in Kafka Streams.
Choose the right window type: tumbling for fixed intervals, hopping for overlapping, session for activity-based.
Set a grace period with ofSizeAndGrace() when late-arriving data must still be counted.
Name state stores with Materialized when you need to query windowed results or set their serdes.
Use appropriate serdes for windowed keys to avoid serialization issues.