0
0
KafkaHow-ToBeginner · 4 min read

How to Use commitAsync in Kafka for Asynchronous Offset Commit

Use commitAsync() in Kafka consumers to commit offsets asynchronously without blocking the consumer thread. It accepts an optional callback to handle success or failure of the commit operation, allowing your application to continue processing messages smoothly.
📐

Syntax

The commitAsync() method is called on a Kafka consumer instance to commit offsets asynchronously. It can be used with or without a callback.

  • commitAsync(): Commits the latest offsets asynchronously without a callback.
  • commitAsync(OffsetCommitCallback callback): Commits offsets asynchronously and executes the callback when done.
java
consumer.commitAsync();

// With callback
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("Commit failed for offsets " + offsets + ": " + exception.getMessage());
        } else {
            System.out.println("Commit succeeded for offsets " + offsets);
        }
    }
});
💻

Example

This example shows a Kafka consumer that polls messages and commits offsets asynchronously using commitAsync() with a callback to log success or failure.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AsyncCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key = %s, value = %s, offset = %d\n",
                            record.key(), record.value(), record.offset());
                }

                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for offsets " + offsets + ": " + exception.getMessage());
                        } else {
                            System.out.println("Commit succeeded for offsets " + offsets);
                        }
                    }
                });
            }
        }
    }
}
Output
Consumed message: key = key1, value = value1, offset = 15 Commit succeeded for offsets {my-topic-0=OffsetAndMetadata{offset=16, metadata=''}} Consumed message: key = key2, value = value2, offset = 16 Commit succeeded for offsets {my-topic-0=OffsetAndMetadata{offset=17, metadata=''}}
⚠️

Common Pitfalls

  • Ignoring commit failures: Not handling exceptions in the callback can hide commit errors.
  • Using commitAsync without disabling auto commit: If enable.auto.commit is true, manual commits may conflict.
  • Not calling commitSync on shutdown: Relying only on commitAsync can cause lost offsets on shutdown because it is asynchronous.
java
/* Wrong: No callback and auto commit enabled */
props.put("enable.auto.commit", "true");
consumer.commitAsync(); // May cause conflicts

/* Right: Disable auto commit and handle errors */
props.put("enable.auto.commit", "false");
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed: " + exception.getMessage());
    }
});
📊

Quick Reference

commitAsync() commits offsets asynchronously without blocking.

commitAsync(OffsetCommitCallback callback) commits asynchronously and lets you handle success or failure.

Always disable enable.auto.commit when using manual commits.

Use commitSync() on shutdown to ensure offsets are saved.

MethodDescriptionNotes
commitAsync()Commit offsets asynchronously without callbackNon-blocking, no error handling
commitAsync(OffsetCommitCallback)Commit asynchronously with callbackAllows error handling
commitSync()Commit offsets synchronouslyBlocks until commit completes, use on shutdown
enable.auto.commitKafka consumer configSet to false for manual commits

Key Takeaways

Use commitAsync() to commit Kafka consumer offsets without blocking message processing.
Always disable enable.auto.commit when manually committing offsets asynchronously.
Provide an OffsetCommitCallback to handle commit success or failure for better reliability.
Call commitSync() during shutdown to ensure all offsets are committed before exit.
Ignoring commit failures can lead to message reprocessing or data loss.