How to Use groupByKey in Kafka Streams: Syntax and Example
In Kafka Streams, use groupByKey() to group records by their current key, which prepares the stream for aggregations such as counting or summing. The method returns a KGroupedStream on which you can run stateful operations.
Syntax
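The groupByKey() method is called on a KStream<K, V> and returns a KGroupedStream<K, V>. It groups records by their existing key without changing it, so no repartitioning is needed unless an upstream operation (such as map() or selectKey()) may have changed the key.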
- KStream<K, V>: The original stream with key type K and value type V.
- groupByKey(): Groups records by their current key.
- KGroupedStream<K, V>: The grouped stream used for aggregation.
```java
KGroupedStream<K, V> groupedStream = kStream.groupByKey();
```
Example
This example shows how to use groupByKey() to count the number of records per key in a Kafka Streams application.
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class GroupByKeyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupbykey-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Create a stream from the input topic (String keys and values)
        KStream<String, String> inputStream = builder.stream("input-topic");

        // Group by the existing key; the key itself is not modified
        KGroupedStream<String, String> groupedStream = inputStream.groupByKey();

        // Count records per key; the counts are Long, so the output needs a Long value serde
        groupedStream.count(Materialized.as("counts-store"))
                .toStream()
                .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so the application always closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();
    }
}
```
Output
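Each record read from "input-topic" updates the running count for its key, and the updated count is written to "output-topic" as a key-value pair whose value is the count (Long). Because the output is the changelog of a KTable, a key can appear again every time its count changes.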
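To make the shape of that output concrete, here is a minimal plain-Java sketch (no Kafka dependency) that models what groupByKey().count() emits for a small input. The class name CountPerKeySketch, the helper countUpdates, and the sample records are hypothetical and exist only for illustration; a real application may also emit fewer intermediate updates when record caching is enabled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CountPerKeySketch {

    // Hypothetical helper: each record is a {key, value} pair.
    // Returns one "key=count" update per record that takes part in the count.
    static List<String> countUpdates(List<String[]> records) {
        Map<String, Long> store = new LinkedHashMap<>(); // stands in for "counts-store"
        List<String> updates = new ArrayList<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            if (key == null || value == null) {
                continue; // records with a null key or value are ignored by the count
            }
            long count = store.merge(key, 1L, Long::sum);
            updates.add(key + "=" + count);
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[]{"alice", "login"},
                new String[]{"bob", "login"},
                new String[]{"alice", "click"},
                new String[]{null, "orphan"},
                new String[]{"alice", "logout"});
        System.out.println(countUpdates(input)); // [alice=1, bob=1, alice=2, alice=3]
    }
}
```

Note that "alice" appears three times in the modelled output, once per update, which mirrors how a downstream consumer of "output-topic" would see the latest count for a key replace the earlier ones.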
Common Pitfalls
- Using groupByKey() on a stream with null keys: Records with a null key are dropped by the grouping and never reach the aggregation, so they are silently missing from the result.
- Confusing groupByKey() with groupBy(): groupBy() selects a new key, while groupByKey() uses the existing key. Use groupBy() if you want to re-key the stream, and keep in mind that re-keying causes the data to be repartitioned.
- Not naming the state store: Aggregations after groupByKey() always keep their results in a state store. Pass Materialized.as("store-name") if you need a stable, queryable store name; otherwise Kafka Streams generates an internal name.
```java
// Assumes the StreamsBuilder named "builder" from the example above

/* Problem: records with null keys are dropped when the stream is grouped */
KStream<String, String> streamWithNullKeys = builder.stream("topic-with-null-keys");
KGroupedStream<String, String> grouped = streamWithNullKeys.groupByKey(); // null-keyed records are skipped

/* Better: make the handling of null keys explicit before grouping */
KStream<String, String> keyedStream = streamWithNullKeys.filter((k, v) -> k != null);
KGroupedStream<String, String> groupedCorrect = keyedStream.groupByKey();
```
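The difference between grouping by the existing key and re-keying first can be modelled in plain Java. This is a sketch of the semantics only, not Kafka Streams code: RekeySketch, countBy, countByExistingKey, and the "user:action" value format are assumptions made for illustration. In a real topology the second call corresponds roughly to groupBy((k, v) -> ...), which also repartitions the data.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class RekeySketch {

    // Models groupByKey().count(): the grouping key is the record's existing key.
    static Map<String, Long> countByExistingKey(List<String[]> records) {
        return countBy(records, (key, value) -> key);
    }

    // Models groupBy(mapper).count(): the mapper derives the grouping key.
    // Records whose grouping key is null are skipped (assumed to match the drop behaviour).
    static Map<String, Long> countBy(List<String[]> records,
                                     BiFunction<String, String, String> keyMapper) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] record : records) {
            String groupingKey = keyMapper.apply(record[0], record[1]);
            if (groupingKey == null) {
                continue;
            }
            counts.merge(groupingKey, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // All keys are null; the user name is embedded in the value as "user:action"
        List<String[]> records = Arrays.asList(
                new String[]{null, "alice:login"},
                new String[]{null, "bob:login"},
                new String[]{null, "alice:click"});

        System.out.println(countByExistingKey(records));                    // {}
        System.out.println(countBy(records, (k, v) -> v.split(":")[0]));    // {alice=2, bob=1}
    }
}
```

Grouping by the existing (null) key yields nothing, while deriving a key from the value recovers the counts, which is the situation where groupBy() is the right choice.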
Quick Reference
groupByKey() Cheat Sheet:
| Method | Description | Returns |
|---|---|---|
| groupByKey() | Groups records by their existing key | KGroupedStream |
| groupBy(KeyValueMapper) | Groups records by a new key | KGroupedStream |
| count() | Counts records per key after grouping | KTable |
| reduce() | Aggregates records per key with a reducer | KTable |
| aggregate() | Aggregates records per key with initializer and aggregator | KTable |
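The practical difference between reduce() and aggregate() in the table is the result type: a reducer combines two values into a value of the same type, while an initializer plus aggregator can build a result of a different type. The plain-Java sketch below models that difference without any Kafka dependency; ReduceVsAggregateSketch, the Agg interface, and the sample data are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

final class ReduceVsAggregateSketch {

    /** Hypothetical stand-in for an aggregator: (key, value, aggregate) -> new aggregate. */
    interface Agg<V, A> {
        A apply(String key, V value, A aggregate);
    }

    // Models reduce(): the first value for a key becomes the initial result,
    // and the result type is the same as the value type.
    static <V> Map<String, V> reduce(List<Map.Entry<String, V>> records,
                                     BinaryOperator<V> reducer) {
        Map<String, V> result = new LinkedHashMap<>();
        for (Map.Entry<String, V> record : records) {
            result.merge(record.getKey(), record.getValue(), reducer);
        }
        return result;
    }

    // Models aggregate(): the initializer supplies the starting aggregate for a new key,
    // and the aggregate type A may differ from the value type V.
    static <V, A> Map<String, A> aggregate(List<Map.Entry<String, V>> records,
                                           Supplier<A> initializer,
                                           Agg<V, A> aggregator) {
        Map<String, A> result = new LinkedHashMap<>();
        for (Map.Entry<String, V> record : records) {
            String key = record.getKey();
            A current = result.containsKey(key) ? result.get(key) : initializer.get();
            result.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("a", 3), Map.entry("b", 5), Map.entry("a", 4));

        // Integer values reduced to an Integer sum per key
        Map<String, Integer> reduced = reduce(records, Integer::sum);
        // Integer values aggregated into a Long sum per key
        Map<String, Long> aggregated = aggregate(records, () -> 0L, (key, value, agg) -> agg + value);

        System.out.println(reduced);    // {a=7, b=5}
        System.out.println(aggregated); // {a=7, b=5}
    }
}
```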
Key Takeaways
- Use groupByKey() to group stream records by their existing key for aggregation.
- groupByKey() returns a KGroupedStream, which supports stateful operations such as count(), reduce(), and aggregate().
- Ensure your stream records have non-null keys before using groupByKey(), because records with null keys are dropped.
- Use groupBy() if you need to change the key before grouping.
- Aggregations after grouping are always backed by a state store; name it with Materialized.as() when you need a stable, queryable store.