How to Use Kafka Consumer in Python: Simple Guide
Use the KafkaConsumer class from the kafka-python library to consume messages from Kafka topics in Python. Initialize it with the Kafka server address and topic name, then iterate over messages to process them.
Syntax
The basic syntax to create a Kafka consumer in Python involves importing KafkaConsumer, specifying the topic, and connecting to the Kafka server. You then read messages in a loop.
- KafkaConsumer(topic, bootstrap_servers=...): Connects to Kafka and subscribes to the topic. The topic is passed positionally and the server list as a keyword argument.
- for message in consumer: Loops over incoming messages.
- message.value: Contains the message content.
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: x.decode('utf-8')
)

for message in consumer:
    print(f"Received message: {message.value}")
```
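Besides message.value, each record yielded by the loop also carries its topic, partition, offset, and key. The sketch below shows one way to print them together. FakeRecord and describe are hypothetical names introduced here so the snippet runs without a broker; FakeRecord models only a subset of the fields on a real record.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed record, so the sketch runs without Kafka.
# It models only some of the fields found on a real record.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "key", "value"])


def describe(record):
    """Format a record's location (topic, partition, offset) and payload on one line."""
    return (
        f"{record.topic}[{record.partition}]@{record.offset}: "
        f"key={record.key!r} value={record.value!r}"
    )


sample = FakeRecord("my_topic", 0, 42, None, "Hello Kafka")
print(describe(sample))  # my_topic[0]@42: key=None value='Hello Kafka'
```

Inside a real consumer loop, the same function could be called as print(describe(message)).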
Example
This example shows how to consume messages from a Kafka topic named test_topic. It connects to a Kafka server running on localhost:9092, reads messages from the beginning, and prints each message's content.
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='example-group',
    value_deserializer=lambda x: x.decode('utf-8')
)

print("Starting consumer...")

for message in consumer:
    print(f"Received: {message.value}")
```
Output
Starting consumer...
Received: Hello Kafka
Received: Another message
Received: Last message
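The example above decodes every value as plain UTF-8 text. If the producer sends JSON instead (an assumption, since the article does not say what format the messages use), a named deserializer function may be easier to read and test than a lambda. The sketch below uses the hypothetical name decode_json and also guards against a value of None, which the lambda would fail on.

```python
import json


def decode_json(raw):
    """Decode a UTF-8 JSON payload; pass None through unchanged."""
    if raw is None:
        # Records without a value have nothing to decode.
        return None
    return json.loads(raw.decode("utf-8"))


print(decode_json(b'{"user": "alice", "count": 3}'))
```

It would be passed to the consumer as value_deserializer=decode_json in place of the lambda.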
Common Pitfalls
Common mistakes when using a Kafka consumer in Python include:
- Not setting auto_offset_reset properly, causing the consumer to miss messages if no committed offset exists.
- Forgetting to decode message bytes, so values are printed as raw bytes objects instead of readable text.
- Not specifying a group_id, which disables consumer group management and offset commits.
- Not handling exceptions, which can cause the consumer to stop unexpectedly.
Always configure these options to ensure smooth consumption.
```python
from kafka import KafkaConsumer

# Wrong: No group_id and no deserializer
consumer_wrong = KafkaConsumer('topic', bootstrap_servers=['localhost:9092'])

# Right: Proper group_id and deserializer
consumer_right = KafkaConsumer(
    'topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda x: x.decode('utf-8'),
    auto_offset_reset='earliest'
)
```
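The snippet above covers group_id and the deserializer but not the exception-handling pitfall. One possible approach, sketched below, is to wrap the per-message work in its own try block so a single bad record does not end the loop, and to close the consumer in a finally clause. The names consume and handle are hypothetical, and skipping a failed record is an assumption; some applications may prefer to retry or stop.

```python
import logging

logger = logging.getLogger("consumer")


def consume(consumer, handle):
    """Iterate over records, logging handler failures instead of stopping.

    Returns the number of records handled without error.
    """
    processed = 0
    try:
        for message in consumer:
            try:
                handle(message)
                processed += 1
            except Exception:
                # Assumption: a failed record is logged and skipped.
                logger.exception("Failed to process message: %r", message)
    except KeyboardInterrupt:
        logger.info("Interrupted, shutting down")
    finally:
        # Always release the consumer's resources on the way out.
        consumer.close()
    return processed
```

With a real consumer this could be called as consume(consumer_right, lambda m: print(m.value)).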
Quick Reference
Here is a quick summary of key KafkaConsumer parameters:
| Parameter | Description | Example Value |
|---|---|---|
| bootstrap_servers | Kafka server address | ['localhost:9092'] |
| group_id | Consumer group identifier | 'my-group' |
| auto_offset_reset | Where to start if no offset found | 'earliest' or 'latest' |
| enable_auto_commit | Automatically commit offsets | True or False |
| value_deserializer | Function to decode message bytes | lambda x: x.decode('utf-8') |
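As an illustration of enable_auto_commit, the sketch below shows how offsets might be committed by hand when that parameter is set to False. It relies on the consumer's commit() method. The function name consume_with_manual_commit and the handle callback are hypothetical, and committing after every record is a simplification that trades throughput for clarity.

```python
def consume_with_manual_commit(consumer, handle):
    """Commit offsets only after each record has been handled.

    Intended for a consumer created with enable_auto_commit=False.
    If handle() raises, the commit is skipped, so the record can be
    read again after a restart.
    """
    for message in consumer:
        handle(message)
        # Commit the consumed position now that processing succeeded.
        consumer.commit()
```

In practice, committing once per batch of records rather than once per record is usually cheaper.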
Key Takeaways
Use KafkaConsumer from kafka-python to read messages from Kafka topics.
Always set group_id and value_deserializer for proper message handling.
Set auto_offset_reset to 'earliest' to read from the beginning if no offset exists.
Handle exceptions to keep the consumer running smoothly.
Test your consumer with a local Kafka server before production use.