Complete the code to create a Kafka producer that sends a message.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my_topic', [1])
The Kafka producer expects the message value as bytes, so prefixing the string with b (e.g. b'hello') makes it a bytes literal.
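The same conversion can be done explicitly with str.encode, which is useful when the message is built at runtime rather than written as a literal. A minimal standard-library sketch (no broker needed):

```python
# A bytes literal and an encoded string produce the same payload.
literal = b'hello'                 # bytes literal, as used in the blank
encoded = 'hello'.encode('utf-8')  # explicit encoding of a runtime string

print(literal == encoded)  # the two byte sequences are identical
print(type(encoded))       # bytes, which producer.send() accepts
```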
Complete the code to consume messages from a Kafka topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
for message in [1]:
    print(message.value)
Not consume() or read(): the KafkaConsumer object itself is iterable, so you can loop directly over it to get messages.
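The for-loop works because the consumer is an iterable of message records, each exposing fields such as .value. That shape can be mimicked without a broker; Record and FakeConsumer below are hypothetical stand-ins for illustration, not kafka-python classes:

```python
from dataclasses import dataclass

@dataclass
class Record:
    value: bytes  # mirrors the .value field on a consumed message

class FakeConsumer:
    """Hypothetical stand-in: an iterable of records, like KafkaConsumer."""
    def __init__(self, records):
        self._records = records

    def __iter__(self):
        return iter(self._records)

consumer = FakeConsumer([Record(b'first'), Record(b'second')])
values = [message.value for message in consumer]  # same loop shape as the answer
print(values)
```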
Fix the error in the Kafka producer code to send a JSON message.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: [1]
)
producer.send('my_topic', {'key': 'value'})
The error is using json.loads, which parses JSON instead of serializing it. The value_serializer should convert the Python object to a JSON string with json.dumps and then encode it to bytes.
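The difference between the two json functions can be checked directly. This sketch uses only the standard library, so it runs without a broker:

```python
import json

def serialize(v):
    # Correct serializer: Python object -> JSON string -> bytes.
    return json.dumps(v).encode('utf-8')

payload = serialize({'key': 'value'})
print(payload)                     # b'{"key": "value"}'
print(isinstance(payload, bytes))  # True: what Kafka expects on the wire

# json.loads goes the other way (str/bytes -> Python object), so passing
# a dict to it raises a TypeError -- which is why it breaks as a serializer.
try:
    json.loads({'key': 'value'})
except TypeError:
    print('json.loads cannot serialize a dict')
```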
Fill both blanks to create a Kafka consumer that reads from the beginning and auto-commits offsets.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset=[1],
    enable_auto_commit=[2]
)
Not 'latest', which reads only new messages. Setting auto_offset_reset to 'earliest' makes the consumer read from the beginning of the topic when no committed offset is found, and enable_auto_commit=True enables automatic offset commits.
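On the consumer side, kafka-python also accepts a value_deserializer that reverses the producer's serializer. The function itself is plain Python and can be checked without a broker; wiring it into KafkaConsumer is shown in a comment:

```python
import json

def deserialize(raw: bytes):
    """Reverse of the producer's serializer: bytes -> JSON -> Python object."""
    return json.loads(raw.decode('utf-8'))

# With a running broker, it would be wired in like this (not run here):
# consumer = KafkaConsumer('my_topic',
#                          bootstrap_servers='localhost:9092',
#                          value_deserializer=deserialize)

event = deserialize(b'{"event": "login", "user": "alice"}')
print(event)
```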
Fill all three blanks to create a Kafka producer with a custom serializer and send a message.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=[1]
)
message = {'event': 'login', 'user': 'alice'}
producer.send('events', [2])
producer.flush()  # [3]
The value_serializer converts each message to JSON bytes. The message dict is passed to send() as is, since the serializer handles the conversion. flush() blocks until all buffered messages are delivered, making sure nothing is lost before the program ends.
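flush() matters because send() only enqueues the message in an in-memory buffer; actual delivery happens in the background. The hypothetical BufferedSender below mimics that behavior to show why a program should flush before exiting (it is a stand-in for illustration, not a kafka-python class):

```python
import json

class BufferedSender:
    """Hypothetical stand-in mimicking KafkaProducer's buffering behavior."""
    def __init__(self):
        self.buffer = []     # messages accepted but not yet delivered
        self.delivered = []  # messages actually sent out

    def send(self, topic, value):
        # Like KafkaProducer.send(): enqueue and return immediately.
        self.buffer.append((topic, json.dumps(value).encode('utf-8')))

    def flush(self):
        # Like KafkaProducer.flush(): block until the buffer is drained.
        self.delivered.extend(self.buffer)
        self.buffer.clear()

producer = BufferedSender()
producer.send('events', {'event': 'login', 'user': 'alice'})
print(len(producer.delivered))  # 0: send() alone has not delivered anything
producer.flush()
print(len(producer.delivered))  # 1: flush() drained the buffer
```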