How to Use KafkaTemplate for Sending Messages in Kafka
Use
KafkaTemplate in Spring to send messages to Kafka topics by calling its send() method with the topic name and message. It handles the connection and serialization, making it easy to produce messages asynchronously.Syntax
The basic syntax to send a message using KafkaTemplate is:
kafkaTemplate.send(topic, message): Sends a message to the specified topic.topic: The name of the Kafka topic as a string.message: The message payload, usually a string or a serializable object.
This method returns a ListenableFuture which you can use to check if the send was successful.
java
kafkaTemplate.send("myTopic", "Hello Kafka!");
Example
This example shows how to configure KafkaTemplate in a Spring Boot application and send a simple string message to a Kafka topic named testTopic.
java
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; import java.util.HashMap; import java.util.Map; @SpringBootApplication public class KafkaTemplateExample { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); System.out.println("Sent message: " + message + " to topic: " + topic); } public static void main(String[] args) { var context = SpringApplication.run(KafkaTemplateExample.class, args); KafkaTemplateExample example = context.getBean(KafkaTemplateExample.class); example.sendMessage("testTopic", "Hello KafkaTemplate!"); } }
Output
Sent message: Hello KafkaTemplate! to topic: testTopic
Common Pitfalls
Common mistakes when using KafkaTemplate include:
- Not configuring the
ProducerFactoryproperly, causing connection failures. - Forgetting to set serializers, which leads to serialization errors.
- Assuming
send()is synchronous; it is asynchronous and returns aListenableFuture. - Not handling exceptions or checking send results, which can hide message delivery failures.
Always configure serializers and bootstrap servers correctly and consider adding callbacks to handle send results.
java
/* Wrong: Missing serializer config causes errors */ Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Missing serializer configs here /* Right: Include serializers */ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Quick Reference
KafkaTemplate Key Methods:
send(topic, message): Send message asynchronously.send(topic, key, message): Send message with a key.flush(): Force sending all buffered messages.executeInTransaction(action): Run send operations in a transaction.
Remember to configure ProducerFactory with correct bootstrap servers and serializers.
Key Takeaways
KafkaTemplate simplifies sending messages to Kafka topics in Spring applications.
Always configure bootstrap servers and serializers correctly in ProducerFactory.
The send() method is asynchronous and returns a future for result handling.
Handle send results or exceptions to ensure message delivery success.
Use send(topic, key, message) to send keyed messages for partitioning control.