This example shows a Structured Streaming job that reads data from Kafka, counts messages in one-minute windows, and prints the running results live to the console.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
# Create Spark session
spark = SparkSession.builder.appName('KappaExample').getOrCreate()
# Read streaming data from Kafka
streaming_df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'sensor-data') \
    .load()
# Kafka delivers the payload as raw bytes; cast it to a string and keep the record timestamp
sensor_values = streaming_df.selectExpr('CAST(value AS STRING) as value', 'timestamp')
# Simple processing: count messages per one-minute tumbling window over the record timestamp
counts = sensor_values.groupBy(window(col('timestamp'), '1 minute')).count()
# Output to console for demo; 'complete' mode re-emits the full counts table on each trigger
query = counts.writeStream.outputMode('complete').format('console').start()
query.awaitTermination(10)  # Run for up to 10 seconds
query.stop()  # awaitTermination(timeout) returns on timeout without stopping the query
spark.stop()
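Two notes for actually running the demo. First, the Kafka source is not bundled with PySpark, so launch the job with the connector on the classpath, e.g. spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<your Spark version> kappa_example.py (the filename is a placeholder; match the Scala suffix and version to your installation). Second, the console stays empty until something publishes to the sensor-data topic; the sketch below feeds it one reading per second, assuming the kafka-python client (an arbitrary choice, any Kafka producer works):
import json
import time
from kafka import KafkaProducer  # assumes: pip install kafka-python
# Connect to the same broker the streaming job reads from
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# Send one fake sensor reading per second so the one-minute windows fill up
for i in range(60):
    producer.send('sensor-data', {'sensor_id': 1, 'reading': 20.0 + i % 5})
    time.sleep(1)
producer.flush()
producer.close()
With the producer running, each micro-batch prints the full table of window counts, and the count for the current one-minute window grows as messages arrive.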