Complete the code to specify the Kafka topic to consume data from.
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topic = "[1]"
stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
The Kafka topic name must be set correctly to consume data. Here, "kafka_topic" is the topic name used in the example. (Note that createDirectStream returns a DStream, not an RDD, so the result is named stream.)
Complete the code to set the Kafka broker list for Hadoop integration.
kafkaParams = {"metadata.broker.list": "[1]"}
stream = KafkaUtils.createDirectStream(ssc, ["kafka_topic"], kafkaParams)
The Kafka broker list specifies the host and port on which the Kafka brokers listen; the default Kafka port is 9092.
Fix the error in the code to correctly create a direct Kafka stream in Spark Streaming.
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["kafka_topic"], [1])
The third argument must be the dictionary variable 'kafkaParams', passed without quotes, parentheses, or brackets.
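Putting the three items above together, here is a minimal sketch of the full setup. It assumes the legacy pyspark.streaming.kafka API (available only in Spark 2.4 and earlier; removed in Spark 3.0) and a local broker on the default port, and it requires a running Spark installation:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # legacy API, Spark <= 2.4

sc = SparkContext(appName="KafkaExample")
ssc = StreamingContext(sc, batchDuration=5)  # 5-second micro-batches

# Broker list and topic name from the questions above.
kafkaParams = {"metadata.broker.list": "localhost:9092"}
stream = KafkaUtils.createDirectStream(ssc, ["kafka_topic"], kafkaParams)

# Each record arrives as a (key, value) pair; print the values for inspection.
stream.map(lambda kv: kv[1]).pprint()

ssc.start()
ssc.awaitTermination()
```

The application name, batch interval, and topic name here are illustrative; only the broker list and the createDirectStream call shape come from the questions above.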
Fill both blanks to create a dictionary comprehension that filters Kafka messages with value length greater than 5.
filtered_messages = {msg.key: msg.value for msg in messages if len(msg.[1]) [2] 5}
We check the length of the message value and keep only messages whose value is longer than 5 characters.
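The filtering pattern can be tried without Kafka at all. This sketch uses a hypothetical Message namedtuple and sample data (both illustrative, not from the original) to stand in for consumed records:

```python
from collections import namedtuple

# Hypothetical message type standing in for consumed Kafka records.
Message = namedtuple("Message", ["key", "value"])

messages = [
    Message("a", "short"),        # value length 5: filtered out (not > 5)
    Message("b", "long enough"),  # value length 11: kept
    Message("c", "hi"),           # value length 2: filtered out
]

# Keep only messages whose value is longer than 5 characters.
filtered_messages = {msg.key: msg.value for msg in messages if len(msg.value) > 5}
print(filtered_messages)  # {'b': 'long enough'}
```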
Fill all three blanks to create a dictionary comprehension that maps uppercase keys to values where value length is less than 10.
result = {msg.[1].upper(): msg.[2] for msg in messages if len(msg.[3]) < 10}
The keys are converted to uppercase, the values are mapped directly, and filtering keeps only values shorter than 10 characters.
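The same standalone setup illustrates the key-transforming variant; again, the Message namedtuple and the sample records are hypothetical stand-ins:

```python
from collections import namedtuple

# Hypothetical message type standing in for consumed Kafka records.
Message = namedtuple("Message", ["key", "value"])

messages = [
    Message("id1", "alpha"),                # value length 5: kept (< 10)
    Message("id2", "a much longer value"),  # value length 19: filtered out
]

# Uppercase keys, values mapped directly, filtered on value length < 10.
result = {msg.key.upper(): msg.value for msg in messages if len(msg.value) < 10}
print(result)  # {'ID1': 'alpha'}
```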