Complete the code to create a Kafka consumer.
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", [1]); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
The value deserializer must match the type of the message value. Here, StringDeserializer is used to deserialize string values.
Complete the code to subscribe the consumer to a topic named 'my-topic'.
consumer.[1](Collections.singletonList("my-topic"));
assign instead of subscribe for topic subscription.poll before subscribing.The subscribe method is used to subscribe the consumer to one or more topics.
Fix the error in the code to correctly poll messages from Kafka.
ConsumerRecords<String, String> records = consumer.[1];Since Kafka 2.0, the poll method requires a Duration argument specifying the timeout.
Fill both blanks to process each record's key and value inside the poll loop.
for (ConsumerRecord<String, String> record : records) { System.out.println("Key: " + record.[1] + ", Value: " + record.[2]); }
The key and value fields of ConsumerRecord hold the message key and value respectively.
Fill all three blanks to commit offsets synchronously after processing records.
try { consumer.[1](); } catch (Exception e) { System.out.println("Commit failed: "); e.[2](); } finally { consumer.[3](); }
commitAsync() instead of commitSync() here.getMessage() on exception instead of printStackTrace().commitSync() commits offsets synchronously. printStackTrace() prints error details. close() closes the consumer cleanly.