How to Consume Messages from Kafka: Simple Guide
To consume messages from Kafka, create a KafkaConsumer client, subscribe to one or more topics, and continuously poll for new messages using the poll() method. Process each received message and commit offsets to track consumption progress.
Syntax
The basic steps to consume messages from Kafka are:
- Create a KafkaConsumer with configuration properties such as bootstrap.servers, group.id, and the key and value deserializers.
- Subscribe to topics using consumer.subscribe().
- Call consumer.poll() in a loop to fetch messages.
- Process each message in the returned records.
- Commit offsets, automatically or manually, to mark messages as consumed.
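```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Required settings: broker address, consumer group, and deserializers for keys and values
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// Each poll() waits up to 100 ms for new records
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
```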
Example
This example shows a simple Kafka consumer in Java that connects to a Kafka broker, subscribes to a topic named test-topic, and prints each message's offset, key, and value.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```
Output
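Sample output (the actual offsets, keys, and values depend on what has been produced to test-topic):
```
offset = 15, key = user1, value = Hello Kafka
offset = 16, key = user2, value = Welcome to Kafka
```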
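Output like this only appears if the consumer actually reads those messages. When a group has no committed offset for a partition, the consumer starts at the position given by auto.offset.reset, which defaults to latest, so a brand-new group sees only messages produced after it joins. The sketch below, assuming you want a new group to read test-topic from its oldest retained message, adds that one setting to the configuration used in the example; the EarliestOffsetConfig class and consumerProps helper are hypothetical names.

```java
import java.util.Properties;

public class EarliestOffsetConfig {
    // Hypothetical helper: the same settings as the SimpleKafkaConsumer example,
    // plus auto.offset.reset so a group without committed offsets starts at the oldest message.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Applies only when the group has no valid committed offset for a partition;
        // once offsets are committed, the consumer resumes from them instead.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "test-group");
        // Pass these to new KafkaConsumer<>(props) exactly as in SimpleKafkaConsumer
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

Note that an existing group with committed offsets ignores this setting, so switching an already-running group to earliest does not replay old messages.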
Common Pitfalls
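- Not setting the correct group.id: Consumers that share a group.id split the topic's partitions among themselves, so each message is delivered to only one consumer in the group. Without a group.id, subscribe() throws an InvalidGroupIdException (Kafka clients 2.2 and later); reusing another application's group ID makes the two applications share partitions and committed offsets.
- Not subscribing to topics before polling: Call subscribe() (or assign()) before poll(); otherwise poll() throws an IllegalStateException.
- Ignoring offset commits: Auto-commit is enabled by default (enable.auto.commit=true). If you disable it and never commit, a restarted or rebalanced consumer resumes from the last committed offset (or from auto.offset.reset if there is none) and processes the same messages again.
- Using infinite loops without exit conditions: Always plan how to stop the consumer gracefully, for example by calling consumer.wakeup() from another thread and closing the consumer once poll() throws a WakeupException.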
```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* Wrong: no group.id set */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Throws InvalidGroupIdException (Kafka clients 2.2+) because group.id is missing
consumer.subscribe(Arrays.asList("my-topic"));

/* Right: set group.id before the consumer is created */
props.put("group.id", "my-group");
KafkaConsumer<String, String> groupConsumer = new KafkaConsumer<>(props);
groupConsumer.subscribe(Arrays.asList("my-topic"));
```
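With the default auto-commit, offsets are committed on a timer (auto.commit.interval.ms) from inside poll(), so the commit point is not tied to when your processing finishes. A minimal sketch of committing manually instead, assuming the same local broker and test-topic as the example: enable.auto.commit is set to false and commitSync() runs after each processed batch. The ManualCommitConsumer class and processBatch helper are hypothetical; processBatch takes the Consumer interface rather than KafkaConsumer so it can also be driven by MockConsumer from kafka-clients in tests, without a broker.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    // Hypothetical helper: polls once, processes every record, then commits.
    // Returns the number of records handled in this batch.
    static int processBatch(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // Stand-in for real processing
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        if (!records.isEmpty()) {
            // Blocks until the broker acknowledges; commits the position after the last polled record
            consumer.commitSync();
        }
        return records.count();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Turn off periodic auto-commit; offsets are committed only in processBatch
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                processBatch(consumer);
            }
        }
    }
}
```

Because commitSync() runs only after the whole batch is processed, a crash before the commit causes that batch to be delivered again rather than skipped. This is at-least-once delivery, so the processing step should tolerate duplicates.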
Quick Reference
Remember these key points when consuming messages from Kafka:
- bootstrap.servers: Kafka broker address.
- group.id: Consumer group identifier.
- subscribe(): Register topics to consume.
- poll(): Fetch messages.
- Commit offsets: Track consumed messages.
Key Takeaways
Create a KafkaConsumer with proper configuration including bootstrap.servers and group.id.
Subscribe to topics before polling messages.
Use poll() in a loop to continuously receive messages.
Commit offsets to limit duplicate processing after restarts and rebalances.
Handle consumer shutdown gracefully to free resources.
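A common way to stop the infinite poll loop cleanly is the wakeup pattern: KafkaConsumer is not thread-safe, but wakeup() may be called from another thread and makes the current or next poll() throw a WakeupException, after which the polling thread closes the consumer. The sketch below assumes the same local broker and test-topic as the example and triggers wakeup() from a JVM shutdown hook; the GracefulShutdownConsumer class and runUntilWakeup helper are hypothetical names, and the loop takes the Consumer interface so it can also be exercised with MockConsumer.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownConsumer {
    // Hypothetical helper: polls until another thread calls wakeup(), then closes the consumer.
    // Returns the number of records processed before shutdown.
    static int runUntilWakeup(Consumer<String, String> consumer) {
        int processed = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                    processed++;
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() ends the loop by making poll() throw
        } finally {
            // Commits offsets if auto-commit is on, leaves the group, and frees network resources
            consumer.close();
        }
        return processed;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // wakeup() is the one KafkaConsumer method that is safe to call from another thread
            consumer.wakeup();
            try {
                // Wait for the poll loop to finish closing the consumer before the JVM exits
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        runUntilWakeup(consumer);
    }
}
```

Pressing Ctrl+C runs the shutdown hook: the pending poll() throws WakeupException, the finally block closes the consumer, and only then does the JVM exit.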