How to Fix Consumer Timeout in Kafka: Simple Steps
To fix a consumer timeout in Kafka, increase the max.poll.interval.ms or session.timeout.ms setting to allow more time for processing. Also, ensure your consumer polls frequently and processes messages quickly to avoid timeouts.
Why This Happens
A consumer timeout in Kafka happens when the consumer takes too long to call poll() or to send heartbeats. Kafka expects consumers to call poll() within max.poll.interval.ms and to heartbeat within session.timeout.ms (since Kafka 0.10.1, heartbeats are sent from a background thread, so the two limits fail independently). If either limit is exceeded, Kafka assumes the consumer is dead, removes it from the group, and triggers a rebalance.
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000"); // 5 minutes

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // Simulate long processing: 400 s per record exceeds the
        // 300 s max.poll.interval.ms, so the broker evicts this consumer.
        Thread.sleep(400000); // 400 seconds
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
```
The Fix
To fix the consumer timeout, increase max.poll.interval.ms to allow more time per poll loop, reduce max.poll.records so each batch takes less time, or move long blocking work out of the poll loop (for example, by processing asynchronously). This keeps the consumer responsive and prevents Kafka from marking it as dead.
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "600000"); // 10 minutes

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // Process quickly here, or hand off to a worker thread,
        // so the loop gets back to poll() well within the limit.
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
```
Prevention
To prevent consumer timeouts in Kafka, always keep your consumer polling frequently and avoid long blocking operations inside the poll loop. Use asynchronous processing or batch messages if processing takes time. Also, tune max.poll.interval.ms and session.timeout.ms based on your workload to give enough time for processing without triggering timeouts.
- Monitor consumer lag and processing time.
- Tune heartbeat.interval.ms properly (no more than one-third of session.timeout.ms).
- Handle exceptions to avoid consumer crashes.
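One way to keep poll() responsive is to hand each batch to a worker pool and return to polling immediately. The sketch below illustrates that hand-off pattern in isolation; the class and method names are illustrative, and plain strings stand in for Kafka records so it runs without a broker. In a real consumer you would also pause the fetched partitions or track in-flight offsets before committing.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class AsyncHandoff {
    // Daemon worker pool that does the slow per-record work
    // off the polling thread.
    static final ExecutorService workers = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Submit a batch to the pool and return immediately, so the
    // poll loop can call poll() again well within max.poll.interval.ms.
    static List<Future<String>> submitBatch(List<String> records) {
        return records.stream()
                .map(r -> workers.submit(() -> process(r)))
                .collect(Collectors.toList());
    }

    // Stand-in for slow per-record work (e.g. a database write).
    static String process(String record) {
        return "processed:" + record;
    }

    public static void main(String[] args) throws Exception {
        List<Future<String>> futures = submitBatch(List.of("a", "b"));
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
    }
}
```

With this shape, the polling thread's only job is fetching and hand-off; offsets should be committed only once the corresponding futures complete.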
Related Errors
Other errors related to consumer timeouts include:
- CommitFailedException: Thrown when an offset commit fails because the group has already rebalanced, typically after the consumer exceeded max.poll.interval.ms.
- RebalanceInProgressException: Occurs when the consumer group is mid-rebalance, for example after a consumer timed out and was evicted.
- OffsetOutOfRangeException: Thrown when the consumer requests an offset that no longer exists on the broker (for example, after a long pause during which retention deleted old segments); the auto.offset.reset setting controls how the consumer recovers.
Fixes usually involve adjusting timeout settings and ensuring timely polling.
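Transient failures like these are often handled with a bounded retry. The helper below is a generic sketch of that idea, not part of Kafka's API; in a real consumer you would typically call poll() again between attempts (rejoining the group after a rebalance) before retrying the commit.

```java
import java.util.function.Supplier;

public class BoundedRetry {
    // Runs `op` up to `maxAttempts` times, rethrowing the last
    // failure if every attempt throws.
    static <T> T retry(Supplier<T> op, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        // Simulated commit: fails twice, then succeeds on the third attempt.
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("rebalance in progress");
            return "committed";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```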