0
0
Kafkadevops~20 mins

Python consumer in Kafka - Practice Problems & Coding Challenges

Choose your learning style9 modes available
Challenge - 5 Problems
🎖️
Kafka Python Consumer Master
Get all challenges correct to earn this badge!
Test your skills under time pressure!
Predict Output
intermediate
2:00remaining
What is the output of this Kafka consumer code snippet?

Consider the following Python Kafka consumer code using confluent_kafka. What will be printed?

Kafka
from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'testgroup', 'auto.offset.reset': 'earliest'}
consumer = Consumer(conf)
consumer.subscribe(['mytopic'])

msg = consumer.poll(timeout=1.0)
if msg is None:
    print('No message received')
elif msg.error():
    print(f'Error: {msg.error()}')
else:
    print(f'Received message: {msg.value().decode("utf-8")}')
consumer.close()
AError: KafkaError{code=_UNKNOWN_TOPIC_OR_PART}
BReceived message: <message content>
CNo message received
DRaises a TypeError
Attempts:
2 left
💡 Hint

If the topic has no messages or Kafka is not running, poll returns None.

Predict Output
intermediate
2:00remaining
What does this Kafka consumer print when a message with value 'hello' is received?

Given this code snippet, what will be printed if a message with value b'hello' is received?

Kafka
from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp1', 'auto.offset.reset': 'earliest'}
consumer = Consumer(conf)
consumer.subscribe(['topic1'])

msg = consumer.poll(timeout=2.0)
if msg and not msg.error():
    print(f'Message: {msg.value().decode()}')
else:
    print('No valid message')
consumer.close()
AMessage: b'hello'
BRaises a UnicodeDecodeError
CNo valid message
DMessage: hello
Attempts:
2 left
💡 Hint

Message values are bytes and need decoding to strings.

🧠 Conceptual
advanced
2:00remaining
Which error will this Kafka consumer code raise?

What error will this code raise when run?

Kafka
from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp2'}
consumer = Consumer(conf)
consumer.subscribe(['topic2'])

msg = consumer.poll(timeout=1.0)
print(msg.value().decode('utf-8'))
consumer.close()
ANo error, prints message value
BAttributeError: 'NoneType' object has no attribute 'value'
CUnicodeDecodeError
DKafkaException
Attempts:
2 left
💡 Hint

If poll returns None, calling value() causes an error.

Predict Output
advanced
2:00remaining
What is the output of this Kafka consumer loop?

What will this code print if the topic has three messages with values 'one', 'two', 'three'?

Kafka
from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp3', 'auto.offset.reset': 'earliest'}
consumer = Consumer(conf)
consumer.subscribe(['topic3'])

count = 0
while count < 3:
    msg = consumer.poll(timeout=1.0)
    if msg and not msg.error():
        print(msg.value().decode())
        count += 1
consumer.close()
Aone\ntwo\nthree
Bthree\ntwo\none
CNo output
DRaises a RuntimeError
Attempts:
2 left
💡 Hint

The consumer reads messages in order from the topic.

🔧 Debug
expert
3:00remaining
Why does this Kafka consumer code hang indefinitely?

Given this code, why does it never exit the loop?

Kafka
from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp4', 'auto.offset.reset': 'latest'}
consumer = Consumer(conf)
consumer.subscribe(['topic4'])

while True:
    msg = consumer.poll(timeout=0.5)
    if msg is None:
        continue
    if msg.error():
        print(f'Error: {msg.error()}')
        break
    print(msg.value().decode())
consumer.close()
ABecause 'auto.offset.reset' is 'latest' and no new messages arrive, poll returns None forever
BBecause the loop condition is always True without a break on valid messages
CBecause consumer.close() is never called inside the loop
DBecause poll timeout is too short causing infinite retries
Attempts:
2 left
💡 Hint

Think about what happens if the consumer starts after messages were produced and 'auto.offset.reset' is 'latest'.