How to Use commitSync in Kafka for Reliable Offset Commit
In Kafka, use
commitSync() on a consumer to synchronously commit the current offsets of messages you have processed. This method blocks until the commit is acknowledged by the Kafka broker, ensuring your offsets are saved reliably before continuing.Syntax
The commitSync() method is called on a Kafka consumer instance to commit offsets synchronously. It can be called without arguments to commit the latest offsets or with a map of specific offsets.
consumer.commitSync(): Commits the latest offsets of all partitions assigned to the consumer.consumer.commitSync(offsets): Commits the specified offsets for partitions.
java
consumer.commitSync(); // Or with specific offsets Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("topicName", 0), new OffsetAndMetadata(15)); consumer.commitSync(offsets);
Example
This example shows a Kafka consumer that polls messages, processes them, and then uses commitSync() to synchronously commit the offsets after processing each batch. This ensures that offsets are saved only after messages are handled.
java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class CommitSyncExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "example-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit", "false"); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Consumed message: key = %s, value = %s, offset = %d\n", record.key(), record.value(), record.offset()); // Process the message here } // Commit offsets synchronously after processing consumer.commitSync(); } } } }
Output
Consumed message: key = key1, value = value1, offset = 0
Consumed message: key = key2, value = value2, offset = 1
...
Common Pitfalls
- Not disabling auto commit: If
enable.auto.commitis true, Kafka commits offsets automatically, which can cause conflicts with manualcommitSync(). - Blocking too long:
commitSync()blocks until the broker acknowledges, so calling it too frequently or in a tight loop can reduce performance. - Ignoring exceptions:
commitSync()throws exceptions on failure; not handling them can cause offset loss or duplicate processing.
java
try { consumer.commitSync(); } catch (org.apache.kafka.clients.consumer.CommitFailedException e) { System.err.println("Commit failed, retry or handle accordingly"); }
Quick Reference
Remember these tips when using commitSync():
- Set
enable.auto.committofalseto control commits manually. - Call
commitSync()after processing messages to ensure reliable offset saving. - Handle exceptions from
commitSync()to avoid losing offset commits. - Use
commitSync()for critical commits where you want to block until confirmation.
Key Takeaways
Use commitSync() to synchronously commit offsets after processing messages for reliability.
Always disable auto commit by setting enable.auto.commit to false when using commitSync().
Handle exceptions from commitSync() to manage commit failures gracefully.
Avoid calling commitSync() too frequently to prevent performance issues.
commitSync() blocks until the broker confirms the commit, ensuring offsets are saved.