Kafka · How-To · Beginner · 4 min read

How to Commit Offset in Kafka: Simple Guide with Examples

To commit offsets in Kafka, call the consumer's commitSync() or commitAsync() method after processing messages. A committed offset records the position of the next message to read, so the consumer can resume from the correct place if it is restarted or a rebalance occurs.
📐 Syntax

The main methods to commit offsets in Kafka consumer are:

  • commitSync(): Synchronously commits the latest offsets. It blocks until the commit is acknowledged by Kafka.
  • commitAsync(): Asynchronously commits offsets. It does not block and can improve performance but may lose commit information on failure.

You can also commit specific offsets by passing a map of TopicPartition to OffsetAndMetadata.

java
consumer.commitSync();

// Or commit specific offsets. Note: the committed offset is the position of
// the NEXT record to read, not the last record processed.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(15)); // resume reading at offset 15
consumer.commitSync(offsets);
💻 Example

This example shows a Kafka consumer that reads messages and commits offsets synchronously after processing each batch.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CommitOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, offset = %d\n",
                            record.key(), record.value(), record.offset());
                }
                // Commit offsets synchronously after processing
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
Output
Received message: key = key1, value = value1, offset = 0
Received message: key = key2, value = value2, offset = 1
...
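A detail that trips up beginners: the offset you commit is the offset of the next record to consume, not the last record you processed. The sketch below builds the commit map for that case (the topic name and offset values are illustrative; committing the map still requires a live consumer, as in the example above):

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class PerPartitionOffsets {
    public static void main(String[] args) {
        // Suppose the last record processed from partition 0 of "my-topic"
        // had offset 41 (an illustrative value).
        long lastProcessedOffset = 41L;

        // Commit lastProcessedOffset + 1: the position of the NEXT record
        // the consumer should read after a restart.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0),
                new OffsetAndMetadata(lastProcessedOffset + 1));

        System.out.println(offsets);
        // With a live consumer you would then call:
        // consumer.commitSync(offsets);
    }
}
```

Committing record.offset() instead of record.offset() + 1 causes the last processed record to be consumed again after every restart.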
⚠️ Common Pitfalls

  • Auto commit enabled: If enable.auto.commit is true, Kafka commits offsets on a timer, so a failure can lose messages (offsets committed before processing finished) or reprocess them (processed but not yet committed).
  • Not committing offsets: Forgetting to commit offsets causes the consumer to reprocess messages after restart.
  • Using commitAsync() without handling failures: Async commits can fail silently; always handle callbacks or use commitSync() for critical commits.
java
/* Wrong: auto commit enabled, may lose messages on failure */
props.put("enable.auto.commit", "true");

/* Right: disable auto commit and commit manually */
props.put("enable.auto.commit", "false");
consumer.commitSync();
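To avoid the silent-failure pitfall with commitAsync(), pass an OffsetCommitCallback that reacts to errors. A minimal sketch, with illustrative logging (Kafka invokes the callback from the consumer's polling thread when the commit response arrives):

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class CommitCallbackSketch {
    public static void main(String[] args) {
        // OffsetCommitCallback is a functional interface: onComplete(offsets, exception).
        // exception is null on success, non-null on failure.
        OffsetCommitCallback logOnFailure = (Map<TopicPartition, OffsetAndMetadata> offsets,
                                             Exception exception) -> {
            if (exception != null) {
                System.err.println("Commit failed for " + offsets + ": " + exception);
            }
        };

        // Inside the poll loop of a live consumer you would call:
        // consumer.commitAsync(logOnFailure);
    }
}
```

Because async commits can complete out of order, retrying a failed async commit can overwrite a newer one; a common compromise is commitAsync() in the loop and one final commitSync() before closing the consumer.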
📊 Quick Reference

Method | Description | When to Use
commitSync() | Blocks until offsets are committed | When you need a guaranteed commit before proceeding
commitAsync() | Commits offsets asynchronously | For better throughput when an occasional lost commit is acceptable
enable.auto.commit | Commits offsets automatically on a timer | Avoid when you need precise control over commits

Key Takeaways

  • Always disable auto commit by setting enable.auto.commit to false for manual offset control.
  • Use commitSync() to ensure offsets are committed before processing continues.
  • commitAsync() improves performance but may lose commits on failure; handle callbacks if used.
  • Committing offsets after processing messages prevents duplicate processing on restart.
  • You can commit specific offsets by passing a map of TopicPartition to OffsetAndMetadata.