
How to Read Data from Kafka Using PySpark Streaming

To read data from Kafka in PySpark, use spark.readStream.format('kafka') with options like kafka.bootstrap.servers and subscribe to specify Kafka servers and topics. Then call load() to create a streaming DataFrame that you can process.
📝

Syntax

Use spark.readStream.format('kafka') to start reading from Kafka as a streaming source. Set options like kafka.bootstrap.servers for Kafka server addresses and subscribe for the topic name. Finally, call load() to get the streaming DataFrame.

  • spark.readStream: Starts a streaming read.
  • format('kafka'): Specifies Kafka as the source.
  • option('kafka.bootstrap.servers', 'host:port'): Kafka server address.
  • option('subscribe', 'topic'): Kafka topic to read from.
  • load(): Loads the streaming data as a DataFrame.
python
df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic') \
    .load()
💻

Example

This example shows how to read messages from a Kafka topic named my_topic using PySpark. It reads the data as a streaming DataFrame, selects the message value as a string, and writes it to the console.

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.appName('KafkaReadExample').getOrCreate()

# Read streaming data from Kafka
kafka_df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic') \
    .load()

# Select the 'value' column and cast it to string
messages = kafka_df.selectExpr('CAST(value AS STRING) as message')

# Write the streaming data to console
query = messages.writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()
Output
[Console output showing messages from Kafka topic as they arrive]
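Why the CAST step matters: Kafka delivers message keys and values as raw binary, and Spark's CAST(value AS STRING) decodes those bytes as UTF-8. A plain-Python sketch of that same decoding step (the sample payload is an assumption):

```python
# Kafka message values arrive as raw bytes, not text.
raw_value = b'{"user_id": "u1", "amount": 9.5}'  # sample payload (assumption)

# CAST(value AS STRING) in Spark SQL decodes the bytes as UTF-8:
message = raw_value.decode("utf-8")
print(message)  # {"user_id": "u1", "amount": 9.5}
```

Without this step, the console sink prints the value column as an unreadable byte array.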
⚠️

Common Pitfalls

  • Not setting kafka.bootstrap.servers correctly causes connection errors.
  • Forgetting to call load() means no DataFrame is created.
  • Not casting the value column from binary to string leads to unreadable data.
  • Using read instead of readStream performs a one-time batch read of the topic rather than a continuous stream.
  • Not starting the streaming query with start() means no data is processed.
python
# Wrong way (missing load and casting):
df_wrong = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'localhost:9092')\
    .option('subscribe', 'my_topic')

# Correct way:
df_correct = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'localhost:9092')\
    .option('subscribe', 'my_topic')\
    .load()

messages = df_correct.selectExpr('CAST(value AS STRING) as message')
📊

Quick Reference

Remember these key options when reading from Kafka in PySpark:

Option                     Description
kafka.bootstrap.servers    Kafka server address (host:port)
subscribe                  Kafka topic name to read from
startingOffsets            'earliest' or 'latest' to control where to start reading
failOnDataLoss             Set to 'false' to ignore missing-data errors
kafka.security.protocol    Security protocol if Kafka is secured
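These options can be collected in a dictionary and applied in a single call with .options(). A minimal sketch, assuming the same local broker and topic as the example above (the Spark call itself is commented out because it needs a running SparkSession and Kafka broker):

```python
# Common Kafka reader options gathered in one place (values are assumptions)
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",  # Kafka server address
    "subscribe": "my_topic",                      # topic to read from
    "startingOffsets": "earliest",                # start from the beginning of the topic
    "failOnDataLoss": "false",                    # don't fail on missing offsets
}

# Applied to a streaming read:
# df = spark.readStream.format("kafka").options(**kafka_options).load()
```

Keeping the options in one dictionary makes it easy to swap configurations between environments.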
✅

Key Takeaways

  • Use spark.readStream.format('kafka') with options to read streaming data from Kafka.
  • Always call load() to create the streaming DataFrame.
  • Cast the Kafka message value from binary to string for readable output.
  • Start the streaming query with start() and block with awaitTermination() to process data.
  • Check the Kafka server address and topic name carefully to avoid connection errors.