Consider a Kafka consumer configured to process messages from a topic. If processing a message fails, the message is sent to a dead letter queue (DLQ). What will be the output after processing the following messages?
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('main_topic', bootstrap_servers='localhost:9092')
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def process_message(msg):
    if msg == b'bad_message':
        raise ValueError('Processing failed')
    return msg.decode('utf-8').upper()

for message in consumer:
    try:
        result = process_message(message.value)
        print(f'Processed: {result}')
    except Exception as e:
        producer.send('dead_letter_queue', message.value)
        print(f'Message sent to DLQ: {message.value.decode("utf-8")}')
    break  # For this example, process only one message
Think about what happens when process_message raises an exception.
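The code tries to process the message. If the message is b'bad_message', process_message raises ValueError; the except block then sends the original bytes to the dead_letter_queue topic and prints "Message sent to DLQ: bad_message". For any other message, the loop prints "Processed: " followed by the uppercased text.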
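The two branches described above can be checked without a running broker. The following is a minimal sketch, not the quiz's own code: `handle` is a hypothetical helper, and a plain list stands in for the `dead_letter_queue` topic.

```python
def process_message(msg: bytes) -> str:
    # Same rule as the quiz snippet: one specific payload is treated as bad.
    if msg == b'bad_message':
        raise ValueError('Processing failed')
    return msg.decode('utf-8').upper()


def handle(msg: bytes, dlq: list) -> str:
    """Return the line the consumer loop would print for one message.

    `handle` and `dlq` are illustrative names; the list replaces
    producer.send('dead_letter_queue', msg) so no broker is needed.
    """
    try:
        return f'Processed: {process_message(msg)}'
    except Exception:
        dlq.append(msg)
        return f'Message sent to DLQ: {msg.decode("utf-8")}'


dlq = []
print(handle(b'hello', dlq))        # Processed: HELLO
print(handle(b'bad_message', dlq))  # Message sent to DLQ: bad_message
```

Only the failing payload ends up in `dlq`; the successful one produces the uppercased output and nothing else.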
Choose the best description of why a dead letter queue (DLQ) is used in Kafka message processing.
Think about what happens to messages that cause errors during processing.
A dead letter queue is used to capture messages that fail processing so they can be inspected or retried later without blocking the main processing flow.
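To make "inspected or retried later without blocking the main processing flow" concrete, here is a small in-memory sketch. The names `DeadLetter`, `run`, and `parse_int` are assumptions made for illustration; a real setup would publish to a Kafka topic instead of appending to a list.

```python
from dataclasses import dataclass


@dataclass
class DeadLetter:
    """Hypothetical DLQ record: the original payload plus why it failed."""
    payload: bytes
    error: str


def run(messages, process):
    """Process every message; failures are set aside instead of stopping the loop."""
    processed, dead_letters = [], []
    for msg in messages:
        try:
            processed.append(process(msg))
        except Exception as exc:
            # Keep the raw bytes so the message can be inspected or replayed later.
            dead_letters.append(DeadLetter(msg, repr(exc)))
    return processed, dead_letters


def parse_int(msg: bytes) -> int:
    return int(msg.decode('utf-8'))


processed, dead_letters = run([b'1', b'oops', b'3'], parse_int)
print(processed)                          # [1, 3]
print([d.payload for d in dead_letters])  # [b'oops']
```

The message after the bad one is still processed, which is the point of the pattern: one poisoned record does not stall the rest of the stream.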
Examine the following Kafka consumer code snippet. It is supposed to send failed messages to a dead letter queue, but it never does. What is the cause?
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('main_topic', bootstrap_servers='localhost:9092')
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def process_message(msg):
    if msg == b'error':
        raise Exception('Error')
    return msg.decode('utf-8')

for message in consumer:
    try:
        print(process_message(message.value))
    except Exception:
        producer.send('dead_letter_queue', message.value)

producer.flush()
Consider how KafkaProducer sends messages asynchronously and when they are actually sent.
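KafkaProducer.send() is asynchronous: it places the record in an in-memory buffer and returns a future, and the record is transmitted later in the background. In this snippet flush() appears only after the for loop, and iterating over a KafkaConsumer blocks indefinitely by default, so that line is never reached. Failed messages are therefore never explicitly flushed, and any that are still buffered when the process is stopped are lost.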
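The difference between "sent" and "delivered" can be illustrated with a toy model. `BufferingProducer` below is a hypothetical stand-in, not part of kafka-python, and it is deliberately pessimistic: it delivers nothing until flush() is called, whereas a real producer also drains its buffer in the background.

```python
class BufferingProducer:
    """Hypothetical stand-in for a producer: send() only buffers, flush() delivers."""

    def __init__(self):
        self.buffered = []
        self.delivered = []

    def send(self, topic, value):
        # Asynchronous in spirit: the record is queued, not yet delivered.
        self.buffered.append((topic, value))

    def flush(self):
        # Deliver everything queued so far and empty the buffer.
        self.delivered.extend(self.buffered)
        self.buffered.clear()


producer = BufferingProducer()
producer.send('dead_letter_queue', b'error')
print(len(producer.delivered))  # 0

producer.flush()
print(len(producer.delivered))  # 1
```

If the process stopped between the two print calls, the record would exist only in the buffer, which is the failure mode the explanation describes.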
Choose the code snippet that correctly sets up a Kafka consumer that sends failed messages to a dead letter queue.
Look for correct exception handling syntax and proper flush usage inside the loop.
Option D correctly catches exceptions with except Exception: and calls producer.flush() inside the except block to ensure messages are sent immediately to the DLQ.
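The pattern this explanation describes, `except Exception:` followed by send() and flush() inside the handler, might look like the sketch below. It is an illustration under assumptions rather than the text of any answer option: `consume_with_dlq` and `RecordingProducer` are hypothetical names, and the stubs replace a real KafkaConsumer and KafkaProducer so the loop can run without a broker.

```python
from types import SimpleNamespace


def consume_with_dlq(consumer, producer, process, dlq_topic='dead_letter_queue'):
    """Process each record; on failure, send it to the DLQ and flush right away."""
    results = []
    for message in consumer:
        try:
            results.append(process(message.value))
        except Exception:
            producer.send(dlq_topic, message.value)
            producer.flush()  # inside the handler, so it runs for every failure
    return results


class RecordingProducer:
    """Hypothetical test double exposing only send() and flush()."""

    def __init__(self):
        self.sent = []
        self.flush_calls = 0

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        self.flush_calls += 1


def process_message(msg):
    if msg == b'error':
        raise Exception('Error')
    return msg.decode('utf-8')


# Objects with a .value attribute stand in for consumer records.
records = [SimpleNamespace(value=v) for v in (b'a', b'error', b'b')]
producer = RecordingProducer()
results = consume_with_dlq(records, producer, process_message)
print(results)        # ['a', 'b']
print(producer.sent)  # [('dead_letter_queue', b'error')]
```

Flushing on every failure trades some throughput for a stronger delivery guarantee, which is usually acceptable because failures are expected to be rare.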
A Kafka consumer processes a batch of 5 messages: ['ok', 'fail', 'ok', 'fail', 'fail']. The process_message function raises an exception for 'fail' messages, which are sent to the dead letter queue. How many messages will the dead letter queue contain after processing all 5?
messages = [b'ok', b'fail', b'ok', b'fail', b'fail']
failed_messages = []

def process_message(msg):
    if msg == b'fail':
        raise Exception('Failed')
    return msg.decode('utf-8')

for msg in messages:
    try:
        process_message(msg)
    except Exception:
        failed_messages.append(msg)

print(len(failed_messages))
Count how many messages cause exceptions.
There are 3 'fail' messages in the list. Each causes an exception and is added to the dead letter queue list, so the count is 3.