How to Map in Kafka Streams: Syntax and Example
In Kafka Streams, you use the map method on a KStream to transform each record into a new key-value pair. The map function takes a KeyValueMapper that returns a new KeyValue object for each input record.
Syntax
The map method transforms each record in a KStream by applying a function that returns a new key-value pair.
- <KR, VR> KStream<KR, VR> map(KeyValueMapper<K, V, KeyValue<KR, VR>> mapper): Takes a mapper function (wildcard bounds omitted for readability).
- KeyValueMapper: A function that receives the current key and value and returns a new KeyValue pair.
- Returns a new KStream<KR, VR> with transformed key-value pairs. The key and value types may differ from those of the input stream.
```java
// K, V, KR, VR, newKey and newValue are placeholders for your own types and expressions.
KStream<K, V> originalStream = builder.stream("input-topic");
KStream<KR, VR> mappedStream =
    originalStream.map((key, value) -> new KeyValue<>(newKey, newValue));
```
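When the mapper changes the key or value type, the default serdes may no longer match the output, so the sink usually needs explicit serdes. The following is a minimal sketch under stated assumptions: the class name TypeChangingMap, the topic lengths-topic, and the re-keying rule (value becomes the key, its length becomes the value) are hypothetical and chosen only for illustration. It also assumes values are never null.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class TypeChangingMap {

    // Mapper body kept as a plain method so it can be called without a broker.
    // Hypothetical rule: the old value becomes the new key, its length the new value.
    static KeyValue<String, Integer> toLength(String key, String value) {
        return KeyValue.pair(value, value.length());
    }

    // Builds (but does not start) a topology: String/String in, String/Integer out.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // map changes the value type from String to Integer.
        KStream<String, Integer> lengths = input.map(TypeChangingMap::toLength);

        // Explicit serdes, because the value is no longer a String.
        lengths.to("lengths-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        return builder.build();
    }
}
```

Passing the serdes at the source and sink keeps the topology independent of whatever default serdes the application configures.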
Example
This example reads from an input topic, converts each value to uppercase, and writes to an output topic.
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class MapExample {
    public static void main(String[] args) {
        // Basic configuration: application id, broker address, default String serdes.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // Keep the key and uppercase the value (assumes values are never null).
        KStream<String, String> uppercased =
            input.map((key, value) -> new KeyValue<>(key, value.toUpperCase()));

        uppercased.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Close the application cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```
Output
Records from 'input-topic' with values like 'hello' become 'HELLO' in 'output-topic'.
Common Pitfalls
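- The mapper must return a non-null KeyValue. Returning another type does not compile, and returning null throws a NullPointerException at runtime.
- Changing keys without considering downstream partitioning can cause unexpected behavior. map marks the stream for repartitioning, so a later key-based operation such as groupByKey or a join sends the data through an internal repartition topic.
- Using map when only the value changes. mapValues is simpler and more efficient because it keeps the key and never triggers repartitioning.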
```java
// Wrong: returning null compiles, but throws a NullPointerException
// at runtime as soon as a record is processed.
KStream<String, String> wrong = input.map((key, value) -> null);

// Right: always return a non-null KeyValue.
KStream<String, String> right =
    input.map((key, value) -> new KeyValue<>(key, value.toLowerCase()));
```
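The efficiency difference between map and mapValues can be made visible by describing two otherwise identical topologies. This is a sketch, assuming String keys and values; the class and method names are hypothetical. Both topologies are only built and described, so no broker is required.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionCheck {

    // Uppercases values with map(), then counts records per key.
    static String describeWithMap() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        input.map((key, value) -> KeyValue.pair(key, value.toUpperCase()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .count();
        return builder.build().describe().toString();
    }

    // Same logic with mapValues(): the key is guaranteed to stay unchanged.
    static String describeWithMapValues() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        input.mapValues(value -> value.toUpperCase())
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .count();
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        // The first description contains an internal "...-repartition" topic,
        // the second does not.
        System.out.println(describeWithMap());
        System.out.println(describeWithMapValues());
    }
}
```

Even though the mapper in describeWithMap leaves the key untouched, Kafka Streams cannot know that, so it inserts a repartition step before the aggregation.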
Quick Reference
| Method | Description | Returns |
|---|---|---|
| map | Transforms each record to a new key-value pair | KStream with new key and value types |
| mapValues | Transforms only the value and keeps the key unchanged | KStream with the same key type and a new value type |
| flatMap | Transforms each record into zero or more records | KStream with new key and value types |
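To contrast flatMap with map, the sketch below emits one record per word of the input value and re-keys each record by that word. The class name FlatMapSketch and the splitting rule are assumptions made for illustration, not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

public class FlatMapSketch {

    // Returns one (word, 1) record per whitespace-separated word.
    // Null or blank values produce an empty list, that is, zero output records.
    static List<KeyValue<String, Integer>> splitWords(String key, String value) {
        List<KeyValue<String, Integer>> out = new ArrayList<>();
        if (value == null) {
            return out;
        }
        for (String word : value.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(KeyValue.pair(word.toLowerCase(), 1));
            }
        }
        return out;
    }

    // Wires the mapper into a stream; the result has new key and value types.
    static KStream<String, Integer> words(KStream<String, String> input) {
        return input.flatMap(FlatMapSketch::splitWords);
    }
}
```

Unlike map, the mapper here may return an empty collection, which drops the input record instead of failing.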
Key Takeaways
- Use map to transform both the key and the value of each record in a Kafka stream.
- Always return a non-null KeyValue object from the map function.
- Consider mapValues if only the value needs to change; the code is simpler.
- Changing keys affects partitioning and downstream processing, so plan accordingly.
- Test your mapping logic to avoid runtime errors such as null returns.
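One way to test mapping logic without a running broker is TopologyTestDriver from the separate kafka-streams-test-utils artifact. The sketch below assumes that dependency is on the classpath and reuses the uppercase mapping from the example; the class name MapTopologyTest, the helper run, and the dummy configuration values are hypothetical.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class MapTopologyTest {

    // Same mapping as the example: keep the key, uppercase the value.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .map((key, value) -> KeyValue.pair(key, value.toUpperCase()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes one record through the topology in memory and returns the result.
    static KeyValue<String, String> run(String key, String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-test");
        // Never contacted by the test driver; any placeholder address works.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

            in.pipeInput(key, value);
            return out.readKeyValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("k1", "hello"));
    }
}
```

A test framework assertion on the returned KeyValue can replace the main method; piping a record with a null value is a quick way to check how the mapper behaves on unexpected input.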