How to Write Data to Kafka Using PySpark
To write data to Kafka using PySpark, call `DataFrame.write` with `format('kafka')` and set options such as `kafka.bootstrap.servers` and `topic`. The DataFrame must have `key` and `value` columns, cast to string or binary, before writing.
Syntax
Call `write` on a PySpark DataFrame with `format('kafka')`, setting the Kafka server address and topic via `option`:
- `kafka.bootstrap.servers` (option): comma-separated list of Kafka broker addresses
- `topic` (option): target Kafka topic name
- `key` (column): message key (optional)
- `value` (column): message value (required)
```python
(df.write
   .format('kafka')
   .option('kafka.bootstrap.servers', 'host1:9092,host2:9092')
   .option('topic', 'your_topic')
   .save())
```
Example
This example creates a simple DataFrame with `key` and `value` columns, then writes it to a Kafka topic named `test_topic`. The columns are cast to `STRING`; Spark serializes string columns to bytes for Kafka.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('WriteToKafka').getOrCreate()

# Create sample data
data = [("key1", "value1"), ("key2", "value2")]
df = spark.createDataFrame(data, ["key", "value"])

# Write to Kafka: cast both columns to STRING first
(df.selectExpr("CAST(key AS STRING) AS key",
               "CAST(value AS STRING) AS value")
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("topic", "test_topic")
   .save())

spark.stop()
```
Output
No output is printed; the data is sent to the Kafka topic `test_topic`.
Common Pitfalls
- Not casting the `key` and `value` columns to `string` or `binary` causes errors.
- Missing the `kafka.bootstrap.servers` or `topic` option will fail the write.
- Writing without a `value` column fails because every Kafka message requires a value.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('WrongWrite').getOrCreate()

# Missing value column
data = [("key1",)]
df = spark.createDataFrame(data, ["key"])

# This will fail because the 'value' column is missing
try:
    (df.write.format('kafka')
       .option('kafka.bootstrap.servers', 'localhost:9092')
       .option('topic', 'test_topic')
       .save())
except Exception as e:
    print(f'Error: {e}')

spark.stop()
```
Output
Error: Required attribute 'value' not found
The exact wording varies by Spark version, but the exception always points to the missing `value` column.
Quick Reference
Remember these key points when writing to Kafka with PySpark:
- The DataFrame must have `key` and `value` columns as strings or bytes.
- Use `format('kafka')` and set the `kafka.bootstrap.servers` and `topic` options.
- Call `save()` to write the data.
- Kafka expects byte data, so cast string columns explicitly.
Key Takeaways
- Include a `value` column (and optionally a `key`) in your DataFrame before writing to Kafka.
- Set the `kafka.bootstrap.servers` and `topic` options correctly on the writer.
- Cast your data columns to string or binary types to avoid errors.
- Use `DataFrame.write.format('kafka').save()` to send data to Kafka.
- Missing required columns or options will cause write failures.