
How to Map in Kafka Streams: Syntax and Example

In Kafka Streams, you use the map method on a KStream to transform each record into a new key-value pair. The map function takes a KeyValueMapper that returns a new KeyValue object for each input record.

Syntax

The map method transforms each record in a KStream by applying a function that returns a new key-value pair.

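  • <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper): Takes a mapper function that is applied to every record.
  • KeyValueMapper: A function that receives the current key and value and returns a new KeyValue pair.
  • Returns a new KStream<KR, VR> with the transformed key-value pairs. The output types KR and VR may differ from the input types K and V.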
java
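// K, V, KR, and VR are placeholder types; newKey and newValue stand for
// expressions computed from the incoming key and value.
KStream<K, V> originalStream = builder.stream("input-topic");

KStream<KR, VR> mappedStream = originalStream.map((key, value) -> new KeyValue<>(newKey, newValue));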
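
Because map can change both types, a concrete mapper helps make the generic signature tangible. The sketch below is illustrative only: the class name, the WORD_LENGTH mapper, and the "word-lengths" topic are assumptions, not part of the original example. It turns String/String records into String/Integer records and passes explicit serdes to to(), which is needed whenever the new types differ from the application's default serdes.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;

class MapTypeChangeSketch {
    // Hypothetical mapper: new key = lowercased text, new value = text length.
    // Assumes values are never null.
    static final KeyValueMapper<String, String, KeyValue<String, Integer>> WORD_LENGTH =
        (key, value) -> new KeyValue<>(value.toLowerCase(), value.length());

    // Wires the mapper into a topology; "word-lengths" is an assumed topic name.
    static void define(StreamsBuilder builder) {
        KStream<String, String> words = builder.stream("input-topic");

        // The value type changes from String to Integer here.
        KStream<String, Integer> lengths = words.map(WORD_LENGTH);

        // Explicit serdes, since the value is no longer a String.
        lengths.to("word-lengths", Produced.with(Serdes.String(), Serdes.Integer()));
    }
}
```

Keeping the mapper in a named field is a design choice for this sketch: it can be unit-tested by calling WORD_LENGTH.apply(key, value) directly, without starting a stream.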

Example

This example reads from an input topic, converts each value to uppercase, and writes to an output topic.

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class MapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

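        StreamsBuilder builder = new StreamsBuilder();
        // Read records as String keys and values using the default serdes configured above.
        KStream<String, String> input = builder.stream("input-topic");

        // Keep the key and uppercase the value (assumes values are never null).
        KStream<String, String> uppercased = input.map((key, value) ->
            new KeyValue<>(key, value.toUpperCase())
        );

        uppercased.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so the application closes cleanly on exit.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();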
    }
}
Output
Records from 'input-topic' with values like 'hello' become 'HELLO' in 'output-topic'.

Common Pitfalls

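  • Returning null instead of a KeyValue from the mapper throws a NullPointerException at runtime when the record is processed.
  • Calling map marks the stream for repartitioning, so a later key-based operation (such as groupByKey or a join) writes the data through an internal repartition topic. This happens even if the mapper returns the key unchanged.
  • Using map when only the value changes is unnecessary: mapValues is simpler and does not trigger repartitioning.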
java
/* Wrong: returning null instead of a KeyValue */
KStream<String, String> wrong = input.map((key, value) -> null); // Compiles, but throws NullPointerException when a record is processed

/* Right: always return a new KeyValue */
KStream<String, String> right = input.map((key, value) -> new KeyValue<>(key, value.toLowerCase()));
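
The repartitioning pitfall is easier to see in a topology description. The following is a minimal sketch, assuming a count per key follows the transformation. The class name RepartitionSketch and its describe helper are hypothetical. It builds the same pipeline once with map and once with mapValues, then checks whether the description mentions an internal repartition topic.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

class RepartitionSketch {
    // Builds "uppercase, then count per key" and returns the topology description.
    static String describe(boolean useMap) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        KStream<String, String> upper;
        if (useMap) {
            // map flags the stream as "key may have changed", even though it did not.
            upper = input.map((key, value) -> new KeyValue<>(key, value.toUpperCase()));
        } else {
            // mapValues cannot touch the key, so the partitioning is known to be intact.
            upper = input.mapValues(value -> value.toUpperCase());
        }

        // A key-based operation: this is where a repartition topic is added if needed.
        upper.groupByKey().count();

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(true).contains("repartition"));  // map version
        System.out.println(describe(false).contains("repartition")); // mapValues version
    }
}
```

Under these assumptions the map version is expected to report a repartition topic and the mapValues version is not. If no key-based operation follows map, no repartition step is added.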

Quick Reference

Method    | Description                                     | Returns
----------|-------------------------------------------------|-------------------------------------
map       | Transforms each record to a new key-value pair  | KStream with new key and value types
mapValues | Transforms only the value; the key is unchanged | KStream with the same key type
flatMap   | Transforms each record into zero or more records | KStream with new key and value types
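
To illustrate the "zero or more records" row, here is a hedged sketch of flatMap. The class name and the SPLIT_WORDS mapper are assumptions made for illustration. Each input line becomes one record per word, and a blank line produces no records at all. Returning an empty list from flatMap is also a safe way to drop a record, unlike returning null from map.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;

class FlatMapSketch {
    // Hypothetical mapper: one (word, length) record per whitespace-separated word.
    // Assumes values are never null; a blank line yields an empty list.
    static final KeyValueMapper<String, String, List<KeyValue<String, Integer>>> SPLIT_WORDS =
        (key, line) -> {
            List<KeyValue<String, Integer>> out = new ArrayList<>();
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.add(new KeyValue<>(word, word.length()));
                }
            }
            return out;
        };

    // Wires the mapper into a topology that reads from "input-topic".
    static KStream<String, Integer> define(StreamsBuilder builder) {
        KStream<String, String> lines = builder.stream("input-topic");
        return lines.flatMap(SPLIT_WORDS);
    }
}
```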

Key Takeaways

  • Use map to transform both key and value of each record in a Kafka stream.
  • Always return a new KeyValue object inside the map function.
  • Consider mapValues if only the value needs to be changed for simpler code.
  • Changing keys affects partitioning and downstream processing, so plan accordingly.
  • Test your mapping logic to avoid runtime errors like null returns.
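
One way to test mapping logic without a running broker is TopologyTestDriver. The sketch below assumes the kafka-streams-test-utils artifact is on the classpath; the class name MapTopologyCheck and its helper methods are hypothetical. It rebuilds the uppercase topology from the example with explicit serdes, pipes one record in, and reads the mapped record back.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

class MapTopologyCheck {
    // Same logic as the uppercase example, with serdes passed explicitly.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> uppercased =
            input.map((key, value) -> new KeyValue<>(key, value.toUpperCase()));
        uppercased.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes a single record through the topology and returns the mapped record.
    static KeyValue<String, String> runOnce(String key, String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-example-test");
        // Placeholder address: the test driver never connects to a broker.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());
            in.pipeInput(key, value);
            return out.readKeyValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("k1", "hello"));
    }
}
```

The same driver can be used to confirm that a mapper returning null fails as soon as the first record is piped in, before the code reaches a real cluster.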