How to Implement Exactly-Once Semantics in Kafka: Guide and Example

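To implement exactly-once semantics in Kafka, enable idempotent producers and use transactions. A transaction commits or aborts, as one unit, the records a producer writes together with the consumer offsets those records were derived from (added with sendOffsetsToTransaction()). When consumers read with isolation.level=read_committed, each input record's results then appear exactly once, even if the producer retries or restarts. The guarantee covers reads and writes inside Kafka. It does not cover side effects in external systems.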
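The idea of committing produced records together with the consumer's position can be sketched without a broker. The class below is a toy, in-memory model with hypothetical names. It is not the Kafka API. In a real application, the offsets would be added to the transaction with the producer's sendOffsetsToTransaction() before commitTransaction().

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory model of a read-process-write step (no Kafka involved).
// Output records and the input position are committed or discarded together.
class ToyReadProcessWrite {
    private final List<String> input;
    private final List<String> committedOutput = new ArrayList<>();
    private int committedOffset = 0;                 // next input record to process

    private final List<String> pendingOutput = new ArrayList<>();
    private int pendingOffset = 0;

    ToyReadProcessWrite(List<String> input) { this.input = input; }

    void begin() {
        pendingOutput.clear();
        pendingOffset = committedOffset;             // resume from the last committed position
    }

    void processNext() {
        String record = input.get(pendingOffset);
        pendingOutput.add(record.toUpperCase());     // the "transform" step
        pendingOffset++;
    }

    void commit() {                                  // output and position become visible together
        committedOutput.addAll(pendingOutput);
        committedOffset = pendingOffset;
        pendingOutput.clear();
    }

    void abort() {                                   // output and position are discarded together
        pendingOutput.clear();
        pendingOffset = committedOffset;
    }

    List<String> committedOutput() { return List.copyOf(committedOutput); }
    int committedOffset() { return committedOffset; }
}
```

An aborted attempt followed by a retry reprocesses the same input. The committed output still contains each result only once, because output and position always move together.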

Syntax

To enable exactly-once semantics in Kafka, configure the producer with enable.idempotence=true and a transactional.id. Then manage transactions with the initTransactions(), beginTransaction(), commitTransaction(), and abortTransaction() methods.

Key parts:

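  • enable.idempotence=true: Lets the broker discard duplicate writes caused by producer retries, per partition, using a producer id and sequence numbers. It is required whenever transactional.id is set.
  • transactional.id: A unique, stable ID for each producer instance. It lets the broker finish or abort that instance's incomplete transactions after a restart, and it fences off older "zombie" instances that use the same ID.
  • initTransactions(): Registers the transactional.id with the transaction coordinator and completes or aborts any transaction left open by a previous instance. Call it once, before the first transaction.
  • beginTransaction(): Starts a new transaction.
  • commitTransaction(): Flushes pending sends and commits all of the transaction's writes atomically.
  • abortTransaction(): Aborts the transaction on a recoverable error, so none of its writes become visible to read_committed consumers.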
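```java
// Fragment: assumes kafka-clients is on the classpath.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-transactional-id");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Call once, before the first transaction.
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
    // other sends or processing
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: this producer cannot continue, so close it instead of aborting.
    producer.close();
} catch (KafkaException e) {
    // Recoverable: abort so none of this transaction's records become visible.
    producer.abortTransaction();
}
```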
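The sketch below models what a broker tracks for one partition, which makes the first three key parts concrete. It is a simplified toy with hypothetical names, not Kafka's broker code. Real brokers key this state by an internal producer id that the coordinator assigns to each transactional.id. The toy keys it by transactional.id directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka's real broker code: one partition that tracks an epoch
// and the last sequence number per transactional.id (names are hypothetical).
class ToyBroker {
    enum Result { APPENDED, DUPLICATE, FENCED }

    private final Map<String, Integer> currentEpoch = new HashMap<>();
    private final Map<String, Integer> lastSequence = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    // Stands in for initTransactions(): a new epoch for this id fences older instances.
    int initTransactions(String transactionalId) {
        int epoch = currentEpoch.merge(transactionalId, 1, Integer::sum);
        lastSequence.put(transactionalId, -1);       // sequence numbers restart with the new epoch
        return epoch;
    }

    Result append(String transactionalId, int epoch, int sequence, String value) {
        Integer current = currentEpoch.get(transactionalId);
        if (current == null) {
            throw new IllegalStateException("initTransactions() was never called for " + transactionalId);
        }
        if (epoch < current) {
            return Result.FENCED;                    // a "zombie" instance from an older epoch
        }
        if (sequence <= lastSequence.get(transactionalId)) {
            return Result.DUPLICATE;                 // a retry of a write already in the log
        }
        // Simplification: real brokers also reject gaps in the sequence; this toy does not.
        lastSequence.put(transactionalId, sequence);
        log.add(value);
        return Result.APPENDED;
    }

    List<String> log() { return List.copyOf(log); }
}

class ToyBrokerDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        int oldEpoch = broker.initTransactions("txn-producer-1");
        System.out.println(broker.append("txn-producer-1", oldEpoch, 0, "a"));     // APPENDED
        System.out.println(broker.append("txn-producer-1", oldEpoch, 0, "a"));     // DUPLICATE (retry)
        int newEpoch = broker.initTransactions("txn-producer-1");                  // restarted instance
        System.out.println(broker.append("txn-producer-1", oldEpoch, 1, "stale")); // FENCED
        System.out.println(broker.append("txn-producer-1", newEpoch, 0, "b"));     // APPENDED
        System.out.println(broker.log());                                          // [a, b]
    }
}
```

The retried "a" is dropped as a duplicate. The write from the old epoch is rejected once a restarted instance has called initTransactions() with the same ID.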

Example

This example shows a Kafka producer configured for idempotent, transactional writes. It initializes transactions once, sends a message inside a transaction, and commits it. If an error occurs, it aborts the transaction so that consumers reading with isolation.level=read_committed never see the transaction's records.

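```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

public class ExactlyOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        // Unique per producer instance and stable across restarts of that instance.
        props.put("transactional.id", "txn-producer-1");

        // try-with-resources closes the producer on every path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.commitTransaction();
                System.out.println("Transaction committed.");
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal: do not abort; the producer is closed when the outer try exits.
                System.out.println("Fatal producer error: " + e.getMessage());
            } catch (KafkaException e) {
                // Recoverable: abort so none of this transaction's records become visible.
                producer.abortTransaction();
                System.out.println("Transaction aborted due to error: " + e.getMessage());
            }
        }
    }
}
```

Output (with a broker reachable at localhost:9092):

```
Transaction committed.
```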
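The example works only if the transactional methods are called in a fixed order. The state machine below encodes that order. It is a hypothetical model, not the client's internal implementation. The real KafkaProducer likewise throws IllegalStateException when beginTransaction() is called before initTransactions().

```java
// Hypothetical model of the legal call order for a transactional producer.
class TxnLifecycle {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION, CLOSED }

    private State state = State.UNINITIALIZED;

    // This model allows exactly one initTransactions() call per instance.
    void initTransactions()  { require(State.UNINITIALIZED, "initTransactions"); state = State.READY; }
    void beginTransaction()  { require(State.READY, "beginTransaction"); state = State.IN_TRANSACTION; }
    void send()              { require(State.IN_TRANSACTION, "send"); }
    void commitTransaction() { require(State.IN_TRANSACTION, "commitTransaction"); state = State.READY; }
    void abortTransaction()  { require(State.IN_TRANSACTION, "abortTransaction"); state = State.READY; }
    void close()             { state = State.CLOSED; }   // every later call fails

    State state() { return state; }

    private void require(State expected, String call) {
        if (state != expected) {
            throw new IllegalStateException(call + "() called in state " + state);
        }
    }
}
```

In this model, both commitTransaction() and abortTransaction() return the producer to READY, so the next beginTransaction() can start a fresh transaction.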

Common Pitfalls

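Common mistakes when implementing exactly-once semantics in Kafka include:

  • Setting enable.idempotence=false, or relying on an older client where it is off by default. Producer retries can then write duplicates, and a producer with a transactional.id requires idempotence anyway.
  • Omitting transactional.id. The producer is then non-transactional, and initTransactions() throws IllegalStateException.
  • Calling beginTransaction() without first calling initTransactions(), which also throws IllegalStateException.
  • Not aborting after a recoverable error. Kafka never commits a transaction partially. However, an unfinished transaction stays open until transaction.timeout.ms expires, and read_committed consumers cannot read past it in the meantime. After fatal errors such as ProducerFencedException, close the producer instead of aborting.
  • Leaving consumers on the default isolation.level=read_uncommitted, which returns records from aborted and still-open transactions.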
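A toy model of one partition shows how the two isolation levels differ. It is not Kafka's implementation, and its class and method names are hypothetical. Real brokers do write commit and abort markers into the log, and consumers filter records using them. Like a real read_committed consumer, the toy stops at the first record of a transaction that is still open, which is the partition's last stable offset.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of one partition: data records tagged with a transaction id,
// followed later by a COMMIT or ABORT marker for that transaction.
class ToyTxnLog {
    enum Kind { DATA, COMMIT, ABORT }

    private static final class Entry {
        final Kind kind; final String txn; final String value;
        Entry(Kind kind, String txn, String value) { this.kind = kind; this.txn = txn; this.value = value; }
    }

    private final List<Entry> entries = new ArrayList<>();

    void data(String txn, String value) { entries.add(new Entry(Kind.DATA, txn, value)); }
    void commit(String txn)             { entries.add(new Entry(Kind.COMMIT, txn, null)); }
    void abort(String txn)              { entries.add(new Entry(Kind.ABORT, txn, null)); }

    // Like read_uncommitted: every data record, whatever its transaction's outcome.
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) {
            if (e.kind == Kind.DATA) out.add(e.value);
        }
        return out;
    }

    // Like read_committed: only records of committed transactions, and nothing
    // from the first record of a still-open transaction onward.
    List<String> readCommitted() {
        Set<String> committed = new HashSet<>();
        Set<String> finished = new HashSet<>();
        for (Entry e : entries) {
            if (e.kind == Kind.COMMIT) { committed.add(e.txn); finished.add(e.txn); }
            if (e.kind == Kind.ABORT)  { finished.add(e.txn); }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : entries) {
            if (e.kind != Kind.DATA) continue;
            if (!finished.contains(e.txn)) break;   // first open transaction: stop here
            if (committed.contains(e.txn)) out.add(e.value);
        }
        return out;
    }
}
```

With one committed, one aborted, and one open transaction, readUncommitted() returns all three transactions' records. readCommitted() returns only the committed records that precede the open transaction.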

Example of wrong vs right configuration:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Wrong: missing idempotence and transactional.id
// props.put("enable.idempotence", "true");
// props.put("transactional.id", "txn-1");

// Right:
props.put("enable.idempotence", "true");
props.put("transactional.id", "txn-1");
```
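A wrong configuration like the one above can be caught before the producer is created. TxnConfigCheck below is a hypothetical helper, not part of the Kafka client. It only inspects a java.util.Properties object and never contacts a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the producer settings this guide relies on.
class TxnConfigCheck {
    static List<String> problems(Properties props) {
        List<String> problems = new ArrayList<>();
        String txnId = props.getProperty("transactional.id");
        if (txnId == null || txnId.isBlank()) {
            problems.add("transactional.id is missing, so initTransactions() will fail");
        }
        // The guide recommends setting this explicitly rather than relying on client defaults.
        if (!"true".equalsIgnoreCase(props.getProperty("enable.idempotence"))) {
            problems.add("enable.idempotence is not set to true");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        System.out.println(problems(props));   // both settings are reported
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "txn-1");
        System.out.println(problems(props));   // []
    }
}
```

For the wrong configuration, the helper reports both missing settings. For the right one, it returns an empty list.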

Quick Reference

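Summary tips for exactly-once Kafka:

  • Set enable.idempotence=true on producers. It is required whenever transactional.id is set.
  • Give each producer instance a unique transactional.id that stays the same across restarts of that instance.
  • Call initTransactions() once, before the first transaction.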
  • Use beginTransaction(), commitTransaction(), and abortTransaction() to manage atomic writes.
  • Configure consumers with isolation.level=read_committed to avoid reading uncommitted data.
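Below is a minimal sketch of the consumer settings from the last tip, written as plain Properties. The deserializers mirror the producer's StringSerializer. The group id and the choice to disable auto-commit are assumptions. Disabling auto-commit suits applications that commit offsets through the producer's transaction with sendOffsetsToTransaction().

```java
import java.util.Properties;

// Sketch of consumer settings for reading a transactional topic.
// The bootstrap address and group id are placeholders supplied by the caller.
class ReadCommittedConsumerConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is read_uncommitted. read_committed hides records from aborted
        // transactions and holds back records from transactions that are still open.
        props.put("isolation.level", "read_committed");
        // Assumption: offsets are committed through the producer's transaction,
        // so the consumer must not commit them on its own.
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

You can pass the returned object to new KafkaConsumer<>(props), the same way the producer examples pass their Properties.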

Key Takeaways

Enable idempotent producers with enable.idempotence=true for safe retries.
Use transactions with a unique transactional.id to group produce operations atomically.
Always initialize transactions with initTransactions() before producing messages.
Abort the transaction on recoverable errors so its records never become visible, and close the producer after fatal errors such as ProducerFencedException.
Set consumer isolation.level to read_committed to read only committed data.