Kafka · How-To · Intermediate · 4 min read

How to Implement Retry Mechanism in Kafka Consumers

Kafka consumers do not retry failed message processing automatically. To implement a retry mechanism, tune client-level settings such as retry.backoff.ms and max.poll.records, implement manual retries in your consumer code by catching exceptions and reprocessing messages, or use a separate retry topic to which failed messages are published and consumed again after a delay.

Syntax

The retry mechanism in Kafka can be configured using consumer properties or implemented in code. Key configuration properties include:

  • retry.backoff.ms: Time the client waits before retrying a failed request to a topic partition; this governs client-level retries, not record processing.
  • max.poll.records: Maximum number of records returned in a single poll(); smaller batches limit how much work is redone after a failure.
  • enable.auto.commit: Set to false so offsets are committed manually, only after successful processing.

In code, retries are implemented by catching exceptions during message processing and either retrying in place or sending the message to a retry topic.

java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "retry-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("retry.backoff.ms", 1000);
props.put("max.poll.records", 10);

Example

This example shows a Kafka consumer in Java that implements a retry mechanism by catching exceptions during message processing and retrying up to 3 times before sending the message to a retry topic.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaRetryConsumer {
    private static final String TOPIC = "main-topic";
    private static final String RETRY_TOPIC = "retry-topic";

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "retry-group");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    boolean success = false;
                    int attempts = 0;
                    while (!success && attempts < 3) {
                        try {
                            // Process message
                            System.out.println("Processing message: " + record.value());
                            // Simulate processing logic
                            if (record.value().contains("fail")) {
                                throw new RuntimeException("Processing failed");
                            }
                            success = true;
                        } catch (Exception e) {
                            attempts++;
                            System.out.println("Attempt " + attempts + " failed for message: " + record.value());
                            if (attempts == 3) {
                                // Exhausted retries: hand the message off to the retry topic
                                producer.send(new ProducerRecord<>(RETRY_TOPIC, record.key(), record.value()));
                                System.out.println("Sent message to retry topic: " + record.value());
                            } else {
                                try {
                                    // Back off before the next attempt instead of retrying immediately
                                    Thread.sleep(1000);
                                } catch (InterruptedException ie) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                    }
                }
                consumer.commitSync();
            }
        }
    }
}
Output
Processing message: hello
Processing message: fail message
Attempt 1 failed for message: fail message
Attempt 2 failed for message: fail message
Attempt 3 failed for message: fail message
Sent message to retry topic: fail message

Common Pitfalls

Common mistakes when implementing Kafka retry mechanisms include:

  • Using enable.auto.commit=true, which can commit offsets before processing completes, causing message loss on failure.
  • Not limiting retry attempts, leading to infinite loops and resource exhaustion.
  • Not using a separate retry topic, which can block the main consumer and delay processing.
  • Ignoring backoff delays between retries, causing high load on the system.

Always manually commit offsets after successful processing and consider exponential backoff or delayed retries using separate topics.

java
/* Wrong approach: auto commit enabled, no retry limit */
props.put("enable.auto.commit", "true");

/* Right approach: manual commit and retry limit in code */
props.put("enable.auto.commit", "false");
// Retry logic implemented in consumer code with max attempts
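The exponential backoff mentioned above can be sketched as a small delay calculation. This is an illustrative helper, not a Kafka API; the class and method names (BackoffSketch, backoffDelayMs) and the base/cap values are assumptions:

```java
public class BackoffSketch {
    // Delay before the given attempt (1-based): doubles each time,
    // capped at maxDelayMs so a retry never waits arbitrarily long.
    static long backoffDelayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        long delay = baseDelayMs * (1L << (attempt - 1)); // 1x, 2x, 4x, ...
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            // With base 1000 ms and cap 8000 ms: 1000, 2000, 4000, 8000, 8000
            System.out.println("Attempt " + attempt + ": wait "
                    + backoffDelayMs(attempt, 1000, 8000) + " ms");
        }
    }
}
```

In the consumer loop above, the computed delay would replace the fixed sleep between attempts.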

Quick Reference

Summary tips for Kafka retry mechanism:

  • Set enable.auto.commit=false to control offset commits.
  • Use retry.backoff.ms to add delay between retries.
  • Implement retry logic in consumer code with a max retry count.
  • Use a separate retry topic for delayed retries and dead-letter handling.
  • Commit offsets only after successful processing to avoid message loss.
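The retry-versus-dead-letter routing in the bullets above can be sketched as plain decision logic. The nextTopic helper and the dead-letter-topic name are hypothetical; in real code the attempt count would travel with the message, for example in a record header:

```java
public class TopicRouter {
    // Decide where a failed message goes next: back to the retry topic
    // while attempts remain, otherwise to a dead-letter topic for inspection.
    static String nextTopic(int attemptsSoFar, int maxAttempts) {
        return attemptsSoFar < maxAttempts ? "retry-topic" : "dead-letter-topic";
    }

    public static void main(String[] args) {
        System.out.println(nextTopic(1, 3)); // still retrying
        System.out.println(nextTopic(3, 3)); // exhausted, goes to dead letter
    }
}
```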

Key Takeaways

  • Always disable auto commit and manually commit offsets after successful message processing.
  • Implement retry logic with a maximum number of attempts to avoid infinite retries.
  • Use a separate retry topic to handle delayed retries without blocking the main consumer.
  • Configure retry backoff to prevent overwhelming the system with immediate retries.
  • Monitor and handle dead-letter messages for messages that fail all retry attempts.