Apache Spark · How-To · Beginner · 3 min read

How to Write Data to Kafka Using PySpark

To write data to Kafka using PySpark, call df.write.format('kafka') with the kafka.bootstrap.servers and topic options. The DataFrame must have a value column (and optionally a key column), cast to string or binary, before writing.

Syntax

Use the write method on a PySpark DataFrame with format('kafka'), setting the Kafka server and topic via option. The message fields come from the DataFrame itself, as columns cast to string or binary:

  • kafka.bootstrap.servers (option): comma-separated list of Kafka broker addresses
  • topic (option): Kafka topic name
  • key (column): message key (optional)
  • value (column): message value (required)
```python
(df.write
   .format('kafka')
   .option('kafka.bootstrap.servers', 'host1:9092,host2:9092')
   .option('topic', 'your_topic')
   .save())
```

Example

This example creates a simple DataFrame with key and value columns, then writes it to a Kafka topic named test_topic. Both columns are cast to STRING; the Kafka sink accepts string or binary columns and serializes strings to bytes itself.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('WriteToKafka').getOrCreate()

# Create sample data
data = [("key1", "value1"), ("key2", "value2")]
df = spark.createDataFrame(data, ["key", "value"])

# Cast both columns to string, then write to Kafka
(df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("topic", "test_topic")
   .save())

spark.stop()
```
Output
No output printed; data is sent to Kafka topic 'test_topic'.

Common Pitfalls

  • Not casting the key and value columns to string or binary causes errors.
  • Missing the kafka.bootstrap.servers or topic option fails the write.
  • Writing without a value column fails because every Kafka message requires a value.
  • Running without the spark-sql-kafka connector package on the classpath fails with "Failed to find data source: kafka".
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('WrongWrite').getOrCreate()

# Missing value column
data = [("key1",)]
df = spark.createDataFrame(data, ["key"])

# This will fail because the 'value' column is missing
try:
    (df.write.format('kafka')
       .option('kafka.bootstrap.servers', 'localhost:9092')
       .option('topic', 'test_topic')
       .save())
except Exception as e:
    print(f'Error: {e}')

spark.stop()
```
Output
Error: Required attribute 'value' not found

Quick Reference

Remember these key points when writing to Kafka with PySpark:

  • The DataFrame must have a value column (and optionally key) as string or binary.
  • Use format('kafka') and set the kafka.bootstrap.servers and topic options.
  • Call save() to write the data.
  • Kafka stores bytes; string columns are serialized to UTF-8 automatically, so cast other types to string or binary first.

Key Takeaways

Include a 'value' column (and optionally a 'key' column) in your DataFrame before writing to Kafka.
Set the 'kafka.bootstrap.servers' and 'topic' options correctly in the write method.
Cast your data columns to string or binary types to avoid errors.
Use DataFrame.write.format('kafka').save() to send data to Kafka.
Missing required columns, options, or the Kafka connector package will cause write failures.