Consider the following Python Kafka consumer code using confluent_kafka. What will be printed?
from confluent_kafka import Consumer conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'testgroup', 'auto.offset.reset': 'earliest'} consumer = Consumer(conf) consumer.subscribe(['mytopic']) msg = consumer.poll(timeout=1.0) if msg is None: print('No message received') elif msg.error(): print(f'Error: {msg.error()}') else: print(f'Received message: {msg.value().decode("utf-8")}') consumer.close()
If the topic has no messages or Kafka is not running, poll returns None.
The poll method waits for a message. If none arrives within 1 second, it returns None. So the code prints 'No message received'.
Given this code snippet, what will be printed if a message with value b'hello' is received?
from confluent_kafka import Consumer conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp1', 'auto.offset.reset': 'earliest'} consumer = Consumer(conf) consumer.subscribe(['topic1']) msg = consumer.poll(timeout=2.0) if msg and not msg.error(): print(f'Message: {msg.value().decode()}') else: print('No valid message') consumer.close()
Message values are bytes and need decoding to strings.
The message value is bytes b'hello'. Decoding it with decode() converts it to string 'hello'.
What error will this code raise when run?
from confluent_kafka import Consumer conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp2'} consumer = Consumer(conf) consumer.subscribe(['topic2']) msg = consumer.poll(timeout=1.0) print(msg.value().decode('utf-8')) consumer.close()
If poll returns None, calling value() causes an error.
If no message is received, poll returns None. Calling value() on None raises AttributeError.
What will this code print if the topic has three messages with values 'one', 'two', 'three'?
from confluent_kafka import Consumer conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp3', 'auto.offset.reset': 'earliest'} consumer = Consumer(conf) consumer.subscribe(['topic3']) count = 0 while count < 3: msg = consumer.poll(timeout=1.0) if msg and not msg.error(): print(msg.value().decode()) count += 1 consumer.close()
The consumer reads messages in order from the topic.
The consumer polls and prints each message in the order they were produced: 'one', 'two', 'three'.
Given this code, why does it never exit the loop?
from confluent_kafka import Consumer conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'grp4', 'auto.offset.reset': 'latest'} consumer = Consumer(conf) consumer.subscribe(['topic4']) while True: msg = consumer.poll(timeout=0.5) if msg is None: continue if msg.error(): print(f'Error: {msg.error()}') break print(msg.value().decode()) consumer.close()
Think about what happens if the consumer starts after messages were produced and 'auto.offset.reset' is 'latest'.
With 'auto.offset.reset' set to 'latest', the consumer waits for new messages after the current end. If no new messages arrive, poll returns None repeatedly, causing the loop to continue forever.