0
0
Kafkadevops~20 mins

Consumer offset commit strategies in Kafka - Practice Problems & Coding Challenges

Choose your learning style9 modes available
Challenge - 5 Problems
🎖️
Kafka Offset Commit Master
Get all challenges correct to earn this badge!
Test your skills under time pressure!
Predict Output
intermediate
2:00remaining
What is the output of this Kafka consumer offset commit code?

Consider a Kafka consumer configured with enable.auto.commit=false. After processing a batch of messages, the consumer commits offsets synchronously using commitSync(). What will be the output of the following code snippet?

Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
    }
    consumer.commitSync();
    System.out.println("Offsets committed synchronously.");
} finally {
    consumer.close();
}
APrints all received messages but throws an exception at commitSync()
BPrints all received messages and then prints 'Offsets committed synchronously.'
CPrints nothing and immediately prints 'Offsets committed synchronously.'
DPrints all received messages and commits offsets asynchronously without confirmation
Attempts:
2 left
💡 Hint

Think about what commitSync() does and how enable.auto.commit=false affects offset commits.

Predict Output
intermediate
2:00remaining
What happens when using commitAsync() in Kafka consumer?

Given a Kafka consumer with enable.auto.commit=false, what is the expected behavior of the following code snippet that uses commitAsync()?

Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
    }
    consumer.commitAsync();
    System.out.println("Offsets committed asynchronously.");
} finally {
    consumer.close();
}
APrints all received messages and then prints 'Offsets committed asynchronously.' immediately without waiting for commit confirmation
BPrints all received messages and waits for commit confirmation before printing the message
CThrows a runtime exception because commitAsync() requires a callback
DDoes not commit offsets and prints only received messages
Attempts:
2 left
💡 Hint

Remember that commitAsync() commits offsets in the background and does not block the thread.

🧠 Conceptual
advanced
2:00remaining
Which commit strategy ensures exactly-once processing in Kafka consumers?

In Kafka consumer offset commit strategies, which approach best supports exactly-once processing semantics?

AManually committing offsets synchronously after processing each record
BUsing <code>enable.auto.commit=true</code> with default auto commit interval
CCommitting offsets asynchronously without handling commit failures
DNot committing offsets at all and relying on consumer restart
Attempts:
2 left
💡 Hint

Think about when offsets should be committed to avoid message loss or duplication.

🔧 Debug
advanced
2:00remaining
Identify the error in this Kafka consumer offset commit code

What error will this Kafka consumer code produce when trying to commit offsets?

Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Received: " + record.value());
}
consumer.commitSync();
AThrows CommitFailedException because commitSync() is called with auto commit enabled
BThrows IllegalStateException because offsets are auto-committed and manual commitSync() is called
CNo error; offsets are committed synchronously successfully
DThrows IllegalArgumentException due to missing commit callback
Attempts:
2 left
💡 Hint

Check if calling commitSync() is allowed when enable.auto.commit=true.

🚀 Application
expert
2:00remaining
How many offsets are committed after this batch processing with manual commit?

A Kafka consumer processes messages from partitions 0 and 1 of a topic. It processes 3 messages from partition 0 with offsets 5, 6, 7 and 2 messages from partition 1 with offsets 10, 11. After processing, it commits offsets manually using commitSync() with the following map:

{new TopicPartition("topic", 0): new OffsetAndMetadata(8), new TopicPartition("topic", 1): new OffsetAndMetadata(12)}

How many offsets are committed in total?

A3 offsets committed, only for partition 0 messages
B5 offsets committed, one for each message processed
C0 offsets committed because offsets are off by one
D2 offsets committed, one for each partition
Attempts:
2 left
💡 Hint

Remember that Kafka commits the offset of the next message to read, not the last processed message.