How to Use Kafka Consumer API: Syntax, Example, and Tips
To use the KafkaConsumer API, create a consumer instance with configuration properties, subscribe to topics using subscribe(), and fetch messages with poll(). Process the received records and commit offsets to track consumption progress.
Syntax
The Kafka Consumer API requires creating a KafkaConsumer object with properties like bootstrap.servers, group.id, and deserializers. Use subscribe() to listen to topics, then call poll() in a loop to fetch messages. Finally, commit offsets to mark messages as processed.
- Properties: Configure connection and behavior.
- subscribe(Collection<String> topics): Subscribe to topics.
- poll(Duration timeout): Fetch messages.
- commitSync(): Commit offsets synchronously.
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}
```
Example
This example shows a simple Kafka consumer that connects to a local Kafka server, subscribes to a topic named test-topic, polls for messages with a 100-millisecond timeout, prints each message's offset, key, and value, and commits offsets after processing.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer automatically
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitSync();
            }
        }
    }
}
```
Output
offset = 15, key = user1, value = Hello Kafka
offset = 16, key = user2, value = Welcome to Kafka Consumer API
Common Pitfalls
- Not committing offsets: Without commits, Kafka redelivers messages after a restart or rebalance, causing duplicates.
- Long poll intervals: Polling too infrequently can cause consumer group rebalances.
- Incorrect deserializers: Using wrong deserializers causes runtime errors.
- Not closing consumer: Can cause resource leaks.
Always commit offsets after processing and close the consumer properly.
```java
/* Wrong: No commit, no close */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
}

/* Right: Commit and close */
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.commitSync();
    }
}
```
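The corrected loop still runs forever, and killing the process with Ctrl+C would skip both commitSync() and close(). One common shutdown pattern (a sketch, assuming the kafka-clients library on the classpath; the topic name and group id are illustrative) is to call wakeup() from a shutdown hook, which makes the blocked poll() throw WakeupException so the loop can exit and close cleanly:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GracefulConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "graceful-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        final Thread mainThread = Thread.currentThread();

        // wakeup() is the only KafkaConsumer method that is safe to call from another thread
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // give the poll loop time to commit and close
            } catch (InterruptedException ignored) {
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // expected on shutdown: poll() throws once wakeup() has been called
        } finally {
            consumer.close(); // leaves the group cleanly and frees resources
        }
    }
}
```

This keeps all consumer calls except wakeup() on a single thread, which the KafkaConsumer threading model requires.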
Quick Reference
Keep these tips in mind when using the Kafka Consumer API:
- Set bootstrap.servers to your Kafka broker address.
- Use group.id to identify your consumer group.
- Always specify correct key and value deserializers.
- Call subscribe() before polling.
- Poll frequently enough to stay within max.poll.interval.ms and avoid rebalances.
- Commit offsets after processing messages.
- Close the consumer to free resources.
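These tips map directly onto consumer configuration. A minimal sketch of a configuration that covers them (the broker address, group id, and timeout values are illustrative, and enable.auto.commit=false matches the manual commitSync() style used throughout this article):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Bundles the quick-reference settings; all values here are example choices.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // Kafka broker address
        props.put("group.id", "my-group");                 // consumer group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");          // commit manually after processing
        props.put("session.timeout.ms", "10000");          // heartbeat-based liveness check
        props.put("max.poll.interval.ms", "300000");       // max time allowed between poll() calls
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

With auto-commit disabled, offsets advance only when your code calls commitSync(), giving at-least-once delivery.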
Key Takeaways
Create a KafkaConsumer with proper configs and deserializers before subscribing.
Use subscribe() to listen to topics and poll() in a loop to receive messages.
Commit offsets after processing to avoid duplicate message consumption.
Poll frequently to remain an active member of the consumer group, and close the consumer when done.
Use try-with-resources or close the consumer to prevent resource leaks.