How to Use S3 Sink Connector in Kafka for Data Export
To use the S3 Sink Connector in Kafka, configure it in your Kafka Connect cluster by specifying the connector class io.confluent.connect.s3.S3SinkConnector, your AWS credentials, the target S3 bucket, and the Kafka topics to export. Then deploy this configuration to your Kafka Connect service to start streaming data from Kafka topics into S3.

Syntax
The S3 Sink Connector configuration requires key properties to connect Kafka topics to an S3 bucket. These include the connector class, AWS credentials, S3 bucket name, Kafka topics, and data format settings.
- connector.class: The Java class for the S3 Sink Connector.
- tasks.max: Number of tasks to run in parallel.
- topics: Kafka topics to export.
- s3.bucket.name: Target S3 bucket name.
- aws.access.key.id and aws.secret.access.key: AWS credentials.
- format.class: Data format for files (e.g., JSON, Avro).
```json
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "your-kafka-topic",
    "s3.bucket.name": "your-s3-bucket",
    "aws.access.key.id": "YOUR_AWS_ACCESS_KEY",
    "aws.secret.access.key": "YOUR_AWS_SECRET_KEY",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1000"
  }
}
```

Example
This example shows a complete JSON configuration for the S3 Sink Connector that exports data from the Kafka topic orders to an S3 bucket named my-kafka-data in JSON format. It flushes data every 500 records.
```json
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "s3.bucket.name": "my-kafka-data",
    "aws.access.key.id": "AKIAEXAMPLEKEY",
    "aws.secret.access.key": "exampleSecretKey1234567890",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "500",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }
}
```

Output
Connector s3-sink-connector created and running, streaming 'orders' topic data to S3 bucket 'my-kafka-data'. Files appear in S3 as JSON objects after every 500 records.
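Deploying a configuration like the one above is typically done by POSTing the JSON to the Kafka Connect REST API. Below is a minimal Python sketch of that step, assuming Connect is reachable at localhost:8083; the `build_config` and `deploy` helper names are illustrative, not part of any library.

```python
import json
import urllib.request

# Hypothetical endpoint for this sketch; adjust for your environment.
CONNECT_URL = "http://localhost:8083/connectors"

def build_config(topic, bucket, access_key, secret_key, flush_size=500):
    """Assemble the connector payload shown in the example above."""
    return {
        "name": "s3-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "s3.bucket.name": bucket,
            "aws.access.key.id": access_key,
            "aws.secret.access.key": secret_key,
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "flush.size": str(flush_size),
        },
    }

def deploy(payload):
    """POST the connector configuration to the Kafka Connect REST API."""
    req = urllib.request.Request(
        CONNECT_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

A typical call would be `deploy(build_config("orders", "my-kafka-data", access_key, secret_key))`; Connect responds with the created connector's name and config.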
Common Pitfalls
- Missing AWS credentials: Ensure aws.access.key.id and aws.secret.access.key are correct and have permission to write to the S3 bucket.
- Incorrect bucket name: The S3 bucket must exist and be accessible.
- Wrong topic name: The topics property must match the Kafka topic exactly.
- Flush size too large or small: Setting flush.size too high delays data upload; too low creates many small files.
- Connector class mismatch: Use the correct connector class io.confluent.connect.s3.S3SinkConnector.
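Several of these pitfalls can be caught before deployment with a simple sanity check on the config dict. A hedged Python sketch; the `validate_s3_sink_config` helper and its messages are illustrative, not part of Kafka Connect:

```python
# Properties the pitfalls above say must always be present.
REQUIRED_KEYS = {
    "connector.class",
    "topics",
    "s3.bucket.name",
    "aws.access.key.id",
    "aws.secret.access.key",
}

EXPECTED_CLASS = "io.confluent.connect.s3.S3SinkConnector"

def validate_s3_sink_config(config):
    """Return a list of problems found in a connector 'config' dict."""
    problems = []
    # Missing credentials, bucket, or topics.
    for key in sorted(REQUIRED_KEYS - config.keys()):
        problems.append(f"missing property: {key}")
    # Connector class mismatch (e.g. SinkConnector vs S3SinkConnector).
    cls = config.get("connector.class")
    if cls is not None and cls != EXPECTED_CLASS:
        problems.append(f"connector.class should be {EXPECTED_CLASS}, got {cls}")
    # flush.size must be a positive integer if present.
    flush = config.get("flush.size")
    if flush is not None and (not str(flush).isdigit() or int(flush) == 0):
        problems.append("flush.size must be a positive integer")
    return problems
```

Running this against the "wrong example" below would flag both the connector class mismatch and the missing AWS credentials; it does not check that the bucket actually exists, which requires an S3 call.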
Wrong example (incorrect connector class):

```json
{
  "connector.class": "io.confluent.connect.s3.SinkConnector",
  "topics": "orders",
  "s3.bucket.name": "my-kafka-data"
}
```

Right example:

```json
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "orders",
  "s3.bucket.name": "my-kafka-data"
}
```

Quick Reference
| Property | Description | Example Value |
|---|---|---|
| connector.class | Java class for S3 Sink Connector | io.confluent.connect.s3.S3SinkConnector |
| tasks.max | Max parallel tasks | 1 |
| topics | Kafka topics to export | orders |
| s3.bucket.name | Target S3 bucket | my-kafka-data |
| aws.access.key.id | AWS access key | AKIAEXAMPLEKEY |
| aws.secret.access.key | AWS secret key | exampleSecretKey1234567890 |
| format.class | Data format for files | io.confluent.connect.s3.format.json.JsonFormat |
| flush.size | Number of records before flush | 500 |
Key Takeaways
- Configure the S3 Sink Connector with correct AWS credentials and bucket name to export Kafka topic data to S3.
- Use the connector class io.confluent.connect.s3.S3SinkConnector and specify the Kafka topics to export.
- Adjust flush.size to balance upload frequency against file size in S3.
- Verify permissions and bucket existence to avoid connection errors.
- Deploy the connector configuration to Kafka Connect to start streaming data automatically.