How to Create a Consumer in Kafka: Simple Guide
To create a consumer in Kafka, you need to configure the
bootstrap.servers, group.id, and key/value deserializers. Then, instantiate a KafkaConsumer object, subscribe to topics, and poll for messages.Syntax
Creating a Kafka consumer involves setting properties and using the KafkaConsumer class. Key properties include:
- bootstrap.servers: Kafka server addresses
- group.id: Consumer group identifier
- key.deserializer and value.deserializer: How to convert bytes to objects
After configuration, create the consumer, subscribe to topics, and poll for messages.
java
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
Example
This example shows a simple Kafka consumer in Java that connects to a local Kafka server, subscribes to a topic named test-topic, and prints received messages.
java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
Output
Received message: offset = 0, key = null, value = Hello Kafka
Received message: offset = 1, key = null, value = Welcome to Kafka consumer
Common Pitfalls
Common mistakes when creating Kafka consumers include:
- Not setting the
group.id, which is required for consumer groups. - Using wrong deserializers that do not match the message format.
- Not subscribing to any topic before polling, causing no messages to be received.
- Forgetting to poll regularly, which can cause session timeouts.
Always ensure your consumer configuration matches your Kafka setup and message format.
java
/* Wrong: Missing group.id */ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // This will cause an error or unexpected behavior /* Right: Include group.id */ props.put("group.id", "my-group");
Quick Reference
| Property | Description | Example Value |
|---|---|---|
| bootstrap.servers | Kafka server addresses | localhost:9092 |
| group.id | Consumer group identifier | my-group |
| key.deserializer | Class to deserialize message keys | org.apache.kafka.common.serialization.StringDeserializer |
| value.deserializer | Class to deserialize message values | org.apache.kafka.common.serialization.StringDeserializer |
| auto.offset.reset | Where to start if no offset found | "earliest" or "latest" |
Key Takeaways
Always set bootstrap.servers, group.id, and deserializers to create a Kafka consumer.
Subscribe to topics before polling to receive messages.
Poll regularly to avoid consumer session timeouts.
Match deserializers with the message format to avoid errors.
Use auto.offset.reset to control where to start reading if no offset exists.