
How to Use commitSync in Kafka for Reliable Offset Commit

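In Kafka, call commitSync() on a consumer to commit offsets synchronously. With no arguments it commits the offsets returned by the last poll(), so call it only after you have finished processing those records. The method blocks until the broker acknowledges the commit, an unrecoverable error is thrown, or the request times out, so a normal return means the offsets have been stored.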

Syntax

The commitSync() method is called on a Kafka consumer instance to commit offsets synchronously. It can be called without arguments to commit the offsets returned by the last poll(), or with a map of specific offsets per partition.

  • consumer.commitSync(): Commits the offsets returned by the last poll() for all subscribed topics and partitions.
  • consumer.commitSync(offsets): Commits the specified offsets for the given partitions. Each committed offset should be the offset of the next record to read, that is, the last processed offset plus one.
java
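// Commit the offsets returned by the last poll()
consumer.commitSync();

// Or with specific offsets. The committed value is the offset of the NEXT
// record to read, so 15 means records up to offset 14 have been processed.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("topicName", 0), new OffsetAndMetadata(15));
consumer.commitSync(offsets);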
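
When committing specific offsets, the value to commit for each partition is the highest processed offset plus one. The following is a minimal sketch of that bookkeeping using only the standard library. The class NextOffsets, the stand-in type Rec, and the "topic-partition" string keys are hypothetical names chosen for illustration; they are not part of the Kafka client.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NextOffsets {
    /** Hypothetical stand-in for a consumed record: only the fields needed here. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Maps "topic-partition" to the offset to commit (highest processed offset + 1). */
    static Map<String, Long> offsetsToCommit(List<Rec> processed) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (Rec r : processed) {
            String key = r.topic + "-" + r.partition;
            // Keep the largest "next offset" seen so far for this partition.
            next.merge(key, r.offset + 1, Math::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> processed = Arrays.asList(
                new Rec("topicName", 0, 13),
                new Rec("topicName", 0, 14),
                new Rec("topicName", 1, 7));
        System.out.println(offsetsToCommit(processed));
    }
}
```

In a real consumer, each entry of this map would become a TopicPartition key and an OffsetAndMetadata value in the map passed to consumer.commitSync(offsets).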

Example

This example shows a Kafka consumer that polls messages, processes them, and then uses commitSync() to synchronously commit the offsets after processing each batch. This ensures that offsets are saved only after messages are handled.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CommitSyncExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key = %s, value = %s, offset = %d%n",
                            record.key(), record.value(), record.offset());
                    // Process the message here
                }
                // Commit offsets synchronously after processing
                consumer.commitSync();
            }
        }
    }
}
Output
Consumed message: key = key1, value = value1, offset = 0
Consumed message: key = key2, value = value2, offset = 1
...
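
Committing only after processing gives at-least-once behavior: if the consumer stops after processing a batch but before committing it, that batch is read again on restart. The sketch below simulates this with a plain list in place of a topic partition. It does not use the Kafka client; the class AtLeastOnceSketch, the run method, and the crashAtBatch parameter are hypothetical and exist only to illustrate the ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AtLeastOnceSketch {
    /**
     * Processes the log in batches starting at the committed offset.
     * If crashAtBatch equals the current batch index, the run stops after
     * processing but before committing, like a crash just before commitSync().
     * Returns the committed offset at the end of the run.
     */
    static long run(List<String> log, long committed, int batchSize,
                    int crashAtBatch, List<String> processed) {
        long position = committed;
        int batch = 0;
        while (position < log.size()) {
            long end = Math.min(position + batchSize, log.size());
            for (long i = position; i < end; i++) {
                processed.add(log.get((int) i)); // "process" the record
            }
            if (batch == crashAtBatch) {
                return committed; // crashed: this batch was never committed
            }
            committed = end; // stands in for consumer.commitSync()
            position = end;
            batch++;
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3", "m4", "m5");
        List<String> processed = new ArrayList<>();
        long committed = run(log, 0, 3, 1, processed);     // crash in the second batch
        committed = run(log, committed, 3, -1, processed); // restart, no crash
        System.out.println("processed = " + processed + ", committed = " + committed);
    }
}
```

In this run, m3 to m5 are processed twice because the restart resumes from the last committed offset (3). No record is skipped, but processing logic needs to tolerate such duplicates.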

Common Pitfalls

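  • Not disabling auto commit: If enable.auto.commit is true (the default), the consumer also commits offsets automatically at intervals, independently of your commitSync() calls, so you no longer control exactly when an offset is recorded.
  • Blocking too long: commitSync() blocks the polling thread until the broker responds, so committing after every record adds a network round trip per record and reduces throughput. Committing once per polled batch is usually enough.
  • Ignoring exceptions: commitSync() throws on failure. A CommitFailedException, for example, means the commit cannot be retried, typically because the group rebalanced and the partitions were reassigned. The uncommitted records will be delivered again, so processing should tolerate duplicates.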
java
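try {
    consumer.commitSync();
} catch (org.apache.kafka.clients.consumer.CommitFailedException e) {
    // This commit cannot be retried (for example, the group rebalanced).
    // Records since the last successful commit will be delivered again.
    System.err.println("Commit failed: " + e.getMessage());
}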
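
To address the "blocking too long" pitfall when commits need to be spaced out, one option is a small counter that signals when a commit is due. This is a sketch under assumptions: the class CommitEveryN and its methods are hypothetical helpers, not part of the Kafka client, and the threshold is a value you would tune for your workload.

```java
final class CommitEveryN {
    private final int threshold;
    private int sinceLastCommit = 0;

    CommitEveryN(int threshold) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
    }

    /** Call once per processed record; returns true when a commit is due. */
    boolean recordProcessed() {
        sinceLastCommit++;
        if (sinceLastCommit >= threshold) {
            sinceLastCommit = 0; // caller is expected to commit now
            return true;
        }
        return false;
    }

    /** True if records were processed since the last due commit (check before shutdown). */
    boolean hasUncommitted() {
        return sinceLastCommit > 0;
    }

    public static void main(String[] args) {
        CommitEveryN policy = new CommitEveryN(3);
        for (int i = 0; i < 7; i++) {
            if (policy.recordProcessed()) {
                System.out.println("commit due after record " + i);
            }
        }
        System.out.println("uncommitted left: " + policy.hasUncommitted());
    }
}
```

In a consumer loop, a true result from recordProcessed() would be the point to call commitSync() with the offsets processed so far. A larger threshold means fewer blocking calls but more records to reprocess after a failure.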

Quick Reference

Remember these tips when using commitSync():

  • Set enable.auto.commit to false to control commits manually.
  • Call commitSync() after processing messages to ensure reliable offset saving.
  • Handle exceptions from commitSync() to avoid losing offset commits.
  • Use commitSync() for critical commits where you want to block until confirmation.

Key Takeaways

  • Use commitSync() to synchronously commit offsets after processing messages for reliability.
  • Always disable auto commit by setting enable.auto.commit to false when using commitSync().
  • Handle exceptions from commitSync() to manage commit failures gracefully.
  • Avoid calling commitSync() too frequently to prevent performance issues.
  • commitSync() blocks until the broker confirms the commit, ensuring offsets are saved.