How to Filter in Kafka Streams: Syntax and Examples

In Kafka Streams, you can filter records using the filter() method on a KStream. This method takes a predicate function that returns true to keep a record or false to discard it, allowing you to process only the records you want.

Syntax

The filter() method is called on a KStream and takes a predicate that receives each record's key and value and returns a boolean. Records for which the predicate returns true are kept; all others are dropped. The operation is stateless and passes the kept records through with their keys and values unchanged.

Syntax:

filteredStream = originalStream.filter((key, value) -> predicate)

Where:

  • originalStream: The input KStream.
  • filter: The KStream method that applies the predicate to every record.
  • predicate: A boolean expression over the record's key and value; true keeps the record, false discards it.
  • filteredStream: The new KStream containing only the kept records.
java
// Keep only records whose value is non-null and contains "important"
KStream<String, String> filteredStream = originalStream.filter((key, value) -> value != null && value.contains("important"));
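
To make the keep/drop rule concrete, here is a minimal plain-Java sketch that models what filter() does, without any Kafka dependency. It is not the Kafka Streams implementation: the class FilterSemantics and the method keep() are hypothetical names used only for illustration, and java.util.function.BiPredicate stands in for the Kafka Streams predicate type.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of filter() semantics (illustrative only, not Kafka Streams API).
class FilterSemantics {

    // Returns a new list holding only the records the predicate accepts.
    // The input list is not modified, mirroring how filter() returns a new KStream.
    static <K, V> List<Map.Entry<K, V>> keep(List<Map.Entry<K, V>> records,
                                             BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> kept = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            if (predicate.test(record.getKey(), record.getValue())) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Three sample records, one of them with a null value
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleImmutableEntry<>("k1", "important: disk full"));
        records.add(new SimpleImmutableEntry<>("k2", "routine heartbeat"));
        records.add(new SimpleImmutableEntry<>("k3", (String) null));

        // Same null-safe predicate as in the syntax example
        List<Map.Entry<String, String>> kept = keep(records,
                (key, value) -> value != null && value.contains("important"));

        System.out.println(kept);           // prints [k1=important: disk full]
        System.out.println(records.size()); // prints 3
    }
}
```

Only the first record survives: the second does not contain "important" and the third has a null value. The input list still holds all three records, which is the same behavior to expect from the original KStream after calling filter().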

Example

This example shows how to filter messages from a Kafka topic so that only records with values containing the word "important" are processed further.

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class FilterExample {
    public static void main(String[] args) {
        // Basic configuration: application id, broker address, and default String serdes
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Read records from the input topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");

        // Keep only records whose value is non-null and contains "important"
        KStream<String, String> filtered = source.filter((key, value) -> value != null && value.contains("important"));

        // Write the kept records to the output topic
        filtered.to("filtered-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so close() runs on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
Output
Records from "input-topic" whose value contains "important" are forwarded to "filtered-topic"; all other records are dropped.

Common Pitfalls

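  • Not checking for null values in the predicate. A Kafka record can have a null value (for example, a tombstone), so calling a method on value without a null check can throw a NullPointerException.
  • Inverting the logic. filter() keeps the records for which the predicate returns true, so to exclude matching records either negate the predicate or use filterNot().
  • Forgetting that filter() returns a new KStream and does not modify the original stream. The result must be assigned or chained; otherwise later operations on the original stream still see every record.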

Example of a common mistake and fix:

java
// Wrong: no null check; throws NullPointerException when value is null
KStream<String, String> filtered = source.filter((key, value) -> value.contains("error"));

// Right: Null check added
KStream<String, String> filteredSafe = source.filter((key, value) -> value != null && value.contains("error"));
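
One way to make the null check harder to forget is to move the predicate into a small named method that can be tested on its own. The sketch below uses only the Java standard library; RecordPredicates and containsError are hypothetical names chosen for illustration. Because the Kafka Streams predicate interface declares a single test(key, value) method, a method of this shape should be usable as source.filter(RecordPredicates::containsError), but that call is left out here so the example runs without Kafka on the classpath.

```java
import java.util.function.BiPredicate;

// Named, null-safe predicate (illustrative names, not part of Kafka Streams).
class RecordPredicates {

    // True only when the value is present and contains "error".
    // Records with a null value are rejected instead of throwing.
    static boolean containsError(String key, String value) {
        return value != null && value.contains("error");
    }

    public static void main(String[] args) {
        BiPredicate<String, String> isError = RecordPredicates::containsError;
        // negate() models the complement that filterNot() would keep
        BiPredicate<String, String> notError = isError.negate();

        System.out.println(isError.test("k1", "error: timeout")); // prints true
        System.out.println(isError.test("k2", "all good"));       // prints false
        System.out.println(isError.test("k3", null));             // prints false
        System.out.println(notError.test("k3", null));            // prints true
    }
}
```

The last line is worth noticing: the negated predicate accepts null-valued records, so filterNot() with this predicate would keep them. If those records should be dropped as well, the null condition has to be written into the predicate explicitly.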

Quick Reference

| Method | Description | Returns |
| --- | --- | --- |
| filter(predicate) | Keeps records where predicate returns true | Filtered KStream |
| filterNot(predicate) | Keeps records where predicate returns false | Filtered KStream |
| map() | Transforms records | Mapped KStream |
| to(topic) | Sends stream to a Kafka topic | void |

Key Takeaways

  • Use the filter() method on a KStream with a predicate to keep desired records.
  • Always check for null values in your filter predicate to avoid errors.
  • filter() returns a new KStream; the original stream remains unchanged.
  • Use filterNot() to exclude records matching a condition.
  • Send filtered results to a new topic using the to() method.