How to Use Structured Streaming in PySpark: Syntax and Example
Use spark.readStream to read streaming data and writeStream to output results in PySpark. Define a streaming DataFrame with a schema, apply transformations, and start the query with start() to process data continuously.

Syntax

Structured Streaming in PySpark uses readStream to read data continuously and writeStream to write output. You define a streaming DataFrame, apply transformations, then start the query.
- spark.readStream.format(): Specify the input source like 'csv', 'json', or 'kafka'.
- option(): Set options like file path or Kafka topic.
- load(): Load the streaming data.
- writeStream.format(): Specify output sink like 'console' or 'parquet'.
- outputMode(): Choose 'append', 'complete', or 'update' mode.
- start(): Begin the streaming query.
```python
# File sources require an explicit schema (a StructType) unless schema inference is enabled
streamingDF = spark.readStream.format('csv') \
    .option('header', 'true') \
    .option('maxFilesPerTrigger', 1) \
    .schema(schema) \
    .load('/path/to/input')

query = streamingDF.writeStream.format('console') \
    .outputMode('append') \
    .start()

query.awaitTermination()
```
Example
This example reads CSV files from a folder as they arrive, selects a column, and prints new rows to the console.
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName('StructuredStreamingExample').getOrCreate()

# File sources require an explicit schema
schema = StructType([StructField('name', StringType(), True)])

# Define streaming DataFrame reading CSV files
streamingDF = spark.readStream.format('csv') \
    .option('header', 'true') \
    .option('maxFilesPerTrigger', 1) \
    .schema(schema) \
    .load('input_folder')

# Select a column to display
selectedDF = streamingDF.select('name')

# Write stream to console
query = selectedDF.writeStream.format('console') \
    .outputMode('append') \
    .start()

query.awaitTermination()
```
Output
-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|name|
+----+
|John|
|Anna|
+----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|name|
+----+
|Mike|
+----+
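Batch 1 prints only the newly arrived row because the query runs in 'append' mode. The difference between the three output modes can be pictured without a cluster. Below is a plain-Python sketch (not PySpark itself, with batch contents mirroring the console output above) of what each mode sends to the sink per trigger:

```python
# Micro-batches as they arrive (matching the console output above)
batches = [['John', 'Anna'], ['Mike']]

def run(mode):
    """Simulate what each trigger emits to the sink under a given output mode."""
    result_table = []  # the full streaming result accumulated so far
    emitted = []       # what the sink receives at each trigger
    for batch in batches:
        result_table.extend(batch)
        if mode == 'append':
            emitted.append(list(batch))          # only rows added this trigger
        elif mode == 'complete':
            emitted.append(list(result_table))   # the entire result table, every trigger
        elif mode == 'update':
            emitted.append(list(batch))          # only rows changed this trigger
    return emitted

print(run('append'))    # [['John', 'Anna'], ['Mike']]
print(run('complete'))  # [['John', 'Anna'], ['John', 'Anna', 'Mike']]
```

For a simple projection like select('name'), 'append' and 'update' behave the same; they diverge for aggregations, where 'update' re-emits rows whose counts changed.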
Common Pitfalls
- Choosing the wrong outputMode() causes errors; use 'append' for new rows only, and 'complete' or 'update' for aggregations.
- For file sources, forgetting maxFilesPerTrigger can cause all existing files to load in a single batch.
- Not calling awaitTermination() lets the driver program exit immediately, stopping the stream.
- Using unsupported formats, or omitting the schema for file sources, leads to failures.
```python
# outputMode defaults to 'append'; streaming aggregations cannot use it without a watermark
wrong_query = streamingDF.groupBy('name').count() \
    .writeStream.format('console').outputMode('append').start()  # raises AnalysisException

# Correct way: use 'complete' (or 'update') for aggregations
correct_query = streamingDF.groupBy('name').count() \
    .writeStream.format('console').outputMode('complete').start()
```
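The maxFilesPerTrigger pitfall can also be pictured without a cluster: with no limit, an existing backlog of files lands in one oversized first batch. A plain-Python sketch (hypothetical file names, not PySpark) of how the option splits pending files into micro-batches:

```python
def plan_batches(files, max_files_per_trigger=None):
    """Group pending files into micro-batches, mimicking maxFilesPerTrigger."""
    if max_files_per_trigger is None:
        return [files]  # no limit: the whole backlog becomes one batch
    return [files[i:i + max_files_per_trigger]
            for i in range(0, len(files), max_files_per_trigger)]

backlog = ['a.csv', 'b.csv', 'c.csv']
print(plan_batches(backlog))     # [['a.csv', 'b.csv', 'c.csv']] -- one huge batch
print(plan_batches(backlog, 1))  # [['a.csv'], ['b.csv'], ['c.csv']] -- one file per trigger
```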
Quick Reference
| Method | Description |
|---|---|
| spark.readStream.format(source) | Set streaming input source like 'csv', 'json', 'kafka' |
| option(key, value) | Set options for input or output, e.g., path, header, topic |
| load(path) | Load streaming data from path or source |
| writeStream.format(sink) | Set output sink like 'console', 'parquet', 'memory' |
| outputMode(mode) | Set output mode: 'append', 'complete', or 'update' |
| start() | Start the streaming query |
| awaitTermination() | Wait for the streaming to finish |
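The chained calls in the table follow a builder pattern: each method returns the reader object itself, so options accumulate before load() produces the result. A minimal plain-Python sketch of that pattern (illustrative only, not Spark's actual implementation):

```python
class StreamReader:
    """Toy builder mirroring the shape of spark.readStream call chains."""
    def __init__(self):
        self.source = None
        self.options = {}

    def format(self, source):
        self.source = source
        return self  # returning self is what enables chaining

    def option(self, key, value):
        self.options[key] = value
        return self

    def load(self, path):
        # In Spark this would return a streaming DataFrame; here, just the config
        return {'source': self.source, 'path': path, **self.options}

conf = StreamReader().format('csv').option('header', 'true').load('/data')
print(conf)  # {'source': 'csv', 'path': '/data', 'header': 'true'}
```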
Key Takeaways
- Use spark.readStream and writeStream to build continuous data pipelines in PySpark.
- Always specify outputMode to control how results are written.
- Call awaitTermination() to keep the streaming query running.
- Use the maxFilesPerTrigger option to control file ingestion rate.
- Structured Streaming supports multiple sources like files, Kafka, and sockets.