Complete the code to produce a message to a Kafka topic.
producer.send([1], b'Hello Kafka')
The send method requires the topic name as a string. Using 'my_topic' correctly specifies the topic.
Complete the code to consume messages from a Kafka topic.
consumer = KafkaConsumer([1], bootstrap_servers='localhost:9092')
The KafkaConsumer accepts a list of topic names as positional argument. Using ['my_topic'] correctly specifies the topic list.
Fix the error in the code to commit offsets manually after processing messages.
for message in consumer: process(message) consumer.[1]()
commit_async() which commits asynchronously.The standard synchronous method to commit offsets manually is commit(). commit_async() is asynchronous.
Fill both blanks to create a Kafka Streams topology that filters messages with value length greater than 5.
builder = StreamsBuilder() stream = builder.stream([1]) filtered = stream.filter(lambda k, v: len(v) [2] 5)
The stream reads from 'input-topic'. The filter keeps messages where the value length is greater than 5 using >.
Fill all three blanks to build a dictionary comprehension that maps keys to values only if the value is positive.
result = { [1]: [2] for [3] in data.items() if [2] > 0 }The dictionary comprehension uses k as key and v as value for each k, v pair in data.items(), filtering values greater than zero.