
Filter and map operations in Kafka - Commands & Configuration

Introduction
Kafka Streams lets you process data in real time by filtering out unwanted messages and transforming the rest into a new form. This helps you focus only on the important information and reshape it as it flows through your system.
When you want to keep only messages with specific values from a stream of data.
When you need to change the format or content of messages before sending them to another system.
When you want to remove noisy or irrelevant data from your processing pipeline.
When you want to enrich or simplify data by applying a function to each message.
When you want to build real-time dashboards that show only filtered and transformed data.
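Before wiring this into Kafka Streams, the filter-then-map shape can be previewed with plain Java collections. This is a minimal sketch, not Kafka code: the "error" keyword and uppercase transform mirror the example app used later in this lesson, and the class and method names here are purely illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterMapDemo {
    // Keep only messages mentioning "error", then uppercase them --
    // the same filter -> map shape a streams topology applies per record.
    static List<String> process(List<String> messages) {
        return messages.stream()
                .filter(m -> m.toLowerCase().contains("error")) // drop irrelevant records
                .map(String::toUpperCase)                       // transform the survivors
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "error: disk failure",
                "warning: cpu high",
                "info: system ok");
        System.out.println(process(input)); // prints [ERROR: DISK FAILURE]
    }
}
```

Note the order: filtering first means the map function only runs on records you actually keep, which is also the cheaper ordering in a real topology.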
Config File - filter_map_streams.properties
application.id=filter-map-app
bootstrap.servers=localhost:9092
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

This configuration file sets up the Kafka Streams application with a unique application ID and connects it to the Kafka broker running locally. It also sets the default serdes (serializer/deserializer pairs) that convert message keys and values to and from bytes; here both are strings.
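A Streams application typically loads this file into a `java.util.Properties` object before constructing the topology. The following self-contained sketch writes a sample config and reads it back; in a real app you would load the existing file and pass the resulting `Properties` to `new KafkaStreams(topology, props)`.

```java
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class LoadStreamsConfig {
    // Load a .properties file the same way a Streams app would at startup.
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileReader reader = new FileReader(path)) {
            props.load(reader);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Write a sample config so this demo is self-contained, then read it back.
        Files.writeString(Path.of("filter_map_streams.properties"),
                "application.id=filter-map-app\nbootstrap.servers=localhost:9092\n");
        Properties props = load("filter_map_streams.properties");
        System.out.println(props.getProperty("application.id")); // prints filter-map-app
    }
}
```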

Commands
Create an input topic where raw messages will be sent for processing.
Terminal
kafka-topics --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Expected Output
Created topic input-topic.
--topic - Name of the Kafka topic to create
--partitions - Number of partitions for the topic
--replication-factor - Number of copies of the data for fault tolerance
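The consumer step at the end of this lesson reads from output-topic, which is never created above. Unless your broker auto-creates topics (the auto.create.topics.enable broker setting), create it the same way:

```
kafka-topics --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```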
Start a producer to send messages to the input topic. You can type messages here to test filtering and mapping.
Terminal
kafka-console-producer --topic input-topic --bootstrap-server localhost:9092
Expected Output
No output (command runs silently)
--topic - Topic to send messages to
--bootstrap-server - Kafka server address
Run the Kafka Streams application, which keeps only messages containing the word 'error' and converts them to uppercase.
Terminal
java -cp kafka-streams-examples.jar org.apache.kafka.streams.examples.filtermap.FilterMapExample filter_map_streams.properties
Expected Output
Starting FilterMapExample Topology: ... (topology details) ... Streams started
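The source of the example class in the jar is not shown in this lesson; with the Kafka Streams DSL, a topology with the described behavior would look roughly like this. This is a sketch, not the packaged class: it requires the kafka-streams dependency, and the topic names and structure are assumptions based on the commands above.

```java
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class FilterMapExample {
    public static void main(String[] args) throws Exception {
        // Load filter_map_streams.properties passed on the command line.
        Properties props = new Properties();
        try (FileReader reader = new FileReader(args[0])) {
            props.load(reader);
        }

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.filter((key, value) -> value.contains("error")) // keep only error messages
              .mapValues(value -> value.toUpperCase())         // transform to uppercase
              .to("output-topic");                             // write results out

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```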
Consume messages from the output topic to see the filtered and transformed results.
Terminal
kafka-console-consumer --topic output-topic --bootstrap-server localhost:9092 --from-beginning
Expected Output
ERROR: DISK FAILURE
--topic - Topic to read messages from
--from-beginning - Read all messages from the start
Key Concept

If you remember nothing else from this pattern, remember: filtering removes unwanted messages and mapping changes message content as data flows through Kafka Streams.

Common Mistakes
Not setting the correct serializers and deserializers in the configuration.
Kafka Streams cannot convert messages properly, causing errors or data loss.
Always specify matching key and value serdes for your message types in the properties file.
Trying to filter or map on the wrong topic or without starting the streams application.
No processing happens, so output topic remains empty or unchanged.
Make sure to create and send messages to the input topic and run the streams app before consuming output.
Summary
Create an input Kafka topic to receive raw messages.
Run a Kafka Streams app configured to filter and map messages.
Send messages to the input topic and consume processed messages from the output topic.