How to Use commitAsync in Kafka for Asynchronous Offset Commit
Use commitAsync() in Kafka consumers to commit offsets asynchronously without blocking the consumer thread. It accepts an optional callback that reports the success or failure of the commit, so your application can keep processing messages while the commit is in flight.
Syntax
The commitAsync() method is called on a Kafka consumer instance to commit offsets asynchronously. It can be used with or without a callback.
- commitAsync(): Commits the offsets returned by the last poll() asynchronously, without a callback.
- commitAsync(OffsetCommitCallback callback): Commits the same offsets asynchronously and invokes the callback when the commit completes.
```java
// Without callback
consumer.commitAsync();

// With callback
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("Commit failed for offsets " + offsets + ": " + exception.getMessage());
        } else {
            System.out.println("Commit succeeded for offsets " + offsets);
        }
    }
});
```
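Because the callback is just an object with one method, a single instance can be reused across every commit instead of allocating a new anonymous class per loop iteration. The following is a minimal sketch of that idea; the class name LoggingCommitCallback and its failureCount() method are hypothetical and not part of the Kafka API.

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reusable callback: logs failed commits and counts them
// so the application can expose the number as a metric or alert on it.
public class LoggingCommitCallback implements OffsetCommitCallback {

    private final AtomicLong failures = new AtomicLong();

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            failures.incrementAndGet();
            System.err.println("Commit failed for offsets " + offsets + ": " + exception.getMessage());
        }
        // A null exception means the commit succeeded; nothing to do here.
    }

    // Number of failed commits observed so far.
    public long failureCount() {
        return failures.get();
    }
}
```

With this in place, the poll loop can create one LoggingCommitCallback up front and pass it to consumer.commitAsync(callback) on every iteration.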
Example
This example shows a Kafka consumer that polls messages and commits offsets asynchronously using commitAsync() with a callback to log success or failure.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AsyncCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        // Manual commits: turn off automatic offset commits
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key = %s, value = %s, offset = %d\n",
                            record.key(), record.value(), record.offset());
                }

                // Commit the offsets from the last poll() without blocking the loop
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for offsets " + offsets + ": " + exception.getMessage());
                        } else {
                            System.out.println("Commit succeeded for offsets " + offsets);
                        }
                    }
                });
            }
        }
    }
}
```
Output
Consumed message: key = key1, value = value1, offset = 15
Commit succeeded for offsets {my-topic-0=OffsetAndMetadata{offset=16, metadata=''}}
Consumed message: key = key2, value = value2, offset = 16
Commit succeeded for offsets {my-topic-0=OffsetAndMetadata{offset=17, metadata=''}}
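The committed offsets in this output are one higher than the consumed offsets because a committed offset names the next record the group should read, not the last one processed. The no-argument and callback-only forms handle this for you, but if you pass offsets explicitly through the commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) overload, you have to add the 1 yourself. A minimal sketch of building that map, where the class NextOffsets and the method toCommit are hypothetical helpers rather than Kafka API:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NextOffsets {

    // Hypothetical helper: for each partition, the offset to commit is the
    // highest processed offset plus one (the position of the next record).
    static Map<TopicPartition, OffsetAndMetadata> toCommit(
            Iterable<ConsumerRecord<String, String>> processed) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : processed) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            long next = record.offset() + 1;
            OffsetAndMetadata current = offsets.get(tp);
            if (current == null || current.offset() < next) {
                offsets.put(tp, new OffsetAndMetadata(next));
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Two records from partition 0, built by hand so no broker is needed.
        List<ConsumerRecord<String, String>> batch = List.of(
                new ConsumerRecord<>("my-topic", 0, 15L, "key1", "value1"),
                new ConsumerRecord<>("my-topic", 0, 16L, "key2", "value2"));
        Map<TopicPartition, OffsetAndMetadata> offsets = toCommit(batch);
        // In a real consumer this map would go to consumer.commitAsync(offsets, callback).
        System.out.println(offsets);
    }
}
```

Since ConsumerRecords is iterable over its records, the result of consumer.poll() can be passed to toCommit directly.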
Common Pitfalls
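- Ignoring commit failures: commitAsync does not retry a failed commit, so without a callback that checks the exception, commit errors go unnoticed.
- Using commitAsync without disabling auto commit: If enable.auto.commit is true, the consumer also commits offsets periodically on its own, and manual commits may conflict with those.
- Not calling commitSync on shutdown: Relying only on commitAsync can lose the most recent offsets on shutdown, because the last asynchronous commit may fail without a retry or may not have completed when the consumer closes.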
```java
/* Wrong: No callback and auto commit enabled */
props.put("enable.auto.commit", "true");
consumer.commitAsync(); // May cause conflicts

/* Right: Disable auto commit and handle errors */
props.put("enable.auto.commit", "false");
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed: " + exception.getMessage());
    }
});
```
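The shutdown pitfall is commonly handled by combining both commit styles: commitAsync inside the loop for speed, then one commitSync in a finally block before closing. The sketch below shows one way to arrange this. The class GracefulShutdownLoop, the run method, and the running flag are hypothetical names introduced for illustration; the method takes the Consumer interface so it works with a KafkaConsumer or a test double.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulShutdownLoop {

    // Hypothetical helper: polls until `running` is cleared or wakeup() is called,
    // then commits synchronously before closing. Returns the number of records seen.
    static long run(Consumer<String, String> consumer, AtomicBoolean running) {
        long processed = 0;
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processed++; // stand-in for real message processing
                }
                // Fast path: do not block the loop; only log failures.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Async commit failed for " + offsets
                                + ": " + exception.getMessage());
                    }
                });
            }
        } catch (WakeupException e) {
            // Expected when another thread calls consumer.wakeup() to stop the loop.
        } finally {
            try {
                // Blocks until the commit succeeds or fails with an unrecoverable error.
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
        return processed;
    }
}
```

A typical caller stops the loop from a shutdown hook, either by clearing the flag or by calling consumer.wakeup(), which is the one KafkaConsumer method documented as safe to call from another thread.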
Quick Reference
commitAsync() commits offsets asynchronously without blocking.
commitAsync(OffsetCommitCallback callback) commits asynchronously and lets you handle success or failure.
Always set enable.auto.commit to false when using manual commits.
Use commitSync() on shutdown to ensure offsets are saved.
| Method | Description | Notes |
|---|---|---|
| commitAsync() | Commit offsets asynchronously without callback | Non-blocking, no error handling |
| commitAsync(OffsetCommitCallback) | Commit asynchronously with callback | Allows error handling |
| commitSync() | Commit offsets synchronously | Blocks until commit completes, use on shutdown |
| enable.auto.commit | Kafka consumer config | Set to false for manual commits |
Key Takeaways
Use commitAsync() to commit Kafka consumer offsets without blocking message processing.
Always set enable.auto.commit to false when manually committing offsets asynchronously.
Provide an OffsetCommitCallback to handle commit success or failure for better reliability.
Call commitSync() during shutdown to ensure all offsets are committed before exit.
Ignoring commit failures can leave the committed position behind what was actually processed, so messages are reprocessed after a restart or rebalance.