How to Read Data from Kafka Using PySpark Streaming
To read data from Kafka in PySpark, use
spark.readStream.format('kafka') with options such as kafka.bootstrap.servers and subscribe to specify the Kafka servers and topics. Then call load() to create a streaming DataFrame that you can process.
Syntax
Use spark.readStream.format('kafka') to start reading from Kafka as a streaming source. Set options like kafka.bootstrap.servers for Kafka server addresses and subscribe for the topic name. Finally, call load() to get the streaming DataFrame.
- spark.readStream: Starts a streaming read.
- format('kafka'): Specifies Kafka as the source.
- option('kafka.bootstrap.servers', 'host:port'): Kafka server address.
- option('subscribe', 'topic'): Kafka topic to read from.
- load(): Loads the streaming data as a DataFrame.
```python
df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic') \
    .load()
```
Example
This example shows how to read messages from a Kafka topic named my_topic using PySpark. It reads the data as a streaming DataFrame, selects the message value as a string, and writes it to the console.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('KafkaReadExample').getOrCreate()

# Read streaming data from Kafka
kafka_df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic') \
    .load()

# Select the 'value' column and cast it to string
messages = kafka_df.selectExpr('CAST(value AS STRING) as message')

# Write the streaming data to the console
query = messages.writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()
```
Output
[Console output showing messages from Kafka topic as they arrive]
Common Pitfalls
- Not setting kafka.bootstrap.servers correctly causes connection errors.
- Forgetting to call load() means no DataFrame is created.
- Not casting the value column from binary to string leads to unreadable data.
- Using read instead of readStream reads only static data, not a stream.
- Not starting the streaming query with start() means no data is processed.
```python
# Wrong way (missing load and casting):
df_wrong = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic')

# Correct way:
df_correct = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'my_topic') \
    .load()
messages = df_correct.selectExpr('CAST(value AS STRING) as message')
```
Quick Reference
Remember these key options when reading from Kafka in PySpark:
| Option | Description |
|---|---|
| kafka.bootstrap.servers | Kafka server address (host:port) |
| subscribe | Kafka topic name to read from |
| startingOffsets | 'earliest' or 'latest' to control where to start reading |
| failOnDataLoss | Set to 'false' to continue instead of failing when data is lost (e.g., a deleted topic or expired offsets) |
| kafka.security.protocol | Security protocol if Kafka is secured |
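As a sketch of how these options combine, they can be collected into a dictionary and passed to the reader with .options(**...). The broker address, topic name, and offset setting below are placeholder values, not requirements.

```python
# Hypothetical option set combining the table's settings; adjust the
# broker address and topic name for your own cluster.
kafka_options = {
    'kafka.bootstrap.servers': 'localhost:9092',  # Kafka server address
    'subscribe': 'my_topic',                      # topic to read from
    'startingOffsets': 'earliest',                # start from the beginning of the topic
    'failOnDataLoss': 'false',                    # don't fail if some data is missing
}

# With a live SparkSession, this would build the streaming DataFrame:
# df = spark.readStream.format('kafka').options(**kafka_options).load()
```

Keeping the options in one place like this makes it easier to switch environments (for example, pointing kafka.bootstrap.servers at a different cluster) without editing the read chain itself.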
Key Takeaways
- Use spark.readStream.format('kafka') with options to read streaming data from Kafka.
- Always call load() to create the streaming DataFrame.
- Cast the Kafka message value from binary to string for readable data.
- Start the streaming query with start() and awaitTermination() to process data.
- Check the Kafka server address and topic name carefully to avoid connection errors.