Kafka · How-To · Beginner · 4 min read

How to Use S3 Sink Connector in Kafka for Data Export

To use the S3 Sink Connector in Kafka, configure it with your Kafka Connect cluster by specifying the connector class io.confluent.connect.s3.S3SinkConnector, your AWS credentials, target S3 bucket, and the Kafka topic to export. Then deploy this configuration to your Kafka Connect service to start streaming data from Kafka topics into S3.
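The deployment step above can be sketched in code. The snippet below builds the connector configuration as a Python dictionary and submits it to the Kafka Connect REST API, assumed here to be listening on `localhost:8083` (the Connect default); the helper names are illustrative, not part of any library.

```python
import json
import urllib.request

def build_s3_sink_config(name, topic, bucket, flush_size=1000):
    """Assemble an S3 Sink Connector payload for the Kafka Connect REST API."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "s3.bucket.name": bucket,
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "flush.size": str(flush_size),
        },
    }

def deploy(config, connect_url="http://localhost:8083"):
    """POST the connector config to Kafka Connect (requires a running cluster)."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(config).encode(),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)
```

In practice you would also supply AWS credentials, either as connector properties (as in this article's examples) or via a credentials provider configured on the Connect workers.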
📐 Syntax

The S3 Sink Connector configuration requires key properties to connect Kafka topics to an S3 bucket. These include the connector class, AWS credentials, S3 bucket name, Kafka topics, and data format settings.

  • connector.class: The Java class for the S3 Sink Connector.
  • tasks.max: Number of tasks to run in parallel.
  • topics: Kafka topics to export.
  • s3.bucket.name: Target S3 bucket name.
  • aws.access.key.id and aws.secret.access.key: AWS credentials.
  • format.class: Data format for files (e.g., JSON, Avro).
```json
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "your-kafka-topic",
    "s3.bucket.name": "your-s3-bucket",
    "aws.access.key.id": "YOUR_AWS_ACCESS_KEY",
    "aws.secret.access.key": "YOUR_AWS_SECRET_KEY",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1000"
  }
}
```
💻 Example

This example shows a complete JSON configuration for the S3 Sink Connector that exports data from the Kafka topic orders to an S3 bucket named my-kafka-data in JSON format. It flushes data every 500 records.

```json
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "s3.bucket.name": "my-kafka-data",
    "aws.access.key.id": "AKIAEXAMPLEKEY",
    "aws.secret.access.key": "exampleSecretKey1234567890",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "500",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }
}
```
Output
Connector s3-sink-connector created and running, streaming 'orders' topic data to S3 bucket 'my-kafka-data'. Files appear in S3 as JSON objects after every 500 records.
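Once deployed, you can confirm the connector is healthy by querying Kafka Connect's status endpoint (`GET /connectors/<name>/status`). The helpers below are an illustrative sketch that assumes the standard Kafka Connect status payload shape, where both the connector and each task report a `state` field.

```python
import json
import urllib.request

def is_running(status):
    """Return True if the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks_ok = all(t.get("state") == "RUNNING" for t in status.get("tasks", []))
    return connector_ok and tasks_ok

def fetch_status(name, connect_url="http://localhost:8083"):
    """Query Kafka Connect for a connector's status (requires a running cluster)."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.load(resp)
```

A connector whose tasks have crashed will typically show a `FAILED` task state with a stack trace in the payload, which `is_running` would report as unhealthy.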
⚠️ Common Pitfalls

  • Missing AWS credentials: Ensure aws.access.key.id and aws.secret.access.key are correct and have permissions to write to the S3 bucket.
  • Incorrect bucket name: The S3 bucket must exist and be accessible.
  • Wrong topic name: The topics property must match the Kafka topic exactly.
  • Flush size too large or small: Setting flush.size too high delays data upload; too low causes many small files.
  • Connector class mismatch: Use the correct connector class io.confluent.connect.s3.S3SinkConnector.
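The checks above can be automated before deployment. The validator below is a hypothetical sketch (not part of any Kafka library) that catches the class-name typo and missing-property pitfalls from the list.

```python
EXPECTED_CLASS = "io.confluent.connect.s3.S3SinkConnector"
REQUIRED_KEYS = [
    "connector.class", "topics", "s3.bucket.name",
    "aws.access.key.id", "aws.secret.access.key", "format.class",
]

def validate_s3_sink(config):
    """Return a list of problems found in an S3 Sink Connector config dict."""
    problems = [f"missing property: {k}" for k in REQUIRED_KEYS if k not in config]
    if config.get("connector.class") not in (None, EXPECTED_CLASS):
        problems.append(f"wrong connector class: {config['connector.class']}")
    flush = config.get("flush.size")
    if flush is not None and not str(flush).isdigit():
        problems.append("flush.size must be a positive integer")
    return problems
```

Running it against the "wrong example" below would flag the misspelled connector class before Kafka Connect ever sees the config.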
Wrong example:

```json
{
  "connector.class": "io.confluent.connect.s3.SinkConnector",
  "topics": "orders",
  "s3.bucket.name": "my-kafka-data"
}
```

Right example:

```json
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "orders",
  "s3.bucket.name": "my-kafka-data"
}
```
📊 Quick Reference

| Property | Description | Example Value |
| --- | --- | --- |
| connector.class | Java class for the S3 Sink Connector | io.confluent.connect.s3.S3SinkConnector |
| tasks.max | Max parallel tasks | 1 |
| topics | Kafka topics to export | orders |
| s3.bucket.name | Target S3 bucket | my-kafka-data |
| aws.access.key.id | AWS access key | AKIAEXAMPLEKEY |
| aws.secret.access.key | AWS secret key | exampleSecretKey1234567890 |
| format.class | Data format for files | io.confluent.connect.s3.format.json.JsonFormat |
| flush.size | Number of records before flush | 500 |

Key Takeaways

  • Configure the S3 Sink Connector with correct AWS credentials and bucket name to export Kafka topic data to S3.
  • Use the connector class io.confluent.connect.s3.S3SinkConnector and specify the Kafka topics to export.
  • Adjust flush.size to balance between upload frequency and file size in S3.
  • Verify permissions and bucket existence to avoid connection errors.
  • Deploy the connector configuration to Kafka Connect to start streaming data automatically.
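To pick a reasonable flush.size, it helps to estimate how often files will land in S3 at your topic's throughput. The back-of-the-envelope helpers below are a sketch; the record rate is an assumed input you must supply, not something the connector reports.

```python
def flush_interval_seconds(records_per_second, flush_size):
    """Approximate seconds between S3 uploads for one steadily loaded task."""
    return flush_size / records_per_second

def files_per_hour(records_per_second, flush_size):
    """Approximate number of S3 objects written per hour per task."""
    return int(3600 * records_per_second / flush_size)

# Example: 1,000 records/sec with flush.size=500 uploads a file every 0.5 s,
# i.e. 7,200 small objects per hour -- a sign flush.size is likely too low.
```

If the estimated interval grows to many minutes, data sits unflushed in the connector for that long; if the object count climbs into the thousands per hour, downstream readers pay for many small files.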