
How to Fix Consumer Timeout in Kafka: Simple Steps

To fix a consumer timeout in Kafka, give the consumer more time between polls by increasing max.poll.interval.ms (or session.timeout.ms if heartbeats are being missed), or lower max.poll.records so each batch finishes sooner. Also make sure the consumer calls poll() frequently and processes messages quickly, so it stays within these limits.

Why This Happens

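A consumer timeout in Kafka happens when too much time passes between calls to poll(), usually because processing a batch of records is slow. Two settings are involved.

max.poll.interval.ms (default 300000, i.e. 5 minutes) is the longest allowed gap between poll() calls. If it is exceeded, the consumer leaves the group.

session.timeout.ms is how long the group coordinator waits for a heartbeat. Since Kafka 0.10.1, heartbeats are sent from a background thread, so this limit mainly catches consumers that crashed or lost their connection.

In both cases the consumer is removed from the group and a rebalance reassigns its partitions to other members. Its next offset commit then fails.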
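The small model below makes the difference between the two timers concrete. It uses only the standard library and assumes a simplified view: the only inputs are the time since the last poll() and the time since the last heartbeat. LivenessModel and its enum are hypothetical names, not part of the Kafka client.

```java
import java.time.Duration;

// Hypothetical model of the two liveness checks described above.
// It is NOT the Kafka client's implementation, only an illustration.
class LivenessModel {

    enum Verdict { ALIVE, POLL_TIMEOUT, SESSION_TIMEOUT }

    static Verdict check(Duration sinceLastPoll, Duration sinceLastHeartbeat,
                         Duration maxPollInterval, Duration sessionTimeout) {
        // No heartbeat for longer than session.timeout.ms: the process looks dead or unreachable
        if (sinceLastHeartbeat.compareTo(sessionTimeout) > 0) {
            return Verdict.SESSION_TIMEOUT;
        }
        // Heartbeats still arrive, but poll() has not been called within max.poll.interval.ms
        if (sinceLastPoll.compareTo(maxPollInterval) > 0) {
            return Verdict.POLL_TIMEOUT;
        }
        return Verdict.ALIVE;
    }

    public static void main(String[] args) {
        Duration maxPollInterval = Duration.ofMinutes(5); // max.poll.interval.ms = 300000
        Duration sessionTimeout = Duration.ofSeconds(45); // session.timeout.ms, example value

        // 400 seconds into processing one record; the background thread heartbeated 3 seconds ago
        System.out.println(check(Duration.ofSeconds(400), Duration.ofSeconds(3),
                maxPollInterval, sessionTimeout)); // POLL_TIMEOUT
    }
}
```

A slow poll loop trips the poll timeout even though heartbeats keep arriving. That is why raising session.timeout.ms alone does not help with slow processing.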

java
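import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false"); // commit manually so the failure surfaces as an exception
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000"); // 5 minutes (also the default)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // Simulate slow processing: 400 seconds per record exceeds max.poll.interval.ms
                Thread.sleep(400000);
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            // The consumer has already left the group, so this commit fails with CommitFailedException
            consumer.commitSync();
        }
    }
}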
Output
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member
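Simple arithmetic shows why the example above fails. The loop only returns to poll() after the whole batch is processed, so the gap between polls is roughly the number of records in the batch times the time per record. The helper below is hypothetical. It assumes a fixed cost per record and ignores fetch and commit time.

```java
// Hypothetical helper: estimates the gap between two poll() calls for one batch,
// assuming every record costs the same fixed processing time.
class PollGapEstimate {

    static long worstCaseGapMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    static boolean exceeds(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseGapMs(maxPollRecords, perRecordMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long perRecordMs = 400_000L;       // Thread.sleep(400000) in the example
        long maxPollIntervalMs = 300_000L; // max.poll.interval.ms in the example

        // Even a single record already overshoots the 5-minute limit
        System.out.println(exceeds(1, perRecordMs, maxPollIntervalMs)); // true
        // With the default max.poll.records of 500, the gap is about 55 hours
        System.out.println(worstCaseGapMs(500, perRecordMs));           // 200000000
    }
}
```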

The Fix

To fix the timeout, increase max.poll.interval.ms so each batch has more time to finish, and consider lowering max.poll.records so each poll() returns fewer records. Avoid long blocking operations inside the poll loop. If processing is inherently slow, hand it off to worker threads so the loop keeps calling poll(). This keeps the consumer in its group and prevents the rebalance that makes the commit fail.

java
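import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TunedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false"); // commit after each batch is processed
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "600000"); // 10 minutes between poll() calls
        props.put("max.poll.records", "100");        // smaller batches than the default 500 (example value)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // Keep per-record work short, or hand slow work to another thread
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            // Commit the offsets of the batch that was just processed
            consumer.commitSync();
        }
    }
}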
Output
offset = 15, key = key1, value = value1
offset = 16, key = key2, value = value2
...
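One way to pick values for max.poll.interval.ms and max.poll.records is to budget for the slowest expected record. The sketch below assumes you know the worst-case time per record and keeps half of the interval as headroom. Both the 50% figure and the PollSizing class are illustrative assumptions, not Kafka recommendations.

```java
// Hypothetical sizing helper; the headroom percentage is an assumption, not a Kafka rule.
class PollSizing {

    // Largest batch that finishes within the given share of max.poll.interval.ms.
    // Returns 0 when even one record does not fit.
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, int headroomPercent) {
        long budgetMs = maxPollIntervalMs * headroomPercent / 100;
        return (int) Math.min(Integer.MAX_VALUE, budgetMs / perRecordMs);
    }

    // Smallest max.poll.interval.ms that fits a batch inside the same headroom.
    static long requiredMaxPollIntervalMs(int maxPollRecords, long perRecordMs, int headroomPercent) {
        return (long) maxPollRecords * perRecordMs * 100 / headroomPercent;
    }

    public static void main(String[] args) {
        // 2-second records, 10-minute interval, use at most half of it
        System.out.println(safeMaxPollRecords(600_000L, 2_000L, 50));   // 150
        // The 400-second records from the first example do not fit at all
        System.out.println(safeMaxPollRecords(600_000L, 400_000L, 50)); // 0
        // One such record per poll needs an interval of at least 800,000 ms
        System.out.println(requiredMaxPollIntervalMs(1, 400_000L, 50)); // 800000
    }
}
```

A result of 0 means tuning alone will not be enough. Either the interval must grow or the work must move off the polling thread.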

Prevention

To prevent consumer timeouts in Kafka, keep the poll loop short: call poll() frequently and avoid long blocking operations between calls. If processing takes time, move it to worker threads or use smaller batches via max.poll.records. Tune max.poll.interval.ms and session.timeout.ms to your workload so processing has enough headroom without triggering timeouts.
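The asynchronous approach can be sketched without a broker. In this sketch the polling thread hands each record to a worker pool and goes straight back to poll(), so the gap between polls stays short even when each record is slow.

This is a standard-library-only simulation under stated assumptions. The inner lists stand in for poll() results, and AsyncHandoff is a hypothetical name. A real consumer also needs two things the sketch omits:

- Backpressure, for example consumer.pause() on its assigned partitions while workers are busy, then resume().
- Commits only for offsets whose records have finished processing.

Note also that the fixed thread pool here queues work without bound.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Stdlib-only sketch of moving slow work off the polling thread.
// Each inner list is a hypothetical stand-in for one poll() result (record offsets);
// no Kafka classes are used.
class AsyncHandoff {

    // Simulates the poll loop and returns the longest gap between two "poll()" calls in ms.
    static long run(List<List<Long>> batches, long perRecordMs, int workers, AtomicLong processed) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        long longestGapMs = 0;
        long lastPoll = System.nanoTime();
        for (List<Long> batch : batches) {              // one iteration = one poll() call
            long now = System.nanoTime();
            longestGapMs = Math.max(longestGapMs, TimeUnit.NANOSECONDS.toMillis(now - lastPoll));
            lastPoll = now;
            for (Long offset : batch) {
                // Hand the record to a worker; the loop does not wait for it to finish
                pool.submit(() -> {
                    try {
                        Thread.sleep(perRecordMs);      // stand-in for slow processing of `offset`
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }
        pool.shutdown();                                 // stop accepting work, let queued tasks finish
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return longestGapMs;
    }

    public static void main(String[] args) {
        AtomicLong processed = new AtomicLong();
        List<List<Long>> batches = List.of(List.of(0L, 1L), List.of(2L, 3L), List.of(4L, 5L));
        long gapMs = run(batches, 200, 4, processed);
        // Done sequentially, each gap would be at least 400 ms (2 records x 200 ms)
        System.out.println("longest gap between polls: " + gapMs + " ms, records processed: " + processed.get());
    }
}
```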

  • Monitor consumer lag and processing time.
  • Keep heartbeat.interval.ms well below session.timeout.ms (the Kafka documentation suggests no more than one third of it).
  • Handle exceptions to avoid consumer crashes.
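Two of the checks in the list above can be written as small functions:

- The heartbeat rule follows the Kafka documentation's guidance that heartbeat.interval.ms be no higher than one third of session.timeout.ms.
- Lag is the log-end offset minus the committed offset, summed over partitions.

In practice you would read lag from a tool such as kafka-consumer-groups.sh --describe. ConsumerHealth and its methods are hypothetical, and treating a partition with no committed offset as starting from 0 is an assumption.

```java
import java.util.Map;

// Hypothetical health checks; names are illustrative, not part of any Kafka API.
class ConsumerHealth {

    // Guideline from the Kafka docs: heartbeat.interval.ms <= session.timeout.ms / 3
    static boolean heartbeatWithinGuideline(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    // Total lag = sum over partitions of (log-end offset - committed offset).
    // Maps are keyed by partition number; a missing commit is treated as offset 0 (assumption).
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0, e.getValue() - committed);
        }
        return lag;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatWithinGuideline(3_000, 45_000));  // true
        System.out.println(heartbeatWithinGuideline(20_000, 45_000)); // false
        System.out.println(totalLag(Map.of(0, 120L, 1, 80L), Map.of(0, 100L, 1, 80L))); // 20
    }
}
```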

Related Errors

Other errors related to consumer timeouts include:

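  • CommitFailedException: Thrown when the consumer tries to commit offsets after it has been removed from the group (for example, for exceeding max.poll.interval.ms) and its partitions have been reassigned.
  • RebalanceInProgressException: Can be thrown when a commit is attempted while the group is still rebalancing, for example after a member timed out.
  • OffsetOutOfRangeException: Thrown when the consumer's position no longer exists on the broker and auto.offset.reset is set to none. This can happen, for example, when retention deleted those records while the consumer was stalled.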

Fixes usually involve adjusting timeout settings and ensuring timely polling.

Key Takeaways

Increase max.poll.interval.ms to allow longer processing time before timeout.
Avoid long blocking operations inside the consumer poll loop.
Call poll() frequently so the consumer stays in its group. Heartbeats are sent by a background thread, but max.poll.interval.ms still requires regular polling.
Use asynchronous processing to handle heavy workloads without blocking.
Monitor consumer lag and tune timeout settings based on workload.