
How to Use Kafka Consumer in Python: Simple Guide

Use the KafkaConsumer class from the kafka-python library (install it with pip install kafka-python) to consume messages from Kafka topics in Python. Initialize it with the address of a Kafka broker and a topic name, then iterate over the consumer to process messages.

Syntax

To create a Kafka consumer in Python, import KafkaConsumer, pass it the topic to subscribe to and the address of at least one Kafka broker, then read messages in a loop.

  • KafkaConsumer(topic, bootstrap_servers): Connects to Kafka and subscribes to the topic.
  • for message in consumer: Loops over incoming messages.
  • message.value: The message payload, as raw bytes unless a value_deserializer is configured.
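```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',                            # topic to subscribe to
    bootstrap_servers=['localhost:9092'],  # broker(s) used for the initial connection
    auto_offset_reset='earliest',          # start at the oldest message if the group has no committed offset
    enable_auto_commit=True,               # commit consumed offsets automatically at a regular interval
    group_id='my-group',                   # consumer group used for partition assignment and offsets
    value_deserializer=lambda x: x.decode('utf-8')  # convert raw bytes to str
)

# Iterating blocks and yields messages as they arrive
for message in consumer:
    print(f"Received message: {message.value}")
```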
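The lambda x: x.decode('utf-8') deserializer fails on records whose value is None (for example, tombstones on a compacted topic), because kafka-python, as far as we know, calls the deserializer with the raw value even when it is None. The sketch below shows more defensive deserializers. The names utf8_or_none, json_or_none, and build_consumer are hypothetical helpers, not part of kafka-python, and the JSON variant assumes your producers write JSON.

```python
import json
from typing import Any, Optional


def utf8_or_none(raw: Optional[bytes]) -> Optional[str]:
    """Decode UTF-8 bytes; pass None (e.g. a tombstone) through unchanged."""
    if raw is None:
        return None
    # errors='replace' swaps invalid byte sequences for U+FFFD instead of raising
    return raw.decode('utf-8', errors='replace')


def json_or_none(raw: Optional[bytes]) -> Any:
    """Parse a JSON payload; return None for null or unparseable values."""
    if raw is None:
        return None
    try:
        return json.loads(raw)
    except ValueError:
        # Both json.JSONDecodeError and UnicodeDecodeError subclass ValueError
        return None


def build_consumer(topic: str):
    # Hypothetical helper; kafka-python is imported lazily so the
    # deserializers above can be tested without it installed
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        group_id='my-group',
        auto_offset_reset='earliest',
        value_deserializer=utf8_or_none,  # or json_or_none for JSON topics
    )
```

Returning None instead of raising keeps one malformed record from stopping iteration; the trade-off is that your processing code must expect None values.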

Example

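This example shows how to consume messages from a Kafka topic named test_topic. It connects to a Kafka broker running on localhost:9092 and prints each message's content. On the first run the group example-group has no committed offset, so auto_offset_reset='earliest' makes it start at the beginning of the topic; later runs resume from the last committed offset.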

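```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # start from the beginning when the group has no committed offset
    enable_auto_commit=True,        # commit consumed offsets automatically
    group_id='example-group',
    value_deserializer=lambda x: x.decode('utf-8')  # decode bytes to str
)

print("Starting consumer...")
# Blocks waiting for new messages until the process is interrupted (e.g. Ctrl+C)
for message in consumer:
    print(f"Received: {message.value}")
```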
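Output (if the topic holds these three messages; the loop then keeps waiting for more until you stop it):

```
Starting consumer...
Received: Hello Kafka
Received: Another message
Received: Last message
```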
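The output above assumes test_topic already contains messages. One way to put some there for a local test is kafka-python's KafkaProducer. This is a minimal sketch, assuming a broker at localhost:9092 and that the topic exists or the broker auto-creates topics; SAMPLE_MESSAGES, encode_utf8, and send_test_messages are hypothetical names, and the strings simply mirror the example output.

```python
from typing import Iterable

SAMPLE_MESSAGES = ['Hello Kafka', 'Another message', 'Last message']


def encode_utf8(text: str) -> bytes:
    # Inverse of the consumer's value_deserializer
    return text.encode('utf-8')


def send_test_messages(topic: str = 'test_topic',
                       messages: Iterable[str] = SAMPLE_MESSAGES) -> None:
    # Lazy import: kafka-python is only needed when talking to a broker
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=encode_utf8,
    )
    try:
        for text in messages:
            producer.send(topic, text)
        # send() is asynchronous; flush() blocks until buffered records are sent
        producer.flush()
    finally:
        producer.close()
```

Call send_test_messages() once from a Python shell, then start the consumer in another terminal.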

Common Pitfalls

Common mistakes when using Kafka consumer in Python include:

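  • Not setting auto_offset_reset: kafka-python defaults to 'latest', so a new consumer group with no committed offset skips the messages already in the topic and only sees new ones.
  • Forgetting to decode message bytes, so message.value prints as a bytes literal such as b'Hello Kafka'.
  • Not specifying a group_id, which disables consumer group management and offset commits, so a restarted consumer cannot resume where it left off.
  • Not handling exceptions: an unhandled error while processing a message ends the loop and stops the consumer.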

Set these options explicitly rather than relying on the defaults.

```python
from kafka import KafkaConsumer

# Wrong: no group_id (no offset commits), no deserializer (values stay bytes),
# and auto_offset_reset falls back to 'latest'
consumer_wrong = KafkaConsumer('topic', bootstrap_servers=['localhost:9092'])

# Right: explicit group_id, deserializer, and starting position
consumer_right = KafkaConsumer(
    'topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda x: x.decode('utf-8'),
    auto_offset_reset='earliest'
)
```
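For the last pitfall, one approach is to catch errors per message so a single bad record does not end the loop, and to close the consumer in a finally block so it leaves its group cleanly. This is a sketch under those assumptions: run_consumer and handle are hypothetical names, and the consumer is passed in so the loop can be exercised with a stand-in object. In kafka-python, deserialization happens while the iterator fetches records, so errors raised by value_deserializer surface from the for statement itself, outside the per-message try block; a deserializer that does not raise avoids that.

```python
import logging
from typing import Any, Callable, Tuple

logger = logging.getLogger(__name__)


def run_consumer(consumer: Any, handle: Callable[[Any], None]) -> Tuple[int, int]:
    """Process messages until the iterator ends or Ctrl+C; return (ok, failed)."""
    ok = failed = 0
    try:
        for message in consumer:
            try:
                handle(message.value)
                ok += 1
            except Exception:
                # Log and skip the bad record instead of stopping the consumer
                failed += 1
                logger.exception('Failed to process offset %s', message.offset)
    except KeyboardInterrupt:
        logger.info('Stopping consumer')
    finally:
        # Leave the consumer group and release network resources
        consumer.close()
    return ok, failed
```

Usage: run_consumer(consumer_right, lambda value: print(f"Received: {value}")). With enable_auto_commit=True, the offset of a skipped record is still committed, so that record is not retried.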

Quick Reference

Here is a quick summary of key KafkaConsumer parameters:

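| Parameter | Description | Example value | Default |
|---|---|---|---|
| bootstrap_servers | Broker address(es) used for the initial connection | ['localhost:9092'] | localhost:9092 |
| group_id | Consumer group identifier; required for offset commits | 'my-group' | None |
| auto_offset_reset | Where to start when the group has no committed offset | 'earliest' or 'latest' | 'latest' |
| enable_auto_commit | Commit consumed offsets automatically at a regular interval | True or False | True |
| value_deserializer | Function that converts the raw value bytes | lambda x: x.decode('utf-8') | None (values stay bytes) |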
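With enable_auto_commit=False, offsets are committed only when you call commit(), which lets you commit after a message has been processed (at-least-once delivery). A minimal sketch, assuming the same local broker; process_and_commit and build_manual_commit_consumer are hypothetical names, and committing after every message is chosen for simplicity rather than throughput, since each commit() is a synchronous round trip to the broker.

```python
from typing import Any, Callable


def process_and_commit(consumer: Any, handle: Callable[[Any], None]) -> int:
    """Handle each message, then commit; return how many were handled."""
    handled = 0
    for message in consumer:
        handle(message.value)
        # Synchronous commit of the consumed positions. If handle() raises,
        # the exception propagates before this line, so the offset is not
        # committed and the message is read again after a restart.
        consumer.commit()
        handled += 1
    return handled


def build_manual_commit_consumer(topic: str):
    from kafka import KafkaConsumer  # lazy import; needs kafka-python installed

    return KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # offsets advance only when commit() is called
        value_deserializer=lambda x: x.decode('utf-8'),
    )
```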

Key Takeaways

  • Use KafkaConsumer from kafka-python to read messages from Kafka topics.
  • Set group_id so offsets are committed, and a value_deserializer (or decode message.value yourself) so values are readable.
  • Set auto_offset_reset to 'earliest' to read from the beginning of the topic when the group has no committed offset.
  • Handle exceptions and close the consumer so it stops cleanly.
  • Test your consumer against a local Kafka server before production use.
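For local testing, a consumer that stops on its own is easier to script than an endless loop. kafka-python's consumer_timeout_ms option ends iteration after that many milliseconds without a new message. The sketch below relies on it; read_available and build_test_consumer are hypothetical helpers, the broker is assumed to be at localhost:9092, and the group 'test-group' is an assumed name. With auto-commit disabled and no manual commits, the group never stores an offset, so every run starts from the beginning of the topic.

```python
from typing import Any, List


def read_available(consumer: Any, limit: int = 100) -> List[Any]:
    """Collect up to `limit` message values; stops early when iteration ends."""
    values: List[Any] = []
    for message in consumer:
        values.append(message.value)
        if len(values) >= limit:
            break
    return values


def build_test_consumer(topic: str = 'test_topic'):
    from kafka import KafkaConsumer  # lazy import; needs kafka-python installed

    return KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        group_id='test-group',
        enable_auto_commit=False,      # never store offsets, so each run rereads the topic
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,      # end iteration after 5 s with no new message
        value_deserializer=lambda x: x.decode('utf-8'),
    )
```

In a test script, values = read_available(build_test_consumer()) returns once the topic has been idle for five seconds, and you can then assert on the collected values.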