Java consumer client in Kafka - Commands & Configuration

Introduction
To read messages from a Kafka topic in a Java application, you use the Java consumer client. It connects to the Kafka cluster, subscribes to one or more topics, and fetches new messages in batches that your code then processes record by record.
Use it when you want to:
Process user activity logs sent to a Kafka topic in real time.
Build a service that reacts to events published by other applications.
Read messages from Kafka and store them in a database.
Monitor system metrics sent as Kafka messages.
Consume messages from Kafka for batch processing.
Config File - ConsumerExample.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, offset = %d\n", record.key(), record.value(), record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

This Java file creates a Kafka consumer client.

Properties: Set Kafka server address, consumer group, and deserializers for keys and values.

KafkaConsumer: Creates the consumer with these properties.

subscribe: Tells the consumer which topic to listen to.

poll: Waits up to 100 milliseconds for new messages and returns whatever records have arrived, possibly none.

Processing loop: Prints each message's key, value, and offset.

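close: Leaves the consumer group and releases network resources. Because the loop never exits on its own, the finally block runs only when an exception ends the loop.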
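
The Properties step described above can be pulled into a small helper that reports missing settings before the consumer is created. This is a minimal sketch using only java.util.Properties. The class name ConsumerProps and its methods are hypothetical and are not part of the Kafka client library. The four keys are the ones set in ConsumerExample.java, which needs group.id because it calls subscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): builds and checks consumer settings.
class ConsumerProps {
    // The four settings used by ConsumerExample.java.
    static final String[] REQUIRED = {
        "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer"
    };
    static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";

    // Builds properties for a consumer that reads String keys and values.
    static Properties forStrings(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = forStrings("localhost:9092", "example-group");
        System.out.println("Missing: " + missingKeys(props));   // prints "Missing: []"
        props.remove("group.id");
        System.out.println("Missing: " + missingKeys(props));   // prints "Missing: [group.id]"
    }
}
```

The Properties object returned by forStrings can be passed to new KafkaConsumer<>(props) in the same way as the props variable in ConsumerExample.java.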

Commands
Compile the Java consumer client code using the Kafka client library version 3.5.1.
Terminal
javac -cp kafka-clients-3.5.1.jar ConsumerExample.java
Expected Output
No output (command runs silently)
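Run the compiled Java consumer client to start reading messages from the Kafka topic. The classpath separator is : on Linux and macOS and ; on Windows. The kafka-clients jar also depends on slf4j-api at runtime, so add that jar to the classpath if the JVM reports a missing org.slf4j class.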
Terminal
java -cp .:kafka-clients-3.5.1.jar ConsumerExample
Expected Output
Received message: key = user1, value = login, offset = 0
Received message: key = user2, value = logout, offset = 1
Key Concept

If you remember nothing else from this pattern, remember: the consumer must subscribe to topics and poll regularly to receive messages.

Common Mistakes
Not setting the correct deserializer classes for key and value.
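Kafka delivers keys and values as raw bytes. If the deserializer settings are missing, the KafkaConsumer constructor throws a configuration error. If they do not match how the producer serialized the data, the consumer reads garbage or fails with a deserialization error.
Set key.deserializer and value.deserializer to match the producer's serializers, for example StringDeserializer for string messages.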
Not calling consumer.poll() in a loop.
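Without polling, the consumer does not fetch new messages and appears to do nothing. If the gap between polls exceeds max.poll.interval.ms, the group also treats the consumer as failed and reassigns its partitions.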
Use a continuous loop calling poll() to keep receiving messages.
Not closing the consumer after use.
An unclosed consumer keeps its network connections open, and the group coordinator must wait for the session timeout before reassigning its partitions to other consumers.
Call consumer.close() in a finally block to clean up properly.
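
The first mistake in the list above is easier to see once you remember that Kafka stores keys and values as raw bytes, and the deserializer only decides how to interpret them. The sketch below uses plain Java rather than the Kafka classes. Decoding as UTF-8 mirrors what StringDeserializer does by default. The 4-byte integer check is an assumption about how a mismatched deserializer such as IntegerDeserializer would reject the same bytes. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: plain-Java stand-ins, not the Kafka deserializer classes.
class BytesDemo {
    // Interprets the bytes as UTF-8 text.
    static String asString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Interprets the bytes as one big-endian 32-bit integer; rejects any other length.
    static int asInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] value = "login".getBytes(StandardCharsets.UTF_8);
        System.out.println(asString(value));      // prints "login"
        try {
            asInt(value);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());   // prints "expected 4 bytes, got 5"
        }
    }
}
```

The same five bytes are valid as a string and invalid as an integer, which is why the consumer's deserializers have to match what the producer wrote.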
Summary
Set up consumer properties including Kafka server, group ID, and deserializers.
Create a KafkaConsumer and subscribe it to the desired topic.
Use a loop to poll for new messages and process them.
Close the consumer to release resources when finished.