How to Consume from Specific Offset in Kafka: Simple Guide

To consume from a specific offset in Kafka, use the assign() method to specify the partition, then call seek() to set the exact offset before polling messages. This lets you start reading from any position in the topic partition.

Syntax

Use the Kafka consumer methods assign() and seek() to consume from a specific offset.

  • assign(Collection<TopicPartition> partitions): Assigns the consumer to specific partitions.
  • seek(TopicPartition partition, long offset): Sets the offset to start consuming from in the assigned partition.
  • poll(Duration timeout): Fetches records starting from the set offset.
java
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
consumer.seek(new TopicPartition("my-topic", 0), 15L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

Example

This example shows how to consume messages from offset 10 of partition 0 in topic my-topic using the Java Kafka consumer.

java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SpecificOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer even if poll() throws
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition0 = new TopicPartition("my-topic", 0);
            // Manual assignment: no consumer-group rebalancing is involved
            consumer.assign(Collections.singletonList(partition0));

            // Start consuming from offset 10
            consumer.seek(partition0, 10L);

            // A single poll may return only part of the data, or nothing if the fetch does not finish within the timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            records.forEach(record ->
                System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value()));
        }
    }
}
Output
Offset = 10, Key = key10, Value = message10
Offset = 11, Key = key11, Value = message11
... (messages from offset 10 onwards)
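
Because one poll() call is not guaranteed to return everything from the chosen offset onwards, real consumers usually poll in a loop. The following is a minimal sketch of such a loop, not a definitive implementation. The class name PollLoop, the method readUntilIdle and the "stop after a number of empty polls" rule are assumptions made for illustration; the consumer passed in is expected to have been assigned and positioned with seek() already.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;

public class PollLoop {
    /**
     * Polls repeatedly and prints every record until maxEmptyPolls
     * consecutive polls return nothing. Returns the number of records read.
     * The stop rule is a hypothetical choice for this sketch.
     */
    static long readUntilIdle(Consumer<String, String> consumer, int maxEmptyPolls) {
        long count = 0;
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) {
                emptyPolls++;   // nothing arrived within the timeout
                continue;
            }
            emptyPolls = 0;     // data arrived, so reset the idle counter
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset = %d, Key = %s, Value = %s%n",
                        record.offset(), record.key(), record.value());
                count++;
            }
        }
        return count;
    }
}
```

In the example above, the single poll() call could be replaced with something like PollLoop.readUntilIdle(consumer, 3) to keep reading until the partition appears to be drained.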

Common Pitfalls

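  • Calling seek() on a partition that is not currently assigned to the consumer throws IllegalStateException, so call assign() first.
  • Calling seek() immediately after subscribe() fails for the same reason: with subscribe(), partitions are assigned only during poll(). If you need subscribe(), call seek() from the onPartitionsAssigned() callback of a ConsumerRebalanceListener instead.
  • A negative offset makes seek() throw IllegalArgumentException. An offset beyond the partition's valid range is accepted by seek(), but the next poll() applies the auto.offset.reset policy: the position jumps to the latest or earliest offset, or an exception is thrown if the policy is none.
  • seek() only changes the consumer's position in memory; no records are fetched until the next poll().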
java
/* Wrong way: seeking right after subscribe(), before any partition is assigned */
consumer.subscribe(Collections.singletonList("my-topic"));
consumer.seek(new TopicPartition("my-topic", 0), 5L); // Throws IllegalStateException: partition not assigned yet

/* Right way: assign the partition manually, then seek */
TopicPartition partition0 = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition0));
consumer.seek(partition0, 5L);
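
If group-managed assignment through subscribe() is required, seek() can still be used once partitions have actually been assigned. The sketch below shows one way to do that with a ConsumerRebalanceListener. The class name SeekOnAssignment, the START_OFFSET constant, the consumerProps() helper and the 5-second poll timeout are assumptions for illustration; the broker address, group and topic reuse the values from the example in this guide.

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class SeekOnAssignment {
    // Hypothetical starting offset, applied to every assigned partition
    static final long START_OFFSET = 5L;

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps())) {
            consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Nothing to clean up in this sketch
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Runs inside poll(), after the group coordinator has assigned partitions
                    for (TopicPartition partition : partitions) {
                        consumer.seek(partition, START_OFFSET);
                    }
                }
            });

            // The first poll() joins the group, triggers the callback, then fetches
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(record ->
                System.out.printf("Partition = %d, Offset = %d, Value = %s%n",
                        record.partition(), record.offset(), record.value()));
        }
    }
}
```

Note that this callback runs on every rebalance, so as written the consumer rewinds to START_OFFSET each time partitions are reassigned. A real application would likely seek only once or look up the desired offset per partition.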

Quick Reference

Remember these key points when consuming from a specific offset:

  • Use assign() to specify partitions manually.
  • Call seek() to set the exact offset.
  • Then call poll() to fetch messages.
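  • Do not call seek() right after subscribe(); with subscribe(), partitions are not assigned until poll() runs.
  • Offsets start at 0, but the earliest offset still available may be higher once retention has deleted old records, so the offset must fall within the partition's current range.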
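
One way to make sure the offset is valid is to compare it with the partition's current beginning and end offsets before seeking. The following is a minimal sketch under that assumption; the class name SafeSeek and the methods clamp and seekWithinRange are hypothetical helpers, while beginningOffsets(), endOffsets() and seek() are methods of the Kafka Consumer interface. The partition is assumed to be assigned to the consumer already.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

public class SafeSeek {
    /** Clamps the requested offset into the range [beginning, end]. */
    static long clamp(long requested, long beginning, long end) {
        return Math.max(beginning, Math.min(requested, end));
    }

    /**
     * Looks up the partition's current offset range, seeks to the nearest
     * valid offset, and returns the offset that was actually used.
     */
    static long seekWithinRange(Consumer<?, ?> consumer, TopicPartition partition, long requested) {
        // Earliest offset still retained in the partition
        long beginning = consumer.beginningOffsets(Collections.singletonList(partition)).get(partition);
        // Offset that the next produced record will receive
        long end = consumer.endOffsets(Collections.singletonList(partition)).get(partition);

        long target = clamp(requested, beginning, end);
        consumer.seek(partition, target);
        return target;
    }
}
```

Clamping is only one possible policy. Depending on the application, it may be more appropriate to fail loudly when the requested offset is out of range than to move it silently.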

Key Takeaways

Use consumer.assign() before consumer.seek() to consume from a specific offset.
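Do not call seek() immediately after subscribe(); seek only on partitions that are already assigned, for example in a rebalance listener.
Seek to an offset within the partition's current range; out-of-range offsets fall back to the auto.offset.reset policy.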
Always call poll() after seek() to fetch records starting from the offset.
Offsets are zero-based and specific to each partition.