Kafka · How-To · Beginner · 4 min read

How to Consume Messages from Kafka: Simple Guide

To consume messages from Kafka, create a KafkaConsumer client, subscribe to one or more topics, and continuously poll for new messages using the poll() method. Process each received message and commit offsets to track consumption progress.
📐

Syntax

The basic steps to consume messages from Kafka are:

  • Create a KafkaConsumer with configuration properties like bootstrap.servers and group.id.
  • Subscribe to topics using consumer.subscribe().
  • Use consumer.poll() in a loop to fetch messages.
  • Process each message from the returned records.
  • Commit offsets automatically or manually to mark messages as consumed.
java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
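The loop above relies on auto-commit, which is enabled by default (`enable.auto.commit=true`). When a message must be fully processed before its offset is recorded, you can disable auto-commit and commit manually. A minimal sketch, assuming the same broker address and a placeholder topic name `my-topic`:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // take control of offset commits
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // blocks until the offsets for this batch are committed
    }
}
```

With this pattern a crash before `commitSync()` means the batch is redelivered on restart, giving at-least-once delivery instead of the possible message loss of auto-commit.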
💻

Example

This example shows a simple Kafka consumer in Java that connects to a Kafka broker, subscribes to a topic named test-topic, and prints each message's offset, key, and value.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Output
offset = 15, key = user1, value = Hello Kafka
offset = 16, key = user2, value = Welcome to Kafka
⚠️

Common Pitfalls

  • Not setting the correct group.id: Consumers in the same group split a topic's partitions between them. A missing group.id makes poll() fail, and a wrong one can unintentionally share partitions with an unrelated group.
  • Not subscribing to topics before polling: You must call subscribe() before poll() or no messages will be received.
  • Ignoring offset commits: A restarted consumer resumes from the last committed offset, so any uncommitted messages are redelivered and processed again as duplicates.
  • Using infinite loops without exit conditions: Always plan how to stop the consumer gracefully.
java
/* Wrong: no group.id set */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// poll() will fail with an InvalidGroupIdException because group.id is missing

/* Right: set group.id before constructing the consumer */
props.put("group.id", "my-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
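The last pitfall, an infinite loop with no exit condition, is commonly handled by calling consumer.wakeup() from another thread, which makes a blocked poll() throw WakeupException. A sketch using a JVM shutdown hook (topic name is again a placeholder):

```java
import org.apache.kafka.common.errors.WakeupException;

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// wakeup() is the one thread-safe KafkaConsumer method; it aborts a blocked poll()
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} catch (WakeupException e) {
    // expected during shutdown; fall through to close()
} finally {
    consumer.close(); // leaves the group cleanly and commits final offsets if auto-commit is on
}
```

Closing the consumer promptly lets the group rebalance immediately instead of waiting for a session timeout.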
📊

Quick Reference

Remember these key points when consuming messages from Kafka:

  • bootstrap.servers: Kafka broker address.
  • group.id: Consumer group identifier.
  • subscribe(): Register topics to consume.
  • poll(): Fetch messages.
  • Commit offsets: Track consumed messages.

Key Takeaways

Create a KafkaConsumer with proper configuration including bootstrap.servers and group.id.
Subscribe to topics before polling messages.
Use poll() in a loop to continuously receive messages.
Commit offsets to avoid processing duplicates.
Handle consumer shutdown gracefully to free resources.