
Reading from Kafka in Apache Spark

Introduction

Spark's Structured Streaming can read from Kafka, so you can process live streams of data as messages arrive. This is useful in situations such as:

When you want to analyze real-time data like sensor readings or user clicks.
When you need to process messages from a messaging system continuously.
When you want to build dashboards that update automatically with new data.
When you want to combine streaming data with batch data for analysis.
When you want to filter or transform live data before saving it.
Syntax
Apache Spark
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic_name")
  .load()

Use readStream to read live data continuously.

Set kafka.bootstrap.servers to your Kafka server addresses.
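
As a minimal sketch of how these options fit together, the snippet below builds them as a plain dictionary and passes it to options(). The helper names kafka_source_options and read_kafka_stream are hypothetical, and startingOffsets is an additional Kafka source option that the rest of this tutorial does not use.

```python
def kafka_source_options(servers, topics, starting_offsets="latest"):
    """Build options for Spark's Kafka source (hypothetical helper)."""
    if not servers or not topics:
        raise ValueError("need at least one server and one topic")
    return {
        # Comma-separated host:port pairs of the Kafka brokers
        "kafka.bootstrap.servers": ",".join(servers),
        # Comma-separated list of topics to subscribe to
        "subscribe": ",".join(topics),
        # Where a new query starts reading, e.g. "earliest" or "latest"
        "startingOffsets": starting_offsets,
    }


def read_kafka_stream(spark, options):
    """Return a streaming DataFrame; `spark` must be a live SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()


opts = kafka_source_options(["host1:9092", "host2:9092"], ["topic_name"])
print(opts["kafka.bootstrap.servers"])
```

Keeping the options in a dictionary makes them easy to load from configuration and to check before a Spark session exists.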

Examples
Reads streaming data from the Kafka topic my_topic on a local server.
Apache Spark
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "my_topic") \
  .load()
Reads from multiple Kafka topics on multiple servers.
Apache Spark
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "server1:9092,server2:9092") \
  .option("subscribe", "topic1,topic2") \
  .load()
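
Besides a comma-separated list in subscribe, the Kafka source also accepts a regular expression through the subscribePattern option, and only one topic-selection option may be set at a time. The sketch below picks between the two; topic_option is a hypothetical helper name and the pattern topic.* is only an illustration.

```python
def topic_option(topics=None, pattern=None):
    """Return the single topic-selection option for the Kafka source."""
    # Exactly one of the two arguments must be provided
    if (topics is None) == (pattern is None):
        raise ValueError("give exactly one of topics or pattern")
    if topics is not None:
        return {"subscribe": ",".join(topics)}
    return {"subscribePattern": pattern}


print(topic_option(topics=["topic1", "topic2"]))  # {'subscribe': 'topic1,topic2'}
print(topic_option(pattern="topic.*"))            # {'subscribePattern': 'topic.*'}
```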
Sample Program

This program connects to Kafka on localhost, reads messages from 'test_topic', converts keys and values to strings, and prints them to the console for 5 seconds.

Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("KafkaReadExample").getOrCreate()

# Read streaming data from Kafka topic 'test_topic'
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test_topic") \
  .load()

# Select the key and value as strings
string_df = df.select(col("key").cast("string"), col("value").cast("string"))

# Write the streaming data to console for demo
query = string_df.writeStream.format("console") \
  .option("truncate", "false") \
  .start()

query.awaitTermination(5)  # Run for 5 seconds then stop
query.stop()

spark.stop()
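
The timed run at the end of the sample program can be wrapped in a small helper so that the query is stopped even if waiting raises an error. This is a sketch: run_for is a hypothetical name, and it relies on PySpark's awaitTermination(timeout) returning whether the query terminated within the timeout.

```python
def run_for(query, seconds):
    """Wait up to `seconds`, then stop the query (hypothetical helper).

    Returns True if the query terminated on its own within the
    timeout, False if it was still running when the wait ended.
    """
    try:
        return query.awaitTermination(seconds)
    finally:
        # Always stop the query so its resources are released
        query.stop()
```

With the sample program's query object, run_for(query, 5) would replace the awaitTermination(5) and stop() calls.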
Important Notes

Kafka messages have keys and values stored as bytes; cast them to strings to read text.

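Call awaitTermination() to block the driver while the stream runs; without it, a script can exit right after start(). Pass a timeout in seconds, as in the sample program, to return after that time even if the query is still running.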

Make sure the Kafka server is running and reachable before you run the code. Spark also needs the Kafka connector (the spark-sql-kafka-0-10 package) on its classpath.
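
One way to check reachability before starting Spark is a plain TCP connection attempt, sketched below with Python's standard library. broker_reachable is a hypothetical helper, and a successful connection only shows that the port is open, not that the broker is healthy.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For the sample program, broker_reachable("localhost", 9092) would be called before creating the stream.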

Summary

Use spark.readStream.format("kafka") to read live Kafka data.

Set Kafka servers and topics with option().

Cast Kafka message bytes to strings to work with readable data.
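
As a sketch of the casting step, the helper below turns the raw Kafka columns into readable ones with SQL expressions. to_text_frame and KAFKA_TEXT_COLUMNS are hypothetical names; topic, partition, offset, and timestamp are metadata columns that the Kafka source provides alongside key and value.

```python
# SQL expressions applied to the DataFrame returned by load()
KAFKA_TEXT_COLUMNS = [
    "CAST(key AS STRING) AS key",      # binary key decoded as text
    "CAST(value AS STRING) AS value",  # binary value decoded as text
    "topic",
    "partition",
    "offset",
    "timestamp",
]


def to_text_frame(df):
    """Select string key/value plus Kafka metadata (hypothetical helper)."""
    return df.selectExpr(*KAFKA_TEXT_COLUMNS)
```

Keeping the metadata columns makes it possible to tell which topic and offset each printed message came from.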