How to Use Kafka Consumer in Java: Simple Guide
To use a KafkaConsumer in Java, create a consumer instance with properties like bootstrap.servers and group.id, subscribe to topics using subscribe(), and poll messages with poll(). Then process the records and close the consumer to release resources.
Syntax
The basic steps to use a Kafka consumer in Java are:
- Create a Properties object with configuration settings.
- Instantiate a KafkaConsumer<K, V> with those properties.
- Subscribe to one or more topics using subscribe().
- Use poll() in a loop to fetch records.
- Process the records as needed.
- Close the consumer when done.
java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
// consumer.close(); // call when finished consuming
Example
This example shows a complete Java Kafka consumer that connects to a Kafka broker, subscribes to a topic named test-topic, polls messages, and prints their offset, key, and value.
java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer when the block exits
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Output
offset = 0, key = user1, value = Hello Kafka
offset = 1, key = user2, value = Welcome to Kafka
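The example above loops on while (true), so it only ends when the process is killed and the consumer is never closed cleanly. A minimal sketch of a stoppable loop follows. It is not Kafka API code: the class name StoppableLoop is hypothetical, and the fetch supplier is a stand-in for consumer.poll(Duration.ofMillis(100)) so that the sketch runs without a broker.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Kafka client library.
public class StoppableLoop {
    // Shared flag; AtomicBoolean makes stop() visible to the looping thread.
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Asks the loop to exit after the current batch; safe to call from another thread. */
    public void stop() {
        running.set(false);
    }

    /**
     * Fetches batches until stop() is called and passes each record to the handler.
     * In a real consumer, fetch would wrap consumer.poll(...) (assumption for illustration).
     *
     * @return the number of records handled
     */
    public int run(Supplier<List<String>> fetch, Consumer<String> handler) {
        int handled = 0;
        while (running.get()) {
            for (String record : fetch.get()) {
                handler.accept(record);
                handled++;
            }
        }
        return handled;
    }
}
```

With a real consumer, stop() could be called from a thread registered through Runtime.getRuntime().addShutdownHook(...), and the loop would sit inside the try-with-resources block so that close() runs on exit. For long poll timeouts, KafkaConsumer.wakeup() is the thread-safe call that interrupts a blocked poll() by making it throw WakeupException.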
Common Pitfalls
Common mistakes when using a Kafka consumer in Java include:
- Not setting the group.id property, which is required for consumer group management and therefore for subscribe().
- Forgetting to use the correct deserializer classes for key and value.
- Not calling poll() regularly. If the gap between calls exceeds max.poll.interval.ms, the consumer is considered failed, leaves the group, and its partitions are reassigned.
- Not closing the consumer properly, which can lead to resource leaks.
- Using infinite loops without a proper shutdown mechanism.
java
/* Wrong: Missing group.id and wrong deserializer */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// props.put("group.id", "my-group"); // Missing group.id
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); // Wrong if expecting String
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

/* Right: Correct group.id and deserializers */
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
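A missing setting such as group.id is easier to catch before the consumer is created. The sketch below is one possible pre-flight check using only java.util.Properties. The class ConsumerProps and its methods are hypothetical helpers, not part of the Kafka client, and the list of required keys is simply the four settings this article puts on every consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
public class ConsumerProps {
    // The four settings this article sets on every consumer (assumed list).
    private static final String[] REQUIRED = {
        "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer"
    };

    /** Builds the String-to-String consumer configuration used in this article. */
    public static Properties stringConsumer(String servers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Returns the required keys that are absent or blank, in declaration order. */
    public static List<String> missing(Properties props) {
        List<String> absent = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                absent.add(key);
            }
        }
        return absent;
    }
}
```

Calling ConsumerProps.missing(props) on the "Wrong" configuration above would report group.id, while the "Right" configuration yields an empty list. The check only looks for presence, so it would not detect a deserializer that is set but does not match the message format.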
Quick Reference
Remember these key points when using a Kafka consumer in Java:
- bootstrap.servers: Comma-separated host:port list of Kafka brokers used for the initial connection.
- group.id: Consumer group identifier.
- key.deserializer and value.deserializer: Classes to convert bytes to objects.
- subscribe(): To listen to topics.
- poll(): To fetch messages regularly.
- close(): To release resources.
Key Takeaways
Always set the group.id property for your Kafka consumer.
Use StringDeserializer for keys and values if your messages are strings.
Call poll() regularly to keep the consumer alive and fetch messages.
Subscribe to topics before polling to receive messages.
Close the consumer properly to avoid resource leaks.