How to Use Kafka Producer in Java: Simple Guide
To use a
KafkaProducer in Java, create a producer instance with configuration properties, then send messages to a Kafka topic using the send() method. Always close the producer after use to free resources.Syntax
The basic syntax to create and use a Kafka producer in Java involves setting properties, creating a KafkaProducer object, and sending messages with send().
- Properties: Configure bootstrap servers and serializers.
- KafkaProducer: Instantiate with properties.
- ProducerRecord: Create message with topic, key, and value.
- send(): Send the record asynchronously.
- close(): Close the producer to release resources.
java
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value"); producer.send(record); producer.close();
Example
This example shows a complete Java program that sends a simple message to a Kafka topic named test-topic. It demonstrates setting up the producer, sending a message, and closing the producer.
java
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; public class SimpleKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "myKey", "Hello Kafka"); try { Future<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get(); System.out.println("Message sent to topic " + metadata.topic() + " partition " + metadata.partition() + " offset " + metadata.offset()); } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
Output
Message sent to topic test-topic partition 0 offset 0
Common Pitfalls
Common mistakes when using Kafka producer in Java include:
- Not setting the correct
bootstrap.serversaddress, causing connection failures. - Forgetting to specify serializers for key and value, leading to serialization errors.
- Not closing the producer, which can cause resource leaks.
- Ignoring exceptions from
send()which may hide message delivery failures.
Always handle exceptions and close the producer properly.
java
/* Wrong: Missing serializers and not closing producer */ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("topic", "key", "value")); /* Right: Include serializers and close producer */ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producerCorrect = new KafkaProducer<>(props); try { producerCorrect.send(new ProducerRecord<>("topic", "key", "value")); } finally { producerCorrect.close(); }
Quick Reference
Remember these key points when using Kafka producer in Java:
- Set
bootstrap.serversto your Kafka broker address. - Use
StringSerializeror appropriate serializers for your data. - Create
ProducerRecordwith topic, key, and value. - Use
send()to send messages asynchronously. - Always close the producer with
close()to free resources.
Key Takeaways
Always configure bootstrap servers and serializers before creating KafkaProducer.
Use ProducerRecord to specify topic, key, and message value for sending.
Handle exceptions from send() to catch delivery issues.
Close the KafkaProducer after use to avoid resource leaks.
Test connectivity to Kafka brokers before sending messages.