How to Use Aggregate Kafka Streams for Data Summarization
Use the aggregate() method in Kafka Streams to combine the records of a grouped stream into a single result per key. It takes an initializer and an aggregator function, and usually a Materialized configuration that names the state store holding the running results and sets its serdes.
Syntax
The aggregate() method in Kafka Streams is used on a KGroupedStream to combine records by key. It takes three main parameters:
- Initializer: A supplier that provides the initial aggregation value.
- Aggregator: A function that takes the current key, new value, and current aggregate, then returns the updated aggregate.
- Materialized: Configures the state store that keeps the aggregation results, including its name and the key and value serdes.
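The three parameters work together on every incoming record: look up the key's current aggregate (calling the initializer if there is none yet), apply the aggregator, and store the result. The sketch below is a rough, broker-free model of that loop under stated assumptions, not Kafka's internal implementation. A HashMap stands in for the state store, and the class name AggregateModel and the sample words are hypothetical. It uses the real Initializer and Aggregator interfaces, so kafka-streams must be on the classpath.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;

// Hypothetical helper: a simplified in-memory model of KGroupedStream.aggregate().
// The HashMap plays the role of the state store; Kafka Streams itself works differently.
public class AggregateModel {

    static <K, V, VR> Map<K, VR> aggregate(List<Map.Entry<K, V>> records,
                                           Initializer<VR> initializer,
                                           Aggregator<? super K, ? super V, VR> aggregator) {
        Map<K, VR> store = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // First record for this key: start from the initializer's value
            VR current = store.containsKey(key) ? store.get(key) : initializer.apply();
            // Fold the new value into the running aggregate and write it back
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        Initializer<Integer> init = () -> 0;
        Aggregator<String, String, Integer> sumLengths =
            (key, word, total) -> total + word.length();

        List<Map.Entry<String, String>> records = List.of(
            Map.entry("a", "apple"), Map.entry("b", "banana"), Map.entry("a", "avocado"));

        System.out.println(aggregate(records, init, sumLengths)); // prints {a=12, b=6}
    }
}
```

Key "a" ends at 12 because the initializer supplies 0, then "apple" adds 5 and "avocado" adds 7.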
```java
<VR> KTable<K, VR> aggregate(Initializer<VR> initializer,
                             Aggregator<? super K, ? super V, VR> aggregator,
                             Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
```
Example
This example sums the lengths of words grouped by their first letter. The initializer starts each key's total at 0, and the aggregator adds the length of each incoming word to that total.
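```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Re-key by first letter; skip null/empty values, which substring(0, 1) cannot handle
KGroupedStream<String, String> groupedByFirstChar = textLines
    .filter((key, value) -> value != null && !value.isEmpty())
    .groupBy((key, value) -> value.substring(0, 1), Grouped.with(Serdes.String(), Serdes.String()));

KTable<String, Integer> aggregated = groupedByFirstChar.aggregate(
    () -> 0,                                               // initializer
    (key, value, aggregate) -> aggregate + value.length(), // aggregator
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("agg-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Integer())                  // state store name and serdes
);

aggregated.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));
```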
Output
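Records in "output-topic" have the first letter as the key and, as the value, the running total length of the words starting with that letter. Because the result is a KTable converted to a stream, each update to a key produces a new output record; with record caching enabled, some intermediate updates may be combined before they are emitted.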
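One way to check this output without a running broker is the TopologyTestDriver from the kafka-streams-test-utils artifact. The sketch below rebuilds the example topology, pipes three sample words through it, and reads back the latest value per key. The class name, application id, placeholder bootstrap address, and sample words are assumptions for illustration. It assumes kafka-streams and kafka-streams-test-utils 2.4 or later (for TestInputTopic and TestOutputTopic) and Java 9 or later.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical test harness for the word-length example above
public class WordLengthAggregateCheck {

    static Map<String, Integer> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .filter((key, value) -> value != null && !value.isEmpty())
            .groupBy((key, value) -> value.substring(0, 1),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value.length(),
                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("agg-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Integer()))
            .toStream()
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-example"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");      // not contacted by the driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Integer> output = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new IntegerDeserializer());

            // Values only; groupBy() assigns the first-letter key
            input.pipeInput("apple");
            input.pipeInput("avocado");
            input.pipeInput("banana");

            // Collapse the update stream to the latest value per key
            return output.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With these inputs, the latest values would be 12 for key a (5 + 7) and 6 for key b. Reading the output with readKeyValuesList() instead would also show the intermediate update for a.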
Common Pitfalls
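- Not grouping before aggregating: You must call groupBy() or groupByKey() before aggregate() to get a KGroupedStream; KStream itself has no aggregate() method.
- Incorrect initializer: The initializer must return a value of the aggregate type (VR), not of the input value type.
- State store misconfiguration: If you omit Materialized or its serdes, Kafka Streams may fall back to the default serdes from the application configuration, which fail at runtime when they do not match the key or aggregate type. Specify serdes explicitly and give each store a unique name.
- Using reduce() when the output type differs from the input: reduce() must return the same type as the input values, so use aggregate() when you need a different result type.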
```java
/* Wrong: aggregate called directly on KStream */
// KStream<String, String> stream = ...;
// stream.aggregate(...); // This will not compile

/* Right: group before aggregate */
// KGroupedStream<String, String> grouped = stream.groupByKey();
// grouped.aggregate(...);
```
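The reduce() versus aggregate() pitfall is easiest to see side by side. In the sketch below, reduce() keeps the input value type, so Long amounts reduce to a KTable<String, Long>, while aggregate() turns String values into an Integer result, and the sketch therefore supplies an Integer value serde for that store explicitly. The topic names, store names, and class name are hypothetical. The sketch only builds and describes the topology, so no broker is needed; it assumes kafka-streams on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical topology contrasting reduce() and aggregate()
public class ReduceVersusAggregate {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // reduce(): input values and result share one type (Long -> Long)
        KTable<String, Long> totals = builder
            .stream("amounts-topic", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey()
            .reduce((a, b) -> a + b,
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store"));

        // aggregate(): String values become an Integer result, so the store gets an Integer serde
        KTable<String, Integer> lengths = builder
            .stream("words-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(
                () -> 0,
                (key, word, total) -> total + word.length(),
                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("lengths-store")
                    .withValueSerde(Serdes.Integer()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the processors and the state stores attached to them
        System.out.println(build().describe());
    }
}
```

Trying to write the aggregate() lambda as a reduce() Reducer<String> would not compile, because a Reducer must return the same type it receives.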
Quick Reference
| Parameter | Description |
|---|---|
| Initializer | Supplies the initial aggregate value |
| Aggregator | Function to update aggregate with new record |
| Materialized | Defines state store name and serdes |
| Returns | A KTable with aggregated results per key |
Key Takeaways
- Group the stream with groupByKey() or groupBy() before calling aggregate().
- Provide an initializer that returns a value of the aggregate type.
- Use Materialized to name the state store and set its serdes.
- Unlike reduce(), aggregate() can produce a result type that differs from the input value type.
- Check your serdes and store names to avoid runtime errors.