0
0
KafkaHow-ToBeginner · 4 min read

How to Use Retry in Spring Kafka for Reliable Message Processing

In Spring Kafka, you can enable retry by configuring a RetryTemplate or using the SeekToCurrentErrorHandler with retry capabilities. This allows your consumer to automatically retry processing failed messages a set number of times before sending them to a dead-letter topic or logging the failure.
📐

Syntax

To use retry in Spring Kafka, you typically configure a RetryTemplate and set it on the KafkaListenerContainerFactory. Alternatively, you can use SeekToCurrentErrorHandler with retry settings for error handling.

Key parts:

  • RetryTemplate: Defines retry policies like max attempts and backoff.
  • KafkaListenerContainerFactory: Factory to create Kafka listeners where retry is applied.
  • SeekToCurrentErrorHandler: Handles retries by seeking offsets to reprocess messages.
java
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000); // 1 second delay
    retryTemplate.setBackOffPolicy(backOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3); // retry 3 times
    retryTemplate.setRetryPolicy(retryPolicy);

    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setRetryTemplate(retryTemplate());
    return factory;
}
💻

Example

This example shows a Spring Kafka consumer that retries message processing up to 3 times with a 1-second delay between attempts. If all retries fail, the message is logged as failed.

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received message: " + record.value());
        if (record.value().contains("fail")) {
            throw new RuntimeException("Simulated processing failure");
        }
        System.out.println("Processed message successfully");
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000); // 1 second
        retryTemplate.setBackOffPolicy(backOffPolicy);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3); // retry 3 times
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
}
Output
Received message: hello Processed message successfully Received message: fail message Exception in thread "main" java.lang.RuntimeException: Simulated processing failure ... (retries 3 times with 1 second delay) Message processing failed after retries
⚠️

Common Pitfalls

  • Not setting the RetryTemplate on the listener container factory causes retries to be ignored.
  • Using SeekToCurrentErrorHandler without configuring max failures can cause infinite retry loops.
  • Not handling exceptions properly inside the listener can cause the consumer to stop.
  • Forgetting to configure a dead-letter topic to handle messages after retries fail leads to message loss or blocking.
java
/* Wrong: No retry template set */
factory.setConsumerFactory(consumerFactory());
// Missing: factory.setRetryTemplate(retryTemplate());

/* Right: Retry template set */
factory.setRetryTemplate(retryTemplate());
📊

Quick Reference

ConceptDescriptionDefault Value / Notes
RetryTemplateDefines retry rules like max attempts and backoff delayMust be set explicitly
FixedBackOffPolicySets fixed delay between retriesDefault 0 ms
SimpleRetryPolicySets max retry attemptsDefault 3 attempts recommended
SeekToCurrentErrorHandlerError handler that seeks offsets for retryUse with caution to avoid infinite loops
Dead Letter TopicTopic to send messages after retries failConfigure for production use

Key Takeaways

Configure a RetryTemplate with max attempts and backoff to enable retries in Spring Kafka.
Set the RetryTemplate on the KafkaListenerContainerFactory to activate retry logic.
Use SeekToCurrentErrorHandler to control retry behavior and offset seeking.
Always configure a dead-letter topic to handle messages that fail after retries.
Test retry behavior to avoid infinite loops and ensure message processing reliability.