Complete the code to create a Kafka producer.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", [1]);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
To send string messages, the value serializer must be set to StringSerializer, given as the fully qualified class name "org.apache.kafka.common.serialization.StringSerializer".
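As a rough illustration, the completed configuration could look like the sketch below. It only builds the Properties object with the JDK, so it runs without a broker or the kafka-clients library. The class name ProducerConfigSketch and the method producerProps are hypothetical names chosen for this example.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Builds the producer settings from the exercise with the blank filled in.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The blank [1]: values are strings too, so the same serializer class is used.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        // With kafka-clients on the classpath, props would be passed to new KafkaProducer<>(props).
        System.out.println(props.getProperty("value.serializer"));
    }
}
```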
Complete the code to send a message to a Kafka topic.
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", [1], "Hello Kafka");
producer.send(record);
The second argument is the message key, which should be a string like "key1".
Complete the consumer configuration so that it reads string messages.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", [1]);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
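To read string messages correctly, the value deserializer must be StringDeserializer, given as the fully qualified class name "org.apache.kafka.common.serialization.StringDeserializer".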
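The consumer side mirrors the producer: deserializers replace serializers, and a group.id is added. A minimal sketch of the completed settings follows. It uses only the JDK, and the names ConsumerConfigSketch and consumerProps are hypothetical.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Builds the consumer settings from the exercise with the blank filled in.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The blank [1]: string values need the string deserializer.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // With kafka-clients on the classpath, these would be passed to new KafkaConsumer<>(props).
        consumerProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```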
Fill in the blank to build a stream pipeline that keeps only the events whose value is longer than 5 characters.
Map<String, String> filteredEvents = events.entrySet().stream()
    .filter(e -> e.getValue().[1]() > 5)
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
In Java, to get the length of a string, use the method length().
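To see the filter in action, here is a small self-contained sketch with the blank filled in. The sample events and the names EventFilterSketch and filterLongValues are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class EventFilterSketch {
    // Keeps only the entries whose value has more than 5 characters.
    static Map<String, String> filterLongValues(Map<String, String> events) {
        return events.entrySet().stream()
            .filter(e -> e.getValue().length() > 5)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> events = new LinkedHashMap<>();
        events.put("e1", "login");     // length 5, dropped (not greater than 5)
        events.put("e2", "purchase");  // length 8, kept
        events.put("e3", "logout");    // length 6, kept
        System.out.println(filterLongValues(events).keySet());
    }
}
```

Collectors.toMap makes no promise about the ordering of the returned map, so the order of the printed keys should not be relied on.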
Fill all three blanks to create a Kafka consumer poll loop that processes records.
while (true) {
    ConsumerRecords<String, String> records = consumer.[1](Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.[2]() + ": " + record.[3]());
    }
}
The consumer uses poll() to fetch messages. Each record has key() and value() methods to access data.