How to Configure Kafka Consumer in Spring Boot
To configure a Kafka consumer in Spring, define consumer properties in
application.properties or application.yml, then create a @KafkaListener method in a Spring bean to receive messages. Use spring-kafka dependency and configure the ConsumerFactory and ConcurrentKafkaListenerContainerFactory if custom setup is needed.Syntax
Kafka consumer configuration in Spring involves setting properties for the consumer group, bootstrap servers, key and value deserializers, and optionally concurrency. The @KafkaListener annotation marks a method to receive messages from a topic.
- spring.kafka.consumer.group-id: Identifies the consumer group.
- spring.kafka.bootstrap-servers: Kafka server addresses.
- spring.kafka.consumer.key-deserializer: Class to deserialize message keys.
- spring.kafka.consumer.value-deserializer: Class to deserialize message values.
- @KafkaListener(topics = "topicName"): Method to listen to messages.
properties/java
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer @KafkaListener(topics = "my-topic", groupId = "my-group") public void listen(String message) { // process message }
Example
This example shows a Spring Boot application configured to consume messages from a Kafka topic named test-topic. It uses @KafkaListener to receive and print messages.
java
package com.example.kafkaconsumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @SpringBootApplication @EnableKafka public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } } @Component class Consumer { @KafkaListener(topics = "test-topic", groupId = "group1") public void listen(String message) { System.out.println("Received message: " + message); } } // application.properties // spring.kafka.bootstrap-servers=localhost:9092 // spring.kafka.consumer.group-id=group1 // spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer // spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Output
Received message: Hello Kafka
Received message: Another message
Common Pitfalls
Common mistakes when configuring Kafka consumers in Spring include:
- Not setting the
group-id, which causes consumers not to join a group properly. - Using wrong deserializer classes, leading to deserialization errors.
- Forgetting to add
@EnableKafkaannotation to enable Kafka support. - Not matching the topic name exactly in
@KafkaListener. - Not configuring the Kafka bootstrap servers correctly.
Always verify your application.properties and listener method signatures.
properties
/* Wrong: Missing group-id */ spring.kafka.consumer.group-id= /* Right: Set group-id */ spring.kafka.consumer.group-id=my-group /* Wrong: Using StringSerializer instead of StringDeserializer */ spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer /* Right: Use StringDeserializer */ spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Quick Reference
Summary tips for Kafka consumer configuration in Spring:
- Always set
spring.kafka.bootstrap-serversto your Kafka cluster address. - Define a unique
group-idfor your consumer group. - Use
StringDeserializerfor simple string messages. - Annotate your listener method with
@KafkaListenerand specify the topic. - Add
@EnableKafkaon your main application class.
Key Takeaways
Set Kafka consumer properties like bootstrap servers and group-id in application.properties.
Use @KafkaListener annotation on a method to receive messages from Kafka topics.
Add @EnableKafka annotation to enable Kafka support in Spring Boot.
Use correct deserializer classes to avoid message parsing errors.
Verify topic names and group IDs match between configuration and listener.