How to Commit Offset in Kafka: Simple Guide with Examples
To commit offsets in Kafka, call the commitSync() or commitAsync() method on the Kafka consumer after processing messages. Committing tells Kafka which messages have been read, so the consumer can resume from the correct position if it is restarted.

Syntax
The main methods to commit offsets in Kafka consumer are:
- commitSync(): Synchronously commits the latest offsets. It blocks until the commit is acknowledged by Kafka.
- commitAsync(): Asynchronously commits offsets. It does not block and can improve performance, but commit information may be lost on failure.
You can also commit specific offsets by passing a map of TopicPartition to OffsetAndMetadata.
```java
consumer.commitSync();

// Or commit specific offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(15));
consumer.commitSync(offsets);
```
Example
This example shows a Kafka consumer that reads messages and commits offsets synchronously after processing each batch.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CommitOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
                            record.key(), record.value(), record.offset());
                }
                // Commit offsets synchronously after processing the batch
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
```
Output
Received message: key = key1, value = value1, offset = 0
Received message: key = key2, value = value2, offset = 1
...
Common Pitfalls
- Auto commit enabled: If enable.auto.commit is true, Kafka commits offsets on a timer, which can cause message loss if the consumer crashes after offsets are auto-committed but before processing finishes.
- Not committing offsets: Forgetting to commit offsets causes the consumer to reprocess messages after a restart.
- Using commitAsync() without handling failures: Async commits can fail silently; always handle the callback, or use commitSync() for critical commits.
```java
/* Wrong: auto commit enabled, may lose messages on failure */
props.put("enable.auto.commit", "true");

/* Right: disable auto commit and commit manually */
props.put("enable.auto.commit", "false");
consumer.commitSync();
```
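The third pitfall can be avoided by passing a callback to commitAsync(). A minimal sketch, assuming the same `consumer` as in the example above; the OffsetCommitCallback receives the committed offsets and any exception:

```java
// Sketch: async commit with error handling via a callback.
// Assumes `consumer` is the KafkaConsumer from the example above.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // The commit failed; log it (or fall back to commitSync() for critical paths)
        System.err.printf("Async commit failed for %s: %s%n",
                offsets, exception.getMessage());
    }
});
```

Because the callback only fires during later calls to poll() or on close(), commit failures surface without blocking the processing loop.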
Quick Reference
| Method | Description | When to Use |
|---|---|---|
| commitSync() | Blocks until offsets are committed | Use when you need guaranteed commit before proceeding |
| commitAsync() | Commits offsets asynchronously | Use for better performance when occasional commit loss is acceptable |
| enable.auto.commit | Automatically commits offsets periodically | Avoid for precise control over commits |
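A common way to combine the two methods from the table is async commits on the hot path with one final synchronous commit during shutdown. A sketch under the same setup as the earlier example; `running` and `process(record)` are placeholders for your own shutdown flag and processing logic:

```java
// Sketch: commitAsync() per batch for throughput, commitSync() once on shutdown.
// `running`, `process(record)`, and `consumer` are assumed from surrounding code.
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // placeholder for real processing
        }
        consumer.commitAsync(); // non-blocking; a lost commit only means some re-delivery
    }
} finally {
    try {
        consumer.commitSync(); // blocking final commit so no progress is lost on shutdown
    } finally {
        consumer.close();
    }
}
```

The occasional failed commitAsync() is harmless here because a later commit supersedes it, while the closing commitSync() guarantees the last position is recorded.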
Key Takeaways
- Always disable auto commit by setting enable.auto.commit to false for manual offset control.
- Use commitSync() to ensure offsets are committed before processing continues.
- commitAsync() improves performance but may lose commits on failure; handle callbacks if used.
- Committing offsets after processing messages prevents duplicate processing on restart.
- You can commit specific offsets by passing a map of TopicPartition to OffsetAndMetadata.