Consider a Kafka consumer configured with enable.auto.commit=false. After processing a batch of messages, the consumer commits offsets synchronously using commitSync(). What will be the output of the following code snippet?
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
    }
    // Blocks until the offsets from the last poll() are committed (or an exception is thrown).
    consumer.commitSync();
    System.out.println("Offsets committed synchronously.");
} finally {
    consumer.close();
}
Think about what commitSync() does and how enable.auto.commit=false affects offset commits.
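With enable.auto.commit=false, offsets are committed only when the application explicitly asks for it. commitSync() commits the offsets returned by the last poll() and blocks until the commit succeeds, an unrecoverable error is thrown to the caller, or default.api.timeout.ms expires. The output is therefore one "Received: <value>" line per polled record, followed by "Offsets committed synchronously." Note that a first poll() with a 100 ms timeout may return no records while the consumer is still joining the group; in that case only the confirmation line is printed.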
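To make the blocking behaviour concrete, here is a toy, in-memory model. It is not the Kafka client API: the hypothetical `SyncCommitModel` hands the commit to another thread (standing in for the broker round trip) and waits for it to finish before returning, so any line printed after `commitSync()` runs only once the offsets are stored. The class, its methods, and the integer partition keys are illustrative assumptions.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of a blocking offset commit; NOT the Kafka client API.
class SyncCommitModel {
    // Simulated coordinator state: partition -> committed "next offset to read".
    private final Map<Integer, Long> committed = new ConcurrentHashMap<>();

    // Returns only after the simulated coordinator has stored every offset.
    void commitSync(Map<Integer, Long> offsets) {
        CompletableFuture
            .runAsync(() -> committed.putAll(offsets)) // simulated round trip on another thread
            .join(); // block until "acknowledged"; a failure would be rethrown here as a CompletionException
    }

    Long committedOffset(int partition) {
        return committed.get(partition);
    }
}
```

Because `commitSync` waits on `join()`, a caller that prints a confirmation right after it can rely on the offsets already being visible, which is the property the question relies on.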
Given a Kafka consumer with enable.auto.commit=false, what is the expected behavior of the following code snippet that uses commitAsync()?
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
    }
    // Sends the commit request and returns immediately; no callback is supplied.
    consumer.commitAsync();
    System.out.println("Offsets committed asynchronously.");
} finally {
    consumer.close();
}
Remember that commitAsync() commits offsets in the background and does not block the thread.
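commitAsync() sends the commit request and returns immediately without waiting for the broker's response. The program therefore prints "Offsets committed asynchronously." right away, even though at that point the commit has only been sent and may still fail. Because no OffsetCommitCallback is passed, any error from this commit is discarded rather than reported.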
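The non-blocking behaviour can be sketched with a deterministic toy model (again not the Kafka client API). The hypothetical `AsyncCommitModel` only queues the request in `commitAsync`; the offsets are applied and the callback runs later, in `completePending()`, which loosely stands in for a later `poll()` (the real client may run completed commit callbacks from a thread calling `poll()`). The class, method names, and the `Consumer<Exception>` callback shape are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model of a non-blocking offset commit; NOT the Kafka client API.
class AsyncCommitModel {
    // Simulated coordinator state: partition -> committed "next offset to read".
    private final Map<Integer, Long> committed = new HashMap<>();
    // Requests that were "sent" but not yet acknowledged.
    private final Queue<Runnable> inFlight = new ArrayDeque<>();

    // Returns immediately; the offsets are stored and the callback runs only when the request completes.
    void commitAsync(Map<Integer, Long> offsets, Consumer<Exception> callback) {
        Map<Integer, Long> snapshot = new HashMap<>(offsets);
        inFlight.add(() -> {
            committed.putAll(snapshot);
            callback.accept(null); // null signals success, like the exception argument of OffsetCommitCallback.onComplete
        });
    }

    // Stand-in for a later poll(): completes queued requests in the order they were sent.
    void completePending() {
        while (!inFlight.isEmpty()) {
            inFlight.poll().run();
        }
    }

    Long committedOffset(int partition) {
        return committed.get(partition);
    }
}
```

Right after `commitAsync` returns, `committedOffset` still reports nothing, which is exactly the window in which the snippet's confirmation message is printed.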
In Kafka consumer offset commit strategies, which approach best supports exactly-once processing semantics?
Think about when offsets should be committed to avoid message loss or duplication.
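Manually committing offsets synchronously after processing each record ensures that an offset is never committed before its record has been handled, so no messages are lost. On its own, however, this gives at-least-once delivery: if the consumer crashes after processing a record but before the commit completes, the record is redelivered after restart. Of the plain commit strategies it comes closest to exactly-once; true exactly-once additionally requires idempotent processing, storing offsets atomically together with the results, or Kafka transactions (sendOffsetsToTransaction() on a transactional producer, with consumers reading at isolation.level=read_committed).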
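The gap between at-least-once and exactly-once can be seen in a small simulation. This is a hypothetical single-partition model (`DeliverySimulation`, its in-memory log, and its sink are assumptions, not Kafka APIs): it commits after every record, crashes once between processing and committing, then restarts from the committed offset. A plain sink receives the interrupted record twice; a sink that records which offsets it has already applied (one way to make processing idempotent) ends up with each record's effect exactly once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy single-partition simulation of process-then-commit; NOT Kafka code.
class DeliverySimulation {
    // Records in the partition; list index = offset.
    static final List<String> LOG = List.of("a", "b", "c");

    // Runs the consumer twice. The first run crashes right after processing offset `crashAfter`,
    // before committing it. `sink`, `applied`, and `committed` live outside the run loop, so they
    // model state that survives the crash (external storage and Kafka's committed offset).
    static List<String> run(int crashAfter, boolean idempotent) {
        List<String> sink = new ArrayList<>();
        Set<Integer> applied = new HashSet<>(); // offsets already applied to the sink
        int committed = 0;                      // next offset to read, as stored by the commit

        for (int attempt = 0; attempt < 2; attempt++) {
            for (int offset = committed; offset < LOG.size(); offset++) {
                // Process the record; an idempotent sink skips offsets it has already applied.
                if (!idempotent || applied.add(offset)) {
                    sink.add(LOG.get(offset));
                }
                if (attempt == 0 && offset == crashAfter) {
                    break; // crash: the record was processed but never committed
                }
                committed = offset + 1; // synchronous commit after each record
            }
        }
        return sink;
    }
}
```

For example, `run(1, false)` yields `[a, b, b, c]` (record `b` is processed twice), while `run(1, true)` yields `[a, b, c]`.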
What error will this Kafka consumer code produce when trying to commit offsets?
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Received: " + record.value());
}
consumer.commitSync();
Check if calling commitSync() is allowed when enable.auto.commit=true.
Even if enable.auto.commit=true, calling commitSync() is allowed and will commit offsets synchronously. No error is thrown.
A Kafka consumer processes messages from partitions 0 and 1 of a topic. It processes 3 messages from partition 0 with offsets 5, 6, 7 and 2 messages from partition 1 with offsets 10, 11. After processing, it commits offsets manually using commitSync() with the following map:
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
    new TopicPartition("topic", 0), new OffsetAndMetadata(8),
    new TopicPartition("topic", 1), new OffsetAndMetadata(12));
consumer.commitSync(offsets);

How many offsets are committed in total?
Remember that Kafka commits the offset of the next message to read, not the last processed message.
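The commit map holds one entry per partition, and each value is the next offset to read (last processed offset + 1): 8 for partition 0 (after offset 7) and 12 for partition 1 (after offset 11). So two offsets are committed, one per partition, even though five messages were processed.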
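The same arithmetic can be sketched as a small helper. `CommitOffsets` and its `Processed` type are hypothetical stand-ins for Kafka's `ConsumerRecord` coordinates; the helper keeps, per partition, the highest processed offset plus one, which is the value passed to `OffsetAndMetadata` in the commit map.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: builds the per-partition commit values from processed records.
class CommitOffsets {
    // Minimal stand-in for a processed record's coordinates (not a Kafka class).
    static final class Processed {
        final int partition;
        final long offset;

        Processed(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition, commit (highest processed offset + 1), i.e. the next offset to read.
    static Map<Integer, Long> nextOffsets(List<Processed> records) {
        Map<Integer, Long> commits = new TreeMap<>();
        for (Processed r : records) {
            commits.merge(r.partition, r.offset + 1, Long::max);
        }
        return commits;
    }
}
```

Feeding in offsets 5, 6, 7 on partition 0 and 10, 11 on partition 1 gives `{0=8, 1=12}`: two entries, matching the answer. With a real consumer, each entry would become `new TopicPartition(topic, partition)` mapped to `new OffsetAndMetadata(offset)` and be passed to `commitSync(Map)`.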