How to Configure a Kafka Consumer in Spring Boot

To configure a Kafka consumer in Spring Boot, add the spring-kafka dependency, define the consumer properties in application.properties or application.yml, and annotate a method in a Spring bean with @KafkaListener to receive messages. Define your own ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans only if you need a custom setup beyond what the properties provide.

Syntax

Kafka consumer configuration in Spring involves setting properties for the consumer group, bootstrap servers, key and value deserializers, and optionally concurrency. The @KafkaListener annotation marks a method to receive messages from a topic.

  • spring.kafka.consumer.group-id: Names the consumer group this application joins.
  • spring.kafka.bootstrap-servers: Comma-separated host:port list of Kafka brokers used for the initial connection.
  • spring.kafka.consumer.key-deserializer: Class used to deserialize message keys.
  • spring.kafka.consumer.value-deserializer: Class used to deserialize message values.
  • @KafkaListener(topics = "topicName"): Marks a method that receives messages from the given topic.
properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

java
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
    // process message
}
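
The spring.kafka.* keys above are Spring Boot's names for standard Kafka consumer settings. If you build a consumer configuration by hand, you use the native Kafka property names instead. The sketch below shows that correspondence using only the JDK. The ConsumerPropertyNames class and its toKafkaNames method are hypothetical helpers invented for this illustration; they are not part of Spring or Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spring or Kafka.
final class ConsumerPropertyNames {

    // Spring Boot property key -> native Kafka consumer property key.
    static final Map<String, String> SPRING_TO_KAFKA = new LinkedHashMap<>();

    static {
        SPRING_TO_KAFKA.put("spring.kafka.bootstrap-servers", "bootstrap.servers");
        SPRING_TO_KAFKA.put("spring.kafka.consumer.group-id", "group.id");
        SPRING_TO_KAFKA.put("spring.kafka.consumer.key-deserializer", "key.deserializer");
        SPRING_TO_KAFKA.put("spring.kafka.consumer.value-deserializer", "value.deserializer");
    }

    // Translates Spring Boot style settings into the key names a Kafka consumer expects.
    // Keys that are not in the table above are skipped.
    static Map<String, String> toKafkaNames(Map<String, String> springProps) {
        Map<String, String> kafkaProps = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : springProps.entrySet()) {
            String kafkaKey = SPRING_TO_KAFKA.get(entry.getKey());
            if (kafkaKey != null) {
                kafkaProps.put(kafkaKey, entry.getValue());
            }
        }
        return kafkaProps;
    }
}
```

Only the four settings discussed in this section are covered; any other key passed to toKafkaNames is ignored rather than guessed at.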

Example

This example shows a Spring Boot application configured to consume messages from a Kafka topic named test-topic. It uses @KafkaListener to receive and print messages.

java
package com.example.kafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@SpringBootApplication
@EnableKafka // Optional with Spring Boot: auto-configuration already enables @KafkaListener support
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

@Component
class Consumer {
    @KafkaListener(topics = "test-topic", groupId = "group1")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

// application.properties
// spring.kafka.bootstrap-servers=localhost:9092
// spring.kafka.consumer.group-id=group1
// spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
// spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Output
Received message: Hello Kafka
Received message: Another message

Common Pitfalls

Common mistakes when configuring Kafka consumers in Spring include:

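  • Not setting a group-id, either in the properties or on @KafkaListener. Listener containers rely on consumer group management, which requires a group ID, so the listener container typically fails to start.
  • Using the wrong deserializer class (for example a serializer, or a deserializer that does not match the producer's format), which leads to startup or deserialization errors.
  • Forgetting @EnableKafka in a plain Spring (non-Boot) application. Spring Boot's auto-configuration enables listener support on its own when spring-kafka is on the classpath.
  • Not matching the topic name exactly in @KafkaListener.
  • Pointing spring.kafka.bootstrap-servers at the wrong host or port.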

Always verify your application.properties and listener method signatures.

properties
# Wrong: group-id left blank
spring.kafka.consumer.group-id=

# Right: set a group-id
spring.kafka.consumer.group-id=my-group

# Wrong: StringSerializer used where a deserializer is required
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer

# Right: use StringDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
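
The two property mistakes above can be caught before the application starts. The following is a minimal sketch of such a pre-flight check using only the JDK. The ConsumerConfigCheck class and its findProblems method are hypothetical and written for this illustration; Spring Boot does not ship them. The serializer test is a naming heuristic, which assumes class names end in "Serializer" or "Deserializer".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for illustration; not part of Spring Boot or spring-kafka.
final class ConsumerConfigCheck {

    // Returns one human-readable message per detected problem; an empty list means none found.
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();

        // A blank group-id is only acceptable if the group is set on @KafkaListener instead,
        // which this simple check cannot see.
        String groupId = props.getProperty("spring.kafka.consumer.group-id", "").trim();
        if (groupId.isEmpty()) {
            problems.add("spring.kafka.consumer.group-id is missing or blank");
        }

        String[] deserializerKeys = {
            "spring.kafka.consumer.key-deserializer",
            "spring.kafka.consumer.value-deserializer"
        };
        for (String key : deserializerKeys) {
            String className = props.getProperty(key, "").trim();
            // Heuristic: a class named *Serializer (but not *Deserializer) is a producer-side class.
            if (className.endsWith("Serializer") && !className.endsWith("Deserializer")) {
                problems.add(key + " names a serializer: " + className);
            }
        }
        return problems;
    }
}
```

Calling findProblems with the "Wrong" settings above reports both the blank group ID and the misplaced serializer, while the "Right" settings produce an empty list.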

Quick Reference

Summary tips for Kafka consumer configuration in Spring:

  • Always set spring.kafka.bootstrap-servers to your Kafka cluster address.
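  • Give each application its own group-id; instances that share a group-id divide the topic's partitions among themselves.
  • Use StringDeserializer for simple string messages.
  • Annotate your listener method with @KafkaListener and specify the topic.
  • Add @EnableKafka only in non-Boot Spring applications; Spring Boot enables listener support automatically.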

Key Takeaways

Set Kafka consumer properties like bootstrap servers and group-id in application.properties.
Use @KafkaListener annotation on a method to receive messages from Kafka topics.
In plain Spring applications, add @EnableKafka to enable listener support; Spring Boot's auto-configuration does this for you.
Use correct deserializer classes to avoid message parsing errors.
Verify that the topic name in @KafkaListener matches the topic being produced to, and that the group ID in the listener agrees with the one in your configuration.