0
0
KafkaHow-ToBeginner · 4 min read

How to Use Kafka Consumer in Java: Simple Guide

To use a KafkaConsumer in Java, create a consumer instance with properties like bootstrap.servers and group.id, subscribe to topics using subscribe(), and poll messages with poll(). Then process the records and close the consumer to release resources.
📐

Syntax

The basic steps to use a Kafka consumer in Java are:

  • Create a Properties object with configuration settings.
  • Instantiate a KafkaConsumer<K, V> with those properties.
  • Subscribe to one or more topics using subscribe().
  • Use poll() in a loop to fetch records.
  • Process the records as needed.
  • Close the consumer when done.
java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

// consumer.close(); // call when finished consuming
💻

Example

This example shows a complete Java Kafka consumer that connects to a Kafka broker, subscribes to a topic named test-topic, polls messages, and prints their offset, key, and value.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Output
offset = 0, key = user1, value = Hello Kafka offset = 1, key = user2, value = Welcome to Kafka
⚠️

Common Pitfalls

Common mistakes when using Kafka consumer in Java include:

  • Not setting the group.id property, which is required for consumer group management.
  • Forgetting to use the correct deserializer classes for key and value.
  • Not calling poll() regularly, which can cause the consumer to be considered dead by the broker.
  • Not closing the consumer properly, which can lead to resource leaks.
  • Using infinite loops without a proper shutdown mechanism.
java
/* Wrong: Missing group.id and wrong deserializer */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// props.put("group.id", "my-group"); // Missing group.id
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); // Wrong if expecting String
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

/* Right: Correct group.id and deserializers */
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
📊

Quick Reference

Remember these key points when using Kafka consumer in Java:

  • bootstrap.servers: Kafka broker address.
  • group.id: Consumer group identifier.
  • key.deserializer and value.deserializer: Classes to convert bytes to objects.
  • subscribe(): To listen to topics.
  • poll(): To fetch messages regularly.
  • close(): To release resources.

Key Takeaways

Always set the group.id property for your Kafka consumer.
Use StringDeserializer for keys and values if your messages are strings.
Call poll() regularly to keep the consumer alive and fetch messages.
Subscribe to topics before polling to receive messages.
Close the consumer properly to avoid resource leaks.