Complete the code to define a Kafka topic bean in Spring Boot.
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopicConfig { @Bean public NewTopic [1]() { return new NewTopic("my_topic", 1, (short) 1); } }
The method name defining the Kafka topic bean can be any valid method name, but createTopic is a common and clear choice.
Complete the code to send a message to a Kafka topic using KafkaTemplate.
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class ProducerService { private final KafkaTemplate<String, String> kafkaTemplate; public ProducerService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String message) { kafkaTemplate.[1]("my_topic", message); } }
The KafkaTemplate class uses the send method to send messages to a Kafka topic.
Fix the error in the Kafka listener method annotation to listen to the correct topic.
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class ConsumerService { @KafkaListener(topics = "[1]", groupId = "group_id") public void consume(String message) { System.out.println("Received message: " + message); } }
The listener must subscribe to the same topic name used when sending messages, which is my_topic.
Fill both blanks to configure Kafka consumer properties in application.yml.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: [1]
key-deserializer: [2]The group-id should be a custom group name like my_group. The key-deserializer for string keys is org.apache.kafka.common.serialization.StringDeserializer.
Fill all three blanks to create a Kafka listener method that listens to two topics and uses a specific group id.
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class MultiTopicConsumer { @KafkaListener(topics = {"[1]", "[2]"}, groupId = "[3]") public void listen(String message) { System.out.println("Message received: " + message); } }
The listener subscribes to topicA and topicB with the consumer group multi_group.