How to Implement Exactly-Once Semantics in Kafka: Guide and Example
To implement exactly-once semantics in Kafka, enable idempotent producers and use transactions to group produce and consume operations atomically. This ensures messages are neither lost nor duplicated during processing.
Syntax
To enable exactly-once semantics in Kafka, configure the producer with enable.idempotence=true and a transactional.id, then use the initTransactions(), beginTransaction(), commitTransaction(), and abortTransaction() methods to manage transactions.
Key parts:
- enable.idempotence=true: Ensures the producer retries safely without duplicates.
- transactional.id: Unique ID for the producer to track transactions.
- initTransactions(): Initializes transactional state.
- beginTransaction(): Starts a new transaction.
- commitTransaction(): Commits the transaction atomically.
- abortTransaction(): Aborts the transaction on failure.
java
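Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-transactional-id");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // call once, before the first transaction

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
    // other sends or processing
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors (org.apache.kafka.common.errors): this producer cannot continue, so close it
    producer.close();
} catch (KafkaException e) {
    // Any other Kafka error: abort so that none of the sends in this transaction are committed
    producer.abortTransaction();
}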
Example
This example shows a Kafka producer configured for exactly-once delivery. It initializes transactions, sends a message within a transaction, and commits it. If a recoverable error occurs, it aborts the transaction so that consumers using read_committed never see the uncommitted message.
java
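import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class ExactlyOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "txn-producer-1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Registers the transactional.id with the broker; call once per producer
        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
            System.out.println("Message sent exactly once.");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal for this producer: do not abort, the finally block closes it
            System.out.println("Fatal producer error: " + e.getMessage());
        } catch (KafkaException e) {
            // Recoverable: abort so the message is never committed
            producer.abortTransaction();
            System.out.println("Transaction aborted due to error: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}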
Output
Message sent exactly once.
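The producer example above covers only the write side. To also group the consume step into the same transaction, the consumer's offsets can be sent through the producer. What follows is a minimal sketch of such a consume-transform-produce loop, not a definitive implementation. It assumes kafka-clients 2.5 or newer, a broker on localhost:9092, and hypothetical names that do not come from this guide: the topics input-topic and output-topic, the group relay-group, the ID relay-txn-1, and the transform() helper.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ExactlyOnceRelay {

    // Hypothetical processing step; stands in for real business logic.
    static String transform(String value) {
        return value == null ? null : value.toUpperCase();
    }

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "relay-group"); // assumed group name
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", "false"); // offsets are committed by the transaction
        consumerProps.put("isolation.level", "read_committed");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "relay-txn-1"); // assumed ID

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                try {
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
                        // The committed offset is the next record to read, hence +1.
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The consumed offsets join the same transaction as the output records.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    // Fatal for this producer: leave the loop; try-with-resources closes both clients.
                    break;
                } catch (KafkaException e) {
                    producer.abortTransaction();
                    // Rewind to the start of the failed batch so it is processed again.
                    for (TopicPartition tp : records.partitions()) {
                        consumer.seek(tp, records.records(tp).get(0).offset());
                    }
                }
            }
        }
    }
}
```

Because the output records and the input offsets commit or abort together, a crash between the two cannot leave one without the other. This guarantee covers data that stays inside Kafka; writes to external systems would need their own handling.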
Common Pitfalls
Common mistakes when implementing exactly-once semantics in Kafka include:
- Not setting enable.idempotence=true, which can cause duplicate messages on retries.
- Omitting transactional.id, so transactions cannot be tracked properly.
- Failing to call initTransactions() before starting transactions.
- Not handling exceptions by aborting the transaction, which leaves it open until the broker times it out.
- Using consumers without isolation.level=read_committed, which can read uncommitted data.
Example of wrong vs right configuration:
java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Wrong: idempotence and transactional.id are missing (shown commented out),
// so the producer cannot use transactions
// props.put("enable.idempotence", "true");
// props.put("transactional.id", "txn-1");

// Right: both settings are present
props.put("enable.idempotence", "true");
props.put("transactional.id", "txn-1");
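The consumer-side pitfall in the list above has no code in this guide, so here is a minimal sketch of a consumer configuration that reads only committed records. The class name ReadCommittedConsumerConfig, the group ID demo-group, and the choice to disable auto-commit are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds settings for a consumer that skips uncommitted records.
final class ReadCommittedConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Hide records from open or aborted transactions (the default is read_uncommitted).
        props.put("isolation.level", "read_committed");
        // Assumption: offsets are committed explicitly rather than on a timer.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "demo-group");
        System.out.println("isolation.level=" + props.getProperty("isolation.level"));
    }
}
```

The returned Properties object can be passed to new KafkaConsumer<>(props) once kafka-clients is on the classpath.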
Quick Reference
Summary tips for exactly-once semantics in Kafka:
- Always enable enable.idempotence=true on producers.
- Set a unique transactional.id per producer instance.
- Call initTransactions() before sending messages.
- Use beginTransaction(), commitTransaction(), and abortTransaction() to manage atomic writes.
- Configure consumers with isolation.level=read_committed to avoid reading uncommitted data.
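The tip about a unique transactional.id per producer instance can be sketched as a small configuration helper. The naming scheme (application name plus instance index), the class name TransactionalProducerConfig, and the explicit acks=all line are assumptions for illustration, not something this guide prescribes. The idea is that the ID differs between instances but stays the same when one instance restarts, so the broker can fence the older producer that used the same ID.

```java
import java.util.Properties;

// Hypothetical helper: derives a stable transactional.id and builds producer settings.
final class TransactionalProducerConfig {

    // Assumed scheme: "<appName>-txn-<instanceIndex>", stable across restarts of one instance.
    static String transactionalId(String appName, int instanceIndex) {
        if (appName == null || appName.isEmpty()) {
            throw new IllegalArgumentException("appName must not be empty");
        }
        if (instanceIndex < 0) {
            throw new IllegalArgumentException("instanceIndex must not be negative");
        }
        return appName + "-txn-" + instanceIndex;
    }

    static Properties build(String bootstrapServers, String appName, int instanceIndex) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("acks", "all"); // idempotent producers require acks=all
        props.put("transactional.id", transactionalId(appName, instanceIndex));
        return props;
    }

    public static void main(String[] args) {
        // Two instances of the same hypothetical application get different IDs.
        System.out.println(build("localhost:9092", "orders", 0).getProperty("transactional.id"));
        System.out.println(build("localhost:9092", "orders", 1).getProperty("transactional.id"));
    }
}
```

A random ID generated at startup would also be unique, but it would change on every restart, so a restarted instance would not fence its predecessor.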
Key Takeaways
Enable idempotent producers with enable.idempotence=true for safe retries.
Use transactions with a unique transactional.id to group produce operations atomically.
Always initialize transactions with initTransactions() before producing messages.
Handle exceptions and abort failed transactions so they do not stay open until they time out.
Set consumer isolation.level to read_committed to read only committed data.