How to Filter in Kafka Streams: Syntax and Examples
In Kafka Streams, you can filter records using the filter() method on a KStream. This method takes a predicate function that returns true to keep a record or false to discard it, allowing you to process only the records you want.
Syntax
The filter() method is called on a KStream object and requires a predicate function that takes a key and value as input and returns a boolean. Records for which the predicate returns true are kept; others are dropped.
Syntax:
filteredStream = originalStream.filter((key, value) -> predicate)
Where:
- originalStream: The input KStream.
- filter: The method to apply filtering.
- predicate: A function returning true to keep the record, false to discard.
- filteredStream: The resulting filtered KStream.
```java
KStream<String, String> filteredStream = originalStream.filter((key, value) -> value.contains("important"));
```
Example
This example shows how to filter messages from a Kafka topic so that only records with values containing the word "important" are processed further.
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class FilterExample {
    public static void main(String[] args) {
        // Basic configuration: application ID, broker address, default String serdes
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");

        // Keep only records whose value is non-null and contains "important"
        KStream<String, String> filtered =
                source.filter((key, value) -> value != null && value.contains("important"));

        // Write the surviving records to the output topic
        filtered.to("filtered-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams client cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```
Output
Records from "input-topic" containing "important" in their value are forwarded to "filtered-topic"; others are ignored.
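One way to check which records would be forwarded, without starting a broker, is to move the condition into a named method and call it directly. The sketch below is only an illustration: the class name ImportantFilter and the method isImportant are hypothetical and not part of the example above. Because the method takes a key and a value and returns a boolean, it should also be usable in the topology as source.filter(ImportantFilter::isImportant).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class; the name is an assumption made for this sketch.
class ImportantFilter {

    // Same condition as the lambda in the example: a non-null value
    // that contains "important". String.contains is case-sensitive.
    static boolean isImportant(String key, String value) {
        return value != null && value.contains("important");
    }

    public static void main(String[] args) {
        // Sample records invented for illustration (LinkedHashMap allows null values).
        Map<String, String> records = new LinkedHashMap<>();
        records.put("k1", "important: disk full");
        records.put("k2", "routine heartbeat");
        records.put("k3", null);
        records.put("k4", "Important notice");

        for (Map.Entry<String, String> record : records.entrySet()) {
            boolean kept = isImportant(record.getKey(), record.getValue());
            System.out.println(record.getKey() + " kept=" + kept);
        }
    }
}
```

Only k1 passes here: k2 lacks the word, k3 has a null value, and k4 fails because the match is case-sensitive. If case-insensitive matching is wanted, the value could be lower-cased before the contains check.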
Common Pitfalls
- Not checking for null values in the predicate can cause NullPointerException.
- Using filter() instead of filterNot() when you want to exclude certain records.
- Forgetting that filter() returns a new KStream and does not modify the original stream.
Example of a common mistake and fix:
```java
// Wrong: No null check, may throw exception
KStream<String, String> filtered = source.filter((key, value) -> value.contains("error"));

// Right: Null check added
KStream<String, String> filteredSafe = source.filter((key, value) -> value != null && value.contains("error"));
```
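A related subtlety when switching between filter() and filterNot(): the null-safe predicate returns false for a null value, so filterNot() with that same predicate keeps null-valued records. The sketch below mimics the two methods' keep/drop semantics on an in-memory list using java.util.function.Predicate as a stand-in. It is not the Kafka Streams API, and the class FilterNotSketch and its helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in that imitates filter()/filterNot() semantics on a list.
class FilterNotSketch {

    // The null-safe condition from the fix above, expressed over the value only.
    static final Predicate<String> HAS_ERROR =
            value -> value != null && value.contains("error");

    // Mimics filter(): keeps values for which the predicate returns true.
    static List<String> filter(List<String> values, Predicate<String> predicate) {
        List<String> kept = new ArrayList<>();
        for (String value : values) {
            if (predicate.test(value)) {
                kept.add(value);
            }
        }
        return kept;
    }

    // Mimics filterNot(): keeps values for which the predicate returns false.
    static List<String> filterNot(List<String> values, Predicate<String> predicate) {
        return filter(values, predicate.negate());
    }

    public static void main(String[] args) {
        // Arrays.asList is used because it permits null elements.
        List<String> values = Arrays.asList("disk error", "ok", null);
        System.out.println(filter(values, HAS_ERROR));    // prints [disk error]
        System.out.println(filterNot(values, HAS_ERROR)); // prints [ok, null]
    }
}
```

If null values should be excluded from both outputs, one option is to drop them first with a separate value != null filter and then apply filter() or filterNot() to the result.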
Quick Reference
| Method | Description | Returns |
|---|---|---|
| filter(predicate) | Keeps records where predicate returns true | Filtered KStream |
| filterNot(predicate) | Keeps records where predicate returns false | Filtered KStream |
| map() | Transforms records | Mapped KStream |
| to(topic) | Sends stream to a Kafka topic | void |
Key Takeaways
- Use the filter() method on a KStream with a predicate to keep desired records.
- Always check for null values in your filter predicate to avoid errors.
- filter() returns a new KStream; the original stream remains unchanged.
- Use filterNot() to exclude records matching a condition.
- Send filtered results to a new topic using the to() method.