
How to Use groupByKey in Kafka Streams: Syntax and Example

In Kafka Streams, use groupByKey() to group records by their current key, which prepares the stream for aggregations such as counting or summing. The method returns a KGroupedStream, on which you can call stateful operations like count(), reduce(), and aggregate().

Syntax

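The groupByKey() method is called on a KStream<K, V> and returns a KGroupedStream<K, V>. It groups records by their existing key without changing it. Because the key stays the same, Kafka Streams creates a repartition topic only if an upstream operation such as selectKey(), map(), or flatMap() may have changed the key.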

  • KStream<K, V>: The original stream with key type K and value type V.
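  • groupByKey(): Groups records by their current key. The overload groupByKey(Grouped.with(keySerde, valueSerde)) supplies explicit key and value serdes when the configured defaults do not match the record types.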
  • KGroupedStream<K, V>: The grouped stream used for aggregation.
java
KGroupedStream<K, V> groupedStream = kStream.groupByKey();
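Because groupByKey() keeps the existing key, it normally needs no repartition topic, whereas re-keying first with selectKey() or grouping with groupBy() does. One way to observe this is to compare topology descriptions. The sketch below assumes only the kafka-streams library on the classpath; the class name RepartitionCheck, the topic name, and the variant labels are hypothetical. It builds the same count three ways and reports whether Topology.describe() mentions a repartition topic.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionCheck {

    // Builds one counting topology per grouping style and returns its description.
    static String describe(String variant) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        switch (variant) {
            case "groupByKey":            // existing key kept: no repartition expected
                input.groupByKey().count();
                break;
            case "selectKey+groupByKey":  // key replaced upstream: repartition expected
                input.selectKey((k, v) -> v).groupByKey().count();
                break;
            case "groupBy":               // new key computed while grouping: repartition expected
                input.groupBy((k, v) -> v).count();
                break;
            default:
                throw new IllegalArgumentException("unknown variant: " + variant);
        }
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        for (String variant : new String[] {"groupByKey", "selectKey+groupByKey", "groupBy"}) {
            boolean repartitions = describe(variant).contains("repartition");
            System.out.println(variant + " -> repartition topic: " + repartitions);
        }
    }
}
```

Running main should report a repartition topic only for the selectKey+groupByKey and groupBy variants. Searching the description for the substring "repartition" relies on how Kafka Streams names its internal topics; treat it as a quick diagnostic, not a stable API contract.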

Example

This example shows how to use groupByKey() to count the number of records per key in a Kafka Streams application.

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class GroupByKeyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupbykey-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Create a stream from input topic
        KStream<String, String> inputStream = builder.stream("input-topic");

        // Group by existing key
        KGroupedStream<String, String> groupedStream = inputStream.groupByKey();

        // Count records per key
        groupedStream.count(Materialized.as("counts-store"))
                .toStream()
                .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook to close streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
Output
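Records from "input-topic" are grouped by their existing key and counted, and the running count for each key is written to "output-topic" with String keys and Long values. Because count() returns a KTable, the output topic receives an updated count whenever a key's count changes (record caching may combine several updates into one), so consumers should read the values with a LongDeserializer.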
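The output described above can be checked without a running broker by executing the same topology in TopologyTestDriver. This sketch assumes kafka-streams and kafka-streams-test-utils (version 2.4 or later, for TestInputTopic and TestOutputTopic) are on the classpath; the class name GroupByKeyCountTest, the application id, the dummy bootstrap address, and the sample records are illustrative only.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class GroupByKeyCountTest {

    // Runs the article's counting topology in-process and returns the latest count per key.
    static Map<String, Long> countPerKey(String[][] records) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> inputStream = builder.stream("input-topic");
        inputStream.groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupbykey-example-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                    "output-topic", new StringDeserializer(), new LongDeserializer());
            for (String[] r : records) {
                in.pipeInput(r[0], r[1]); // r[0] = key, r[1] = value
            }
            // Keeps only the last emitted count for each key.
            return out.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        String[][] records = {{"alice", "login"}, {"bob", "login"}, {"alice", "logout"}};
        System.out.println(countPerKey(records));
    }
}
```

readKeyValuesToMap() hides the intermediate updates that the KTable emits; readKeyValuesToList() on the same TestOutputTopic returns every update in order if you want to inspect them.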

Common Pitfalls

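  • Grouping records that have null keys: groupByKey() accepts them, but the aggregation that follows (count(), reduce(), or aggregate()) drops every record whose key is null, so those records never appear in the results.
  • Confusing groupByKey() with groupBy(): groupBy() computes a new key for each record and always repartitions the stream, while groupByKey() keeps the existing key. Use groupBy() (or selectKey() followed by groupByKey()) when you need to re-key the stream.
  • Misunderstanding state stores: aggregations after groupByKey() are always backed by a state store. Calling count() with no arguments uses an internally named store; pass Materialized.as("store-name") when you want to query the store by name (interactive queries) or set its serdes explicitly.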
java
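/* Wrong: records with null keys are dropped by the aggregation that follows groupByKey() */
KStream<String, String> streamWithNullKeys = builder.stream("topic-with-null-keys");
KGroupedStream<String, String> grouped = streamWithNullKeys.groupByKey(); // null-key records are skipped by count()/reduce()/aggregate()

/* Right: make the drop explicit by filtering out null keys before grouping... */
KStream<String, String> keyedStream = streamWithNullKeys.filter((k, v) -> k != null);
KGroupedStream<String, String> groupedCorrect = keyedStream.groupByKey();

/* ...or assign a key with selectKey() (here, hypothetically, the value); this marks the stream for repartitioning */
KGroupedStream<String, String> groupedByValue = streamWithNullKeys.selectKey((k, v) -> v).groupByKey();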
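To see what happens to null-key records, the sketch below runs a small count topology twice in TopologyTestDriver: once calling groupByKey() directly, and once re-keying with selectKey() first. It assumes kafka-streams and kafka-streams-test-utils 2.4 or later; the class name, topic names, and the fallback rule of using the value as the key are hypothetical stand-ins for whatever key your data actually needs.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class NullKeyGroupingDemo {

    // Counts records per key. When rekey is true, a record with a null key first
    // gets its value as the key (a hypothetical fallback rule).
    static Map<String, Long> countPerKey(boolean rekey, String[][] records) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        KStream<String, String> keyed = input;
        if (rekey) {
            keyed = input.selectKey((k, v) -> k != null ? k : v);
        }
        keyed.groupByKey()
             .count()
             .toStream()
             .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "null-key-demo-" + rekey);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                    "output-topic", new StringDeserializer(), new LongDeserializer());
            for (String[] r : records) {
                in.pipeInput(r[0], r[1]); // r[0] may be null
            }
            return out.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        String[][] records = {{null, "bob"}, {"alice", "login"}, {null, "bob"}};
        System.out.println("groupByKey() only:     " + countPerKey(false, records));
        System.out.println("selectKey() then group: " + countPerKey(true, records));
    }
}
```

With these sample records, the first run should count only alice because both null-key records are dropped, while the second run also counts bob twice.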

Quick Reference

groupByKey() Cheat Sheet:

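| Method | Description | Returns |
|---|---|---|
| groupByKey() | Groups records by their existing key | KGroupedStream<K, V> |
| groupBy(KeyValueMapper) | Groups records by a new key computed from each record (always repartitions) | KGroupedStream<KR, V> |
| count() | Counts records per key after grouping | KTable<K, Long> |
| reduce(Reducer) | Combines values per key into a result of the same type | KTable<K, V> |
| aggregate(Initializer, Aggregator) | Aggregates values per key into a result of a possibly different type | KTable<K, VR> |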
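The reduce() and aggregate() rows can be sketched as follows. The topic names, the idea of order amounts arriving as decimal strings, and the class names are hypothetical; kafka-streams and kafka-streams-test-utils 2.4 or later are assumed. After mapValues() Kafka Streams no longer knows the value serde, so the sketch passes explicit serdes to groupByKey() through Grouped, and gives aggregate() the serdes for its String result through Materialized.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class ReduceAggregateSketch {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical input: key = customer id, value = order amount as a decimal string.
        KGroupedStream<String, Long> amounts = builder
                .stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(v -> Long.parseLong(v))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));

        // reduce(): the result has the same type as the values (Long).
        amounts.reduce(Long::sum)
               .toStream()
               .to("order-totals", Produced.with(Serdes.String(), Serdes.Long()));

        // aggregate(): the result type differs (a comma-separated String of amounts).
        amounts.aggregate(
                       () -> "",
                       (customer, amount, history) ->
                               history.isEmpty() ? amount.toString() : history + "," + amount,
                       Materialized.with(Serdes.String(), Serdes.String()))
               .toStream()
               .to("order-history", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Latest value per key from each output topic.
    static final class Result {
        final Map<String, Long> totals;
        final Map<String, String> histories;

        Result(Map<String, Long> totals, Map<String, String> histories) {
            this.totals = totals;
            this.histories = histories;
        }
    }

    static Result run(String[][] orders) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reduce-aggregate-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "orders", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> totals = driver.createOutputTopic(
                    "order-totals", new StringDeserializer(), new LongDeserializer());
            TestOutputTopic<String, String> histories = driver.createOutputTopic(
                    "order-history", new StringDeserializer(), new StringDeserializer());
            for (String[] o : orders) {
                in.pipeInput(o[0], o[1]); // o[0] = customer id, o[1] = amount
            }
            return new Result(totals.readKeyValuesToMap(), histories.readKeyValuesToMap());
        }
    }

    public static void main(String[] args) {
        Result r = run(new String[][] {{"alice", "10"}, {"bob", "5"}, {"alice", "7"}});
        System.out.println("totals:    " + r.totals);
        System.out.println("histories: " + r.histories);
    }
}
```

reduce() fits when the result has the same type as the input values; aggregate() is the choice when it differs, as with the String history here, and that is why it needs its own serdes.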

Key Takeaways

  • Use groupByKey() to group stream records by their existing key before an aggregation.
  • groupByKey() returns a KGroupedStream, which supports stateful operations such as count(), reduce(), and aggregate().
  • Aggregations after groupByKey() drop records with null keys, so filter or re-key such records first.
  • Use groupBy() when you need to group by a different key; it always repartitions the stream.
  • Aggregations are always backed by a state store; name it with Materialized.as() when you need to query it or control its serdes.