Consider the following Kafka consumer poll loop in Java. What will be printed to the console?
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
int messageCount = 0;
while (messageCount < 3) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " + record.value());
        messageCount++;
    }
}
consumer.close();
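Look at where the messageCount condition is checked relative to the inner for loop.
The while condition is checked only between polls, not inside the for loop. The loop therefore prints a "Received: ..." line for every record in each batch and stops after the first batch that brings messageCount to 3 or more. If each poll returns a single record, exactly 3 lines are printed; if one poll returns 5 records, all 5 are printed. If the topic never delivers 3 records, the loop keeps polling indefinitely.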
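The batch behavior can be reproduced without a broker. The following is a minimal sketch in which plain lists stand in for the results of successive poll() calls; the class name PollLoopSketch, the consume method, and the extra bound on the number of batches are assumptions made for illustration and are not part of the Kafka API.

```java
import java.util.List;

public class PollLoopSketch {
    // Mimics the quiz loop: the threshold is checked only between batches,
    // never inside the for loop. Each inner list simulates one poll() result.
    public static int consume(List<List<String>> batches, int threshold) {
        int messageCount = 0;
        int next = 0; // index of the next simulated poll() result
        // The second condition is an addition so the sketch ends when the
        // simulated batches run out; the quiz loop would keep polling.
        while (messageCount < threshold && next < batches.size()) {
            List<String> records = batches.get(next++);
            for (String value : records) {
                System.out.println("Received: " + value);
                messageCount++;
            }
        }
        return messageCount;
    }

    public static void main(String[] args) {
        // One empty poll, then a single batch of five records.
        List<List<String>> batches =
                List.of(List.<String>of(), List.of("a", "b", "c", "d", "e"));
        int printed = consume(batches, 3);
        System.out.println("printed=" + printed); // prints "printed=5"
    }
}
```

With one record per batch the same method returns 3, which is the case the question has in mind.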
In a Kafka consumer application, what is the effect of not calling consumer.poll() regularly inside the consumer loop?
Think about how Kafka detects active consumers in a group.
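A consumer shows it is still making progress by calling poll(). In current Java clients heartbeats are sent from a background thread, but if the application does not call poll() again within max.poll.interval.ms, the consumer is considered failed and the group rebalances to reassign its partitions to other members.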
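The settings that govern this behavior are ordinary consumer properties. The sketch below only builds a Properties object; the broker address, group name, and numeric values are placeholders chosen for illustration, and the deserializer settings a real consumer needs are omitted.

```java
import java.util.Properties;

public class ConsumerLivenessConfig {
    // Builds consumer properties; the values are illustrative, not recommendations.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "demo-group");              // hypothetical group name
        // Maximum gap allowed between two poll() calls before the
        // consumer is considered failed and a rebalance is triggered.
        props.put("max.poll.interval.ms", "300000");
        // Cap on records returned by one poll(), which bounds the
        // amount of processing done between consecutive polls.
        props.put("max.poll.records", "100");
        // Time without heartbeats after which the member is removed from the group.
        props.put("session.timeout.ms", "45000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If processing a batch can take longer than the poll interval, lowering max.poll.records or raising max.poll.interval.ms are the usual adjustments.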
Examine the following code snippet. Why might this consumer fail to commit offsets properly?
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record.value());
    }
    // Missing commit call here
}
Check where offset commits happen in Kafka consumer code.
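When enable.auto.commit is set to false, the consumer must commit offsets explicitly, for example with commitSync() or commitAsync() after processing each batch. Without those calls no offsets are saved, so after a restart or rebalance the consumer resumes from the last committed position (or from the auto.offset.reset policy if none exists) and may process records again. Note that enable.auto.commit defaults to true, in which case offsets are committed periodically during poll(); the snippet does not show props, so the question presumably assumes auto-commit is disabled.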
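The effect of a missing commit can be illustrated with a small in-memory simulation. This is a sketch only: it does not use the Kafka client, a list stands in for one partition, and the CommitSketch class and its runSession method are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitSketch {
    // Stand-in for the offset stored by the broker for one partition.
    private int committedOffset = 0;

    // Simulates one consumer session: resume at the committed offset,
    // process up to batchSize records, and commit only if asked to.
    public List<String> runSession(List<String> log, int batchSize, boolean commit) {
        List<String> processed = new ArrayList<>();
        int position = committedOffset; // a restarted consumer resumes here
        int end = Math.min(log.size(), position + batchSize);
        while (position < end) {
            processed.add(log.get(position++));
        }
        if (commit) {
            committedOffset = position; // plays the role of consumer.commitSync()
        }
        return processed;
    }

    public static void main(String[] args) {
        CommitSketch sim = new CommitSketch();
        List<String> log = List.of("m0", "m1", "m2", "m3");
        System.out.println(sim.runSession(log, 2, false)); // [m0, m1]
        System.out.println(sim.runSession(log, 2, false)); // [m0, m1] again: nothing was committed
        System.out.println(sim.runSession(log, 2, true));  // [m0, m1], then offset 2 is committed
        System.out.println(sim.runSession(log, 2, true));  // [m2, m3]
    }
}
```

Committing after processing, as in the third session, gives at-least-once behavior: a crash between processing and commit still causes a replay, but committed progress is never lost.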
Identify the correct fix for the error in the poll call of this Kafka consumer poll loop snippet:
while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        System.out.println(record.value());
    }
}
Check the expected argument type for the poll method in KafkaConsumer.
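The poll method expects a java.time.Duration, so consumer.poll(Duration.ofMillis(100)) is the fix. Strictly speaking this is an API or type problem rather than a syntax error: the overload taking a long was deprecated in Kafka 2.0 and removed in the 4.0 client, where poll(100) no longer compiles. Separately, iterating a raw ConsumerRecords yields Object elements, so the records and the loop variable should be declared with type parameters, for example ConsumerRecords<String, String> and ConsumerRecord<String, String>.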
You want to stop a Kafka consumer poll loop gracefully on receiving a shutdown signal. Which approach correctly implements this?
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
final AtomicBoolean running = new AtomicBoolean(true);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running.set(false);
    consumer.wakeup();
}));
try {
    while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record.value());
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    if (running.get()) throw e;
} finally {
    consumer.close();
}
Consider how to interrupt a blocking poll call safely.
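wakeup() is the one KafkaConsumer method that is safe to call from another thread. It makes a blocked poll() (or the next one) throw WakeupException, which breaks out of the loop. The catch block swallows the exception only when running is already false, meaning shutdown was requested, and the finally block closes the consumer so it leaves the group cleanly.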