
How to Use Kafka Consumer API: Syntax, Example, and Tips

To use the KafkaConsumer API, create a consumer instance with configuration properties, subscribe to topics using subscribe(), and poll messages with poll(). Process the received records and commit offsets to track consumption progress.

Syntax

The Kafka Consumer API requires creating a KafkaConsumer object with properties like bootstrap.servers, group.id, and deserializers. Use subscribe() to listen to topics, then call poll() in a loop to fetch messages. Finally, commit offsets to mark messages as processed.

  • Properties: Configure connection and behavior.
  • subscribe(Collection<String> topics): Subscribe to topics.
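  • poll(Duration timeout): Fetch records, waiting up to timeout if none are available yet.
  • commitSync(): Commit the offsets returned by the last poll(), blocking until the broker acknowledges them.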
java
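import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Connection, consumer group, and how to decode keys and values
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    // Wait up to 100 ms for new records; an empty batch is returned if none arrive
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
    // Commit the offsets returned by the poll() above
    consumer.commitSync();
}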
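The examples in this article rely on the client's default of enable.auto.commit=true, so offsets are also committed in the background. If commitSync() should be the only thing that marks records as processed, the configuration might look like the sketch below. The class and method names (ConsumerProps, manualCommitProps) are hypothetical; the property keys are standard Kafka consumer settings, and the sketch uses only the JDK so it runs without a broker.

```java
import java.util.Properties;

public class ConsumerProps {
    /** Builds consumer properties for manual offset commits (hypothetical helper). */
    static Properties manualCommitProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Turn off background commits so explicit commits alone decide what counts as processed.
        props.put("enable.auto.commit", "false");
        // When the group has no committed offset yet, start from the oldest retained record.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = manualCommitProps("localhost:9092", "my-group");
        System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
    }
}
```

The returned Properties object can be passed to the KafkaConsumer constructor in place of the hand-built props shown above.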

Example

This example shows a simple Kafka consumer that connects to a local Kafka server and subscribes to a topic named test-topic. Each poll() call waits up to 100 milliseconds for new records. The consumer prints each record's offset, key, and value, then commits offsets once the batch has been processed.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                }
                consumer.commitSync();
            }
        }
    }
}
Output
offset = 15, key = user1, value = Hello Kafka
offset = 16, key = user2, value = Welcome to Kafka Consumer API

Common Pitfalls

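  • Not committing offsets: If offsets are never committed (for example, with enable.auto.commit=false and no commitSync() call), the group resumes from the last committed offset after a restart or rebalance and reprocesses records it has already handled.
  • Long gaps between polls: If the time between poll() calls exceeds max.poll.interval.ms (5 minutes by default), the consumer is removed from the group and its partitions are reassigned in a rebalance.
  • Incorrect deserializers: The deserializers must match the producer's serializers; a mismatch typically surfaces as a SerializationException from poll() or as garbled values.
  • Not closing the consumer: Sockets and buffers leak, and the group only notices the consumer is gone once its session times out, which delays partition reassignment.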

Always commit offsets after processing and close the consumer properly.

java
/* Wrong: No commit, no close */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
}

/* Right: Commit and close */
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.commitSync();
    }
}
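To see why the commit matters, here is a toy in-memory simulation of redelivery. It does not use the Kafka client at all: FakePartition and runOnce are hypothetical names standing in for a single partition and one run of a consumer process, and a "restart" is modeled simply as reading again from the committed offset.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitSimulation {
    /** In-memory stand-in for one partition plus the group's committed offset. */
    static final class FakePartition {
        private final List<String> log;
        private int committedOffset = 0; // next offset a restarted consumer reads from

        FakePartition(List<String> log) { this.log = log; }

        /** Records from the committed offset onward, as a restarted consumer would see them. */
        List<String> fetchFromCommitted() {
            return new ArrayList<>(log.subList(committedOffset, log.size()));
        }

        void commit(int nextOffset) { committedOffset = nextOffset; }

        int committed() { return committedOffset; }
    }

    /** Processes up to `limit` records and commits only when commitAfter is true. */
    static List<String> runOnce(FakePartition partition, int limit, boolean commitAfter) {
        List<String> batch = partition.fetchFromCommitted();
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < Math.min(limit, batch.size()); i++) {
            processed.add(batch.get(i));
        }
        if (commitAfter) {
            partition.commit(partition.committed() + processed.size());
        }
        return processed;
    }

    public static void main(String[] args) {
        FakePartition partition = new FakePartition(List.of("a", "b", "c"));
        System.out.println(runOnce(partition, 2, false)); // stops before committing: [a, b]
        System.out.println(runOnce(partition, 3, true));  // restart sees a and b again: [a, b, c]
    }
}
```

The first run handles a and b but never commits, so the second run receives them again. This is the at-least-once behavior to expect from a real consumer that stops between processing and committing, which is why processing logic should tolerate duplicates.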

Quick Reference

Keep these tips in mind when using Kafka Consumer API:

  • Set bootstrap.servers to your Kafka broker address.
  • Use group.id to identify your consumer group.
  • Always specify correct key and value deserializers.
  • Call subscribe() before polling.
  • Poll often enough that the gap between poll() calls stays below max.poll.interval.ms.
  • Commit offsets after processing messages.
  • Close the consumer to free resources.
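As a rough way to judge what "poll frequently" means in practice, the sketch below divides the allowed gap between polls by the batch size to get an average per-record time budget. PollBudget and maxPerRecord are hypothetical names; the numbers in main are the client defaults for max.poll.interval.ms (300000) and max.poll.records (500), and the calculation assumes every poll() returns a full batch.

```java
import java.time.Duration;

public class PollBudget {
    /** Largest average per-record processing time that lets a full batch finish before the next poll() is due. */
    static Duration maxPerRecord(Duration maxPollInterval, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return maxPollInterval.dividedBy(maxPollRecords);
    }

    public static void main(String[] args) {
        // Defaults: max.poll.interval.ms = 300000, max.poll.records = 500
        Duration budget = maxPerRecord(Duration.ofMillis(300_000), 500);
        System.out.println(budget.toMillis() + " ms per record"); // prints "600 ms per record"
    }
}
```

If processing a record regularly takes longer than this budget, lowering max.poll.records or raising max.poll.interval.ms are the usual adjustments.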

Key Takeaways

Create a KafkaConsumer with proper configs and deserializers before subscribing.
Use subscribe() to listen to topics and poll() in a loop to receive messages.
Commit offsets after processing to avoid duplicate message consumption.
Poll frequently so the consumer stays in its group and avoids unnecessary rebalances.
Use try-with-resources or close the consumer to prevent resource leaks.