
Python consumer in Kafka - Commands & Configuration

Introduction
To read messages from a Kafka topic in Python, you use a consumer. The consumer connects to one or more Kafka brokers, subscribes to topics, and repeatedly polls for new messages, passing each one to your code as it arrives. This lets your application react to data or events in near real time.
Typical situations where you would use one:
When you want to process user activity logs sent to Kafka in real time.
When you need to read sensor data from Kafka and store it in a database.
When your app must react immediately to new orders placed and sent to Kafka.
When you want to build a dashboard that updates live with Kafka messages.
When you want to test consuming messages from a Kafka topic during development.
Commands
This command installs the Confluent Kafka Python client library needed to create a Kafka consumer.
Terminal
pip install confluent-kafka
Expected Output
Collecting confluent-kafka
  Downloading confluent_kafka-2.1.1-cp39-cp39-manylinux2014_x86_64.whl (1.4 MB)
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-2.1.1
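Before writing the consumer, you may want to confirm that the client library is importable from the Python environment you will run the script in. The sketch below is one way to check; the function name describe_kafka_client is hypothetical. It relies on confluent_kafka.version() and confluent_kafka.libversion(), which report the Python client and bundled librdkafka versions as (string, int) tuples.

```python
import importlib.util


def describe_kafka_client():
    """Return a short status line about the confluent-kafka installation."""
    # find_spec() returns None when the package cannot be imported, which is the
    # situation that later shows up as ModuleNotFoundError in the consumer script.
    if importlib.util.find_spec("confluent_kafka") is None:
        return "confluent-kafka is not installed; run: pip install confluent-kafka"

    import confluent_kafka

    # version() describes the Python client; libversion() describes librdkafka.
    client_version, _ = confluent_kafka.version()
    librdkafka_version, _ = confluent_kafka.libversion()
    return f"confluent-kafka {client_version} (librdkafka {librdkafka_version})"


print(describe_kafka_client())
```

If the check reports that the package is missing, make sure pip installed it into the same interpreter (or virtual environment) that runs kafka_consumer.py.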
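This command runs kafka_consumer.py, a script containing the code example below, which connects to Kafka, subscribes to the topic 'example-topic', and prints messages as they arrive. Messages are printed only if something has produced them to that topic.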
Terminal
python kafka_consumer.py
Expected Output
Received message: Hello Kafka!
Received message: Another message
Received message: Last message
Key Concept

If you remember nothing else from this pattern, remember: the Python consumer connects to Kafka, subscribes to topics, and continuously polls for new messages to process.
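To make the "continuously polls" part concrete, here is a minimal sketch of the poll loop written as a Python generator. The name iter_messages is hypothetical, and it raises RuntimeError instead of confluent_kafka's KafkaException so that it works with any object exposing a Consumer-like poll() method; it assumes the consumer has already been created and subscribed.

```python
def iter_messages(consumer, timeout=1.0):
    """Yield messages from consumer.poll() until the caller stops iterating.

    `consumer` is assumed to behave like confluent_kafka.Consumer: poll()
    returns None when nothing arrives within `timeout` seconds, otherwise a
    message whose error() is None on success.
    """
    while True:
        msg = consumer.poll(timeout)
        if msg is None:
            continue  # Nothing arrived within the timeout; keep polling
        if msg.error():
            # Surface consumer errors instead of silently skipping them
            raise RuntimeError(f"Consumer error: {msg.error()}")
        yield msg
```

With the subscribed consumer from the code example that follows, usage might look like: for msg in iter_messages(consumer): print(msg.value().decode('utf-8')). Separating the loop from the processing keeps the handling code easy to test, but closing the consumer is still your responsibility, for example in a finally block.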

Code Example
from confluent_kafka import Consumer, KafkaException

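config = {
    'bootstrap.servers': 'localhost:9092',  # Broker(s) to connect to
    'group.id': 'my-group',                 # Consumer group that owns the committed offsets
    'auto.offset.reset': 'earliest',        # With no committed offset, start from the oldest message
    'enable.auto.commit': False             # Offsets are committed manually in the loop below
}

consumer = Consumer(config)
consumer.subscribe(['example-topic'])

try:
    while True:
        msg = consumer.poll(1.0)  # Wait up to 1 second for a message
        if msg is None:
            continue  # No message within the timeout; poll again
        if msg.error():
            raise KafkaException(msg.error())
        print(f"Received message: {msg.value().decode('utf-8')}")
        # Commit this message's offset only after it has been processed
        consumer.commit(message=msg, asynchronous=False)
except KeyboardInterrupt:
    pass  # Stop on Ctrl+C
finally:
    consumer.close()  # Leave the consumer group and release resources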
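The print statement in the example assumes every message has a UTF-8 value. In practice a message value can be None (for example, a tombstone on a compacted topic), and keys are often absent. The sketch below is one hedged way to build a more robust log line; the name describe_message is hypothetical, while key(), value(), topic(), partition() and offset() are methods of confluent_kafka's Message object.

```python
def describe_message(msg):
    """Build a readable line from a confluent_kafka Message-like object."""
    # key() and value() return bytes, or None for a missing key or a null value
    key = msg.key()
    value = msg.value()
    # errors="replace" avoids crashing on bytes that are not valid UTF-8
    key_text = key.decode("utf-8", errors="replace") if key is not None else "<no key>"
    value_text = value.decode("utf-8", errors="replace") if value is not None else "<null>"
    # topic(), partition() and offset() identify exactly where the message came from
    return (f"{msg.topic()}[{msg.partition()}]@{msg.offset()} "
            f"key={key_text} value={value_text}")
```

Inside the polling loop you could then replace the print call with print(describe_message(msg)). Including the partition and offset in logs makes it much easier to tell whether a message was redelivered after a restart.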
Common Mistakes
Not installing the confluent-kafka library before running the consumer script.
The script will fail with ModuleNotFoundError because the Kafka client library is missing.
Run 'pip install confluent-kafka' before running the consumer script.
Not calling consumer.poll() or consumer.consume() in a loop to fetch messages.
Without polling, the consumer receives no messages and appears stuck. If poll() is not called within max.poll.interval.ms, the consumer is also removed from its consumer group.
Use a loop that calls consumer.poll() or consumer.consume() repeatedly to get messages.
Not committing offsets after processing messages.
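After a restart, the consumer group resumes from its last committed offset, so messages processed since that commit are delivered and processed again.
Call consumer.commit() after processing messages to save the read position. confluent-kafka commits offsets automatically in the background by default; set 'enable.auto.commit': False when you commit manually, so commits happen only after processing.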
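The mistakes above mention consumer.consume() as an alternative to poll(), and committing after processing. The sketch below combines the two: it fetches messages in batches with consume(num_messages=..., timeout=...) and commits once per processed batch. The function name consume_in_batches, the handle callback, and the max_batches limit are hypothetical; it assumes a subscribed Consumer created with 'enable.auto.commit': False, and it raises RuntimeError rather than KafkaException to stay library-agnostic.

```python
def consume_in_batches(consumer, handle, batch_size=100, timeout=1.0, max_batches=None):
    """Fetch messages with consumer.consume() and commit after each batch.

    `consumer` is assumed to be a subscribed confluent_kafka.Consumer created
    with 'enable.auto.commit': False; `handle` is your own processing function.
    `max_batches` (None means run forever) bounds the loop. Returns the number
    of messages handled.
    """
    handled = 0
    batches = 0
    while max_batches is None or batches < max_batches:
        # consume() returns a list of up to batch_size messages, possibly empty
        messages = consumer.consume(num_messages=batch_size, timeout=timeout)
        batches += 1
        if not messages:
            continue  # Nothing to process, so nothing to commit
        for msg in messages:
            if msg.error():
                raise RuntimeError(f"Consumer error: {msg.error()}")
            handle(msg)
            handled += 1
        # Commit only after the whole batch was processed (at-least-once):
        # a crash before this line means the batch is redelivered on restart.
        consumer.commit(asynchronous=False)
    return handled
```

Committing per batch instead of per message reduces round trips to the broker, at the cost of reprocessing up to one batch after a crash.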
Summary
Install the Confluent Kafka Python client with pip before running your consumer.
Write a Python script that creates a Consumer, subscribes to a topic, and polls for messages in a loop.
Process each message, then commit its offset so that a restart resumes after the last processed message instead of reprocessing from an older position.