Complete the code to create a Kafka producer that sends a message to an MSK topic.
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='[1]') producer.send('my-topic', b'Hello MSK!') producer.flush()
The bootstrap_servers parameter must point to the MSK broker endpoint, such as b-1.mskcluster.example.com:9092.
Complete the code to consume messages from an MSK topic named 'orders'.
from kafka import KafkaConsumer consumer = KafkaConsumer('orders', bootstrap_servers='[1]', auto_offset_reset='earliest') for message in consumer: print(message.value.decode('utf-8'))
The bootstrap_servers must be the MSK broker endpoint, here b-2.mskcluster.example.com:9092, to consume messages.
Fix the error in the code to configure the Kafka producer with SSL for MSK.
producer = KafkaProducer(bootstrap_servers='b-1.mskcluster.example.com:9092', security_protocol='[1]', ssl_cafile='/path/to/ca.pem')
To enable SSL encryption, security_protocol must be set to SSL.
Fill both blanks to create a Kafka consumer that uses SASL authentication with MSK.
consumer = KafkaConsumer('logs', bootstrap_servers='b-3.mskcluster.example.com:9092', security_protocol='[1]', sasl_mechanism='[2]', sasl_plain_username='user', sasl_plain_password='pass')
MSK with SASL authentication typically uses SASL_SSL for security_protocol and PLAIN for sasl_mechanism.
Fill all three blanks to produce a dictionary comprehension that maps topic partitions to their latest offsets in MSK.
offsets = { [1]: [2] for [3] in consumer.assignment() }This comprehension creates a dictionary where each partition maps to its current offset using consumer.position(partition).