
How to Create a Consumer in Kafka: Simple Guide

To create a consumer in Kafka, configure bootstrap.servers, group.id, and the key and value deserializers. Then instantiate a KafkaConsumer object, subscribe to one or more topics, and poll for messages in a loop.

Syntax

Creating a Kafka consumer involves setting properties and using the KafkaConsumer class. Key properties include:

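  • bootstrap.servers: Comma-separated host:port addresses of the Kafka brokers used for the initial connection
  • group.id: Identifier of the consumer group this consumer joins
  • key.deserializer and value.deserializer: Classes that convert the raw key and value bytes back into objects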

After configuration, create the consumer, subscribe to topics, and poll for messages.

java
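import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Fragment: place the statements below inside a method.
// Connection, group membership, and byte-to-object conversion settings
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Create the consumer and subscribe before the first poll
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// Each poll waits up to 100 ms for new records
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}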

Example

This example shows a simple Kafka consumer in Java that connects to a local Kafka server, subscribes to a topic named test-topic, and prints received messages.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // Minimal configuration: where to connect, which group to join, how to decode bytes
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));

            // Poll forever; each call waits up to 100 ms for new records
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Output
Received message: offset = 0, key = null, value = Hello Kafka
Received message: offset = 1, key = null, value = Welcome to Kafka consumer

Common Pitfalls

Common mistakes when creating Kafka consumers include:

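  • Not setting group.id. It is required whenever you call subscribe(), because topic subscription relies on consumer group management.
  • Using deserializers that do not match the format the producer wrote, which yields garbled values or deserialization errors.
  • Polling before subscribing to a topic (or assigning partitions), which makes poll() throw an IllegalStateException.
  • Letting too much time pass between poll() calls. If the gap exceeds max.poll.interval.ms, the consumer is removed from the group and its partitions are reassigned.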

Always ensure your consumer configuration matches your Kafka setup and message format.
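
To see why the deserializer has to match what the producer wrote, the following sketch uses only the JDK, so no broker or Kafka client is needed. It assumes the producer encoded the value as a 4-byte big-endian integer (the layout Kafka's IntegerSerializer produces) and then decodes those bytes as UTF-8 text, which is what StringDeserializer does by default. The class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: not part of the Kafka client API.
final class DeserializerMismatchDemo {

    // Encodes an int as 4 big-endian bytes (the assumed producer-side format).
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decodes 4 big-endian bytes back into an int (the matching decoder).
    static int decodeAsInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Decodes bytes as UTF-8 text, mimicking a string deserializer.
    static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encodeInt(42);

        // Matching decoder recovers the original value.
        System.out.println("as int: " + decodeAsInt(payload));

        // Mismatched decoder yields 4 control/garbage characters, not "42".
        String wrong = decodeAsString(payload);
        System.out.println("as string: length " + wrong.length() + ", equals \"42\"? " + wrong.equals("42"));
    }
}
```

The mismatched decode does not fail loudly here; it quietly returns a meaningless string, which is why this kind of bug is easy to miss.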

java
/* Wrong: Missing group.id */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Without group.id the consumer cannot join a group;
// in recent client versions subscribe() throws InvalidGroupIdException

/* Right: Include group.id */
props.put("group.id", "my-group");
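
One way to catch a missing setting before the consumer is even constructed is a small pre-flight check on the Properties object. The sketch below uses only the JDK. ConsumerConfigCheck and missingKeys are hypothetical helpers, not part of the Kafka client, and the list of required keys simply mirrors the four properties this guide relies on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: not part of the Kafka client API.
final class ConsumerConfigCheck {

    // The four settings this guide treats as mandatory for a subscribing consumer.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "bootstrap.servers",
            "group.id",
            "key.deserializer",
            "value.deserializer");

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Same configuration as the "Wrong" case: group.id is not set.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        System.out.println("Missing: " + missingKeys(props)); // prints "Missing: [group.id]"
    }
}
```

Calling a check like this at startup turns a runtime failure deep inside the client into an immediate, readable message.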

Quick Reference

| Property | Description | Example Value |
| --- | --- | --- |
| bootstrap.servers | Kafka server addresses | localhost:9092 |
| group.id | Consumer group identifier | my-group |
| key.deserializer | Class to deserialize message keys | org.apache.kafka.common.serialization.StringDeserializer |
| value.deserializer | Class to deserialize message values | org.apache.kafka.common.serialization.StringDeserializer |
| auto.offset.reset | Where to start if no offset found | "earliest" or "latest" |
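
As a small illustration of the auto.offset.reset setting, the sketch below adds it to a consumer configuration and rejects values outside the long-standing set of earliest, latest, and none (with none, the consumer raises an error instead of picking a starting position). It uses only the JDK. OffsetResetConfig and withOffsetReset are hypothetical names, and newer client versions may accept additional values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: not part of the Kafka client API.
final class OffsetResetConfig {

    // Values assumed valid for this sketch; newer clients may allow more.
    static final List<String> KNOWN_VALUES = Arrays.asList("earliest", "latest", "none");

    // Returns a copy of base with auto.offset.reset set; rejects unknown values.
    static Properties withOffsetReset(Properties base, String reset) {
        if (!KNOWN_VALUES.contains(reset)) {
            throw new IllegalArgumentException("Unsupported auto.offset.reset: " + reset);
        }
        Properties copy = new Properties();
        copy.putAll(base);
        copy.put("auto.offset.reset", reset);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "localhost:9092");
        base.put("group.id", "my-group");

        // A brand-new group with "earliest" starts from the oldest retained message.
        Properties props = withOffsetReset(base, "earliest");
        System.out.println(props.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

The setting only matters when the group has no committed offset for a partition; once offsets are committed, the consumer resumes from them.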

Key Takeaways

  • Always set bootstrap.servers, group.id, and the deserializers when creating a Kafka consumer.
  • Subscribe to topics before polling to receive messages.
  • Poll regularly so the consumer is not removed from its group.
  • Match deserializers to the message format to avoid errors.
  • Use auto.offset.reset to control where reading starts when no committed offset exists.
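
The advice to poll regularly can be turned into a rough budget check: the time needed to process one full batch should stay below max.poll.interval.ms. The sketch below is plain arithmetic on the JDK and makes no Kafka calls. PollBudget and its methods are hypothetical, the per-record processing times are invented inputs, and the two constants reflect the client defaults for max.poll.records (500) and max.poll.interval.ms (300000).

```java
// Hypothetical helper: not part of the Kafka client API.
final class PollBudget {

    // Client defaults for max.poll.records and max.poll.interval.ms.
    static final int DEFAULT_MAX_POLL_RECORDS = 500;
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Worst-case time to process one full batch, in milliseconds.
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True when a full batch can be processed before the next poll is due.
    static boolean fitsInPollInterval(int maxPollRecords, long perRecordMillis, long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 500 records x 200 ms = 100,000 ms, which is under the 300,000 ms limit.
        System.out.println(fitsInPollInterval(DEFAULT_MAX_POLL_RECORDS, 200, DEFAULT_MAX_POLL_INTERVAL_MS)); // prints "true"

        // 500 records x 1,000 ms = 500,000 ms, which exceeds the limit.
        System.out.println(fitsInPollInterval(DEFAULT_MAX_POLL_RECORDS, 1_000, DEFAULT_MAX_POLL_INTERVAL_MS)); // prints "false"
    }
}
```

When the check fails, the usual options are to lower max.poll.records, raise max.poll.interval.ms, or speed up per-record processing.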