How to Consume from a Specific Offset in Kafka: Simple Guide
To consume from a specific offset in Kafka, use the assign() method to specify the partition, then call seek() to set the exact offset before polling messages. This lets you start reading from any position in the topic partition.
Syntax
Use the Kafka consumer methods assign() and seek() to consume from a specific offset.
- assign(Collection<TopicPartition> partitions): Assigns the consumer to specific partitions.
- seek(TopicPartition partition, long offset): Sets the offset to start consuming from in the assigned partition.
- poll(Duration timeout): Fetches records starting from the set offset.
java
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
consumer.seek(new TopicPartition("my-topic", 0), 15L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
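To make the relationship between seek() and poll() concrete, here is a minimal sketch of a toy in-memory reader. It is not the Kafka client: the class ToyPartitionReader and its methods are hypothetical names invented for illustration, and the list index stands in for the record offset. It only shows that seeking moves the position without reading anything, and that polling returns records from that position and then advances it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for one partition; NOT the Kafka client (hypothetical class).
final class ToyPartitionReader {
    private final List<String> log;   // list index plays the role of the offset
    private long position = 0;        // offset of the next record to return

    ToyPartitionReader(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    // Like seek(): only moves the position, reads nothing.
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must not be negative");
        }
        position = offset;
    }

    // Like poll(): returns up to maxRecords from the position, then advances it.
    List<String> poll(int maxRecords) {
        int from = (int) Math.min(position, log.size());
        int to = Math.min(from + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(from, to));
        if (to > from) {
            position = to;            // next poll continues after this batch
        }
        return batch;
    }

    long position() {
        return position;
    }
}

final class ToyPartitionReaderDemo {
    public static void main(String[] args) {
        ToyPartitionReader reader =
                new ToyPartitionReader(Arrays.asList("m0", "m1", "m2", "m3", "m4"));
        reader.seek(3);                          // nothing is read yet
        System.out.println(reader.poll(10));     // prints [m3, m4]
        System.out.println(reader.position());   // prints 5
    }
}
```

The real consumer behaves similarly in this respect: after a poll, its position is one past the last offset it returned.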
Example
This example shows how to consume messages from offset 10 of partition 0 in topic my-topic using the Java Kafka consumer.
java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SpecificOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer even if poll() throws
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign partition 0 of my-topic (no group rebalancing involved)
            TopicPartition partition0 = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(partition0));

            // Start consuming from offset 10
            consumer.seek(partition0, 10L);

            // A single poll may return no records if the fetch has not completed yet;
            // long-running consumers usually call poll() in a loop.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record ->
                System.out.printf("Offset = %d, Key = %s, Value = %s%n",
                        record.offset(), record.key(), record.value()));
        }
    }
}
Output
Offset = 10, Key = key10, Value = message10
Offset = 11, Key = key11, Value = message11
... (messages from offset 10 onwards)
Common Pitfalls
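- Calling seek() on a partition that is not currently assigned to the consumer throws IllegalStateException, so call assign() before seek().
- Calling seek() immediately after subscribe() fails for the same reason: subscribe() does not assign partitions until the consumer polls and joins the group. With subscribe(), seek only once partitions have been assigned, for example from a ConsumerRebalanceListener's onPartitionsAssigned() callback. assign() and subscribe() cannot be combined on the same consumer.
- A negative offset makes seek() throw IllegalArgumentException. An offset outside the partition's valid range is resolved by the auto.offset.reset setting (default latest) on the next poll(), so you may get records from a different position, no records at all, or an exception if the setting is none.
- seek() only updates the consumer's position in memory; no messages are fetched until you call poll().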
java
/* Wrong way: calling seek() right after subscribe() */
consumer.subscribe(Collections.singletonList("my-topic"));
consumer.seek(new TopicPartition("my-topic", 0), 5L); // This will throw IllegalStateException: no partition is assigned yet

/* Right way (instead of the lines above, not in addition to them): use assign before seek */
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
consumer.seek(new TopicPartition("my-topic", 0), 5L);
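Because an out-of-range offset does not fail at seek() time, it can help to check the requested offset against the partition's bounds first. The sketch below is plain Java, not Kafka client code: SeekOffsetCheck, ResetPolicy, and both methods are hypothetical names, and the model of auto.offset.reset is a simplification. In a real consumer the bounds would come from consumer.beginningOffsets(...) and consumer.endOffsets(...), and the none policy surfaces as an exception from poll() rather than the stand-in IllegalStateException used here.

```java
// Simplified model of how a seek target relates to a partition's valid range.
// Hypothetical helper for illustration; it does not call the Kafka client.
final class SeekOffsetCheck {

    // Mirrors the three values of the auto.offset.reset consumer setting.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // True if `requested` lies in [beginning, end]. An offset equal to `end`
    // is valid: the consumer simply waits for the next record to be written.
    static boolean isInRange(long requested, long beginning, long end) {
        return requested >= beginning && requested <= end;
    }

    // Offset the consumer would roughly start from after seek(requested).
    static long effectiveStart(long requested, long beginning, long end, ResetPolicy policy) {
        if (requested < 0) {
            // The real seek() also rejects negative offsets immediately.
            throw new IllegalArgumentException("seek offset must not be negative");
        }
        if (isInRange(requested, beginning, end)) {
            return requested;
        }
        switch (policy) {
            case EARLIEST:
                return beginning;   // jump to the oldest retained record
            case LATEST:
                return end;         // jump to the end and wait for new records
            default:
                // Stand-in for the out-of-range error raised by poll() with "none".
                throw new IllegalStateException(
                        "offset " + requested + " is outside [" + beginning + ", " + end + "]");
        }
    }
}
```

For example, with bounds 0 and 100 and the default latest policy, effectiveStart(500, 0, 100, ResetPolicy.LATEST) gives 100, which is why seeking far past the end typically yields no records until new ones arrive.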
Quick Reference
Remember these key points when consuming from a specific offset:
- Use assign() to specify partitions manually.
- Call seek() to set the exact offset.
- Then call poll() to fetch messages.
- Do not call seek() right after subscribe(); the partition must already be assigned.
- Offsets start at 0 and must be valid for the partition.
Key Takeaways
Use consumer.assign() before consumer.seek() to consume from a specific offset.
Do not call seek() straight after subscribe(); seek() only works on partitions that are already assigned.
Ensure the offset you seek to is within the partition's valid range; otherwise auto.offset.reset decides where reading starts.
Always call poll() after seek() to fetch records starting from the offset.
Offsets are zero-based and specific to each partition.