
How to Use aggregate() in Kafka Streams for Data Summarization

Use the aggregate() method in Kafka Streams to combine the records of a grouped stream into a single result per key. It requires an initializer and an aggregator function. A Materialized argument is optional, but it is the usual way to name the state store that holds the aggregation results and to set its serdes.
📐

Syntax

The aggregate() method in Kafka Streams is called on a KGroupedStream to combine records by key. The overload shown here takes three parameters:

  • Initializer: A supplier that provides the initial aggregation value.
  • Aggregator: A function that takes the current key, new value, and current aggregate, then returns the updated aggregate.
  • Materialized: Defines the state store name and serialization formats to keep the aggregation results.
java
<VR> KTable<K, VR> aggregate(Initializer<VR> initializer,
                             Aggregator<? super K, ? super V, VR> aggregator,
                             Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
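To make the roles of the three parameters concrete, here is a minimal plain-Java sketch of the same idea, with no Kafka dependency. The class name AggregateSketch, the helper method, and the local Aggregator interface are illustrative stand-ins, not part of the Kafka Streams API, and the HashMap only plays the role of the state store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class AggregateSketch {

    // Local stand-in with the same shape as Kafka Streams' Aggregator<K, V, VR>.
    @FunctionalInterface
    interface Aggregator<K, V, VR> {
        VR apply(K key, V value, VR aggregate);
    }

    // Folds keyed records into one result per key.
    // The HashMap stands in for the state store (assumption for illustration).
    static <K, V, VR> Map<K, VR> aggregate(List<Map.Entry<K, V>> records,
                                           Supplier<VR> initializer,
                                           Aggregator<K, V, VR> aggregator) {
        Map<K, VR> store = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // First record for a key: start from the initializer's value.
            VR current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
            Map.entry("a", "apple"),
            Map.entry("b", "banana"),
            Map.entry("a", "avocado"));

        Map<String, Integer> totals =
            aggregate(records, () -> 0, (key, value, agg) -> agg + value.length());

        System.out.println(totals.get("a")); // 12
        System.out.println(totals.get("b")); // 6
    }
}
```

The initializer runs only the first time a key is seen. Every later record for that key is folded into the stored value by the aggregator.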
💻

Example

This example shows how to sum the total length of words grouped by their first letter using aggregate(). It assumes each record value is a single word. The aggregate starts at 0, and the length of each word is added to the running total for its first letter.

java
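import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Read with explicit serdes so the example does not depend on the default serde config
KStream<String, String> textLines =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Re-key each word by its first letter; skip null or empty values, which would break substring(0, 1)
KGroupedStream<String, String> groupedByFirstChar = textLines
    .filter((key, value) -> value != null && !value.isEmpty())
    .groupBy((key, value) -> value.substring(0, 1),
             Grouped.with(Serdes.String(), Serdes.String()));

KTable<String, Integer> aggregated = groupedByFirstChar.aggregate(
    () -> 0, // initializer
    (key, value, aggregate) -> aggregate + value.length(), // aggregator
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("agg-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Integer()) // state store name and serdes
);

aggregated.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));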
Output
Each record in "output-topic" has a first letter as its key and the running total length of the words starting with that letter as its value.
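Because the result is a KTable converted back to a stream, the output topic receives updates rather than one final record per key. The following plain-Java sketch traces what those updates could look like for three sample words. The class name OutputTrace and the sample words are made up for illustration, and the trace assumes one word per record and no caching, since record caching in Kafka Streams may collapse intermediate updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutputTrace {

    // Simulates the sequence of key=total updates for a list of words,
    // assuming one word per record and no caching of intermediate results.
    static List<String> updatesFor(List<String> words) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String word : words) {
            if (word == null || word.isEmpty()) {
                continue; // substring(0, 1) would fail on an empty value
            }
            String firstLetter = word.substring(0, 1);
            // Add this word's length to the running total for its first letter.
            int total = totals.merge(firstLetter, word.length(), Integer::sum);
            updates.add(firstLetter + "=" + total);
        }
        return updates;
    }

    public static void main(String[] args) {
        System.out.println(updatesFor(List.of("apple", "banana", "avocado")));
        // prints [a=5, b=6, a=12]
    }
}
```

A consumer of the output topic should therefore treat the latest record for a key as the current total, not add the records together.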
⚠️

Common Pitfalls

  • Not grouping before aggregating: You must call groupBy() or groupByKey() before aggregate() to create a KGroupedStream.
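  • Incorrect initializer: The initializer must return a value of the aggregate type (VR), for example 0 for an Integer sum.
  • State store misconfiguration: Give Materialized a store name that is unique within the topology and serdes that match the key and aggregate types. Without explicit serdes the configured defaults are used, and a type mismatch only surfaces at runtime.
  • Using reduce() when the result type differs from the input type: reduce() returns the same type as the stream's values, so use aggregate() when the output type is different.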
java
/* Wrong: aggregate called directly on KStream */
// KStream<String, String> stream = ...;
// stream.aggregate(...); // This will not compile

/* Right: group before aggregate */
// KGroupedStream<String, String> grouped = stream.groupByKey();
// grouped.aggregate(...);
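The reduce() versus aggregate() pitfall comes down to types, which the following plain-Java sketch tries to show. The Reducer and Aggregator interfaces here are local stand-ins that mirror the shapes of the Kafka Streams interfaces of the same names. The class ReduceVsAggregate and its helper methods are hypothetical and exist only for illustration.

```java
import java.util.List;

class ReduceVsAggregate {

    // Local stand-ins mirroring the shapes of Kafka Streams' Reducer and Aggregator.
    interface Reducer<V> { V apply(V value1, V value2); }
    interface Aggregator<K, V, VR> { VR apply(K key, V value, VR aggregate); }

    // reduce-style: String in, String out (keeps the longest word seen so far).
    static final Reducer<String> LONGEST =
        (current, next) -> next.length() > current.length() ? next : current;

    // aggregate-style: String in, Long out (counts words), which reduce cannot express.
    static final Aggregator<String, String, Long> COUNT =
        (key, value, count) -> count + 1;

    static String longest(List<String> words) {
        String result = words.get(0); // reduce starts from the first value itself
        for (String word : words.subList(1, words.size())) {
            result = LONGEST.apply(result, word);
        }
        return result;
    }

    static long count(String key, List<String> words) {
        Long total = 0L; // aggregate starts from the initializer's value
        for (String word : words) {
            total = COUNT.apply(key, word, total);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "avocado", "ant");
        System.out.println(longest(words));    // avocado
        System.out.println(count("a", words)); // 3
    }
}
```

The reducer can only ever hand back another String, while the aggregator is free to produce a Long because its result type is a separate type parameter.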
📊

Quick Reference

Parameter     Description
Initializer   Supplies the initial aggregate value
Aggregator    Function to update aggregate with new record
Materialized  Defines state store name and serdes
Returns       A KTable with aggregated results per key

Key Takeaways

  • Always group your stream by key before calling aggregate().
  • Provide a proper initializer matching the aggregate type.
  • Use Materialized to configure state store and serialization.
  • aggregate() allows flexible aggregation with custom output types.
  • Check your serdes and store names to avoid runtime errors.