
How to Consume Messages Using Kafka with Spring Boot

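To consume messages from Kafka in Spring Boot, create a listener method annotated with @KafkaListener that subscribes to a topic, and configure the consumer properties in application.yml or application.properties. With spring-kafka on the classpath, Spring Boot's auto-configuration enables listener support for you; an explicit @EnableKafka is only needed in a plain Spring (non-Boot) application.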

Syntax

To consume messages from Kafka in Spring, you use the @KafkaListener annotation on a method. This method will be called automatically when a message arrives on the specified topic.

  • @KafkaListener(topics = "topicName"): Listens to the given Kafka topic.
  • Method parameter: The message payload received from Kafka.
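  • @EnableKafka: Enables detection of @KafkaListener methods. Spring Boot's auto-configuration applies it for you when spring-kafka is on the classpath, so you need it explicitly only in a plain Spring application.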

Consumer properties such as the bootstrap servers and the group ID are set in application.properties or application.yml, under the spring.kafka prefix.
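As a rough illustration, a minimal application.properties for a String consumer might look like the fragment below. The broker address localhost:9092 and the group name myGroup are assumptions chosen to match the examples in this article, and the offset-reset setting is optional; adjust all of them for your environment.

```properties
# Address of the Kafka broker(s); localhost:9092 is assumed here
spring.kafka.bootstrap-servers=localhost:9092
# Default consumer group; a groupId on @KafkaListener overrides it for that listener
spring.kafka.consumer.group-id=myGroup
# Where to start reading when the group has no committed offset yet
spring.kafka.consumer.auto-offset-reset=earliest
# Deserializers for String keys and values
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```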

java
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

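// @EnableKafka is optional under Spring Boot (auto-configuration applies it);
// it is required only in a plain Spring application.
@EnableKafka
@Component
public class KafkaConsumer {

    // Called for each record that arrives on "myTopic". The groupId here
    // overrides spring.kafka.consumer.group-id for this listener only.
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}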

Example

This example shows a complete Spring Boot application that consumes messages from a Kafka topic named testTopic. The @KafkaListener method prints each received message to the console.

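Make sure a Kafka broker is running locally on port 9092 before running this example. No configuration file is needed in this case, because localhost:9092 is Spring Boot's default for spring.kafka.bootstrap-servers and the group ID is set on the listener itself.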

java
package com.example.kafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@SpringBootApplication
@EnableKafka
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

@Component
class Consumer {
    @KafkaListener(topics = "testTopic", groupId = "group1")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
Output
Consumed message: Hello Kafka
Consumed message: Spring Boot is awesome

Common Pitfalls

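  • Missing @EnableKafka outside Spring Boot: Spring Boot's Kafka auto-configuration enables @KafkaListener detection when spring-kafka is on the classpath. Listeners go undetected only when that auto-configuration is absent (a plain Spring application, or the auto-configuration excluded); in that case add @EnableKafka to a configuration class.
  • Incorrect topic or group ID: The topic name in @KafkaListener must match the broker's topic exactly (names are case-sensitive). Consumers that share a group ID split the topic's partitions between them, so each message reaches only one of them; give independent applications different group IDs.
  • Not configuring consumer properties: Set spring.kafka.bootstrap-servers (Spring Boot falls back to localhost:9092 if it is unset) and provide a group ID, either as spring.kafka.consumer.group-id or through the groupId attribute of @KafkaListener. A listener with no group ID from any source fails at startup.
  • Using wrong method signature: The listener method should accept the message payload type produced by the configured value deserializer (e.g., String or a deserialized object).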
java
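/* Wrong in a plain Spring application: nothing enables @KafkaListener
   processing, so this listener is never registered. */
@Component
public class WrongConsumer {
    @KafkaListener(topics = "testTopic")
    public void listen(String msg) {
        System.out.println(msg);
    }
}

/* Right: add @EnableKafka on a configuration or main class. In a Spring Boot
   application the auto-configuration already does this, so the annotation
   is optional (and harmless) there. */
@EnableKafka
@SpringBootApplication
public class App { /* ... */ }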
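For the consumer-properties pitfall, it can help to see the Kafka client settings that the Spring Boot properties translate into. The following is a minimal JDK-only sketch, not Spring's actual implementation: the class ConsumerConfigSketch and its consumerProps method are hypothetical names, and the blank-group-ID check is this sketch's own. The map keys and the StringDeserializer class name are standard Kafka client settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: builds the raw Kafka client
// settings that correspond to the spring.kafka.* consumer properties.
class ConsumerConfigSketch {

    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        // This check is the sketch's own; it mirrors the rule that a
        // consumer taking part in a group needs a group ID.
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException("A consumer group ID is required");
        }
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);     // spring.kafka.bootstrap-servers
        props.put("group.id", groupId);                       // spring.kafka.consumer.group-id
        props.put("key.deserializer", STRING_DESERIALIZER);   // spring.kafka.consumer.key-deserializer
        props.put("value.deserializer", STRING_DESERIALIZER); // spring.kafka.consumer.value-deserializer
        return props;
    }

    public static void main(String[] args) {
        // Print each setting as "key = value"
        consumerProps("localhost:9092", "group1")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

If you configure the consumer in code rather than through properties, a map like this is what a DefaultKafkaConsumerFactory accepts in its constructor.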

Quick Reference

Here is a quick checklist for consuming Kafka messages in Spring Boot:

Step | Description
Add @EnableKafka (plain Spring only) | Spring Boot enables Kafka listener support automatically when spring-kafka is on the classpath.
Create a consumer class | Annotate a method with @KafkaListener to receive messages.
Configure properties | Set spring.kafka.bootstrap-servers and spring.kafka.consumer.group-id.
Run Kafka broker | Make sure the Kafka server is running and the topic exists.
Start Spring Boot app | The listener will consume messages automatically.

Key Takeaways

  • Use @KafkaListener on a method to consume Kafka messages in Spring Boot.
  • Spring Boot enables listener detection automatically; add @EnableKafka only in a plain Spring application.
  • Configure the bootstrap servers and a group ID in the application properties.
  • Ensure the Kafka broker is running and the topics exist before consuming.
  • Listener methods should accept the message payload type.