
Sink connectors in Kafka - Commands & Configuration

Introduction
Sink connectors move data from Kafka topics into external systems such as databases or file storage, automating data export without custom consumer code.

Common situations where a sink connector helps:
- You want to save Kafka topic data into a database automatically.
- You need to export streaming data to a file system for backups.
- You are integrating Kafka with analytics tools that require data in their own storage.
- You want to keep external systems updated in real time from Kafka.
- You want to avoid manual data transfer from Kafka to other systems.
Config File - sink-connector.json
{
  "name": "example-sink-connector",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "example-topic",
    "file": "/tmp/output.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
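The same configuration can also be built programmatically, which avoids hand-editing JSON. A minimal Python sketch using only the standard json module (every value mirrors the example file above):

```python
import json

# Mirrors sink-connector.json above; all values come from the example.
config = {
    "name": "example-sink-connector",
    "config": {
        "connector.class": "FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "example-topic",
        "file": "/tmp/output.txt",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    },
}

payload = json.dumps(config, indent=2)  # JSON string ready to POST to the REST API
```

Generating the payload in code makes it easy to template the connector name, topic, or file path per environment.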

This JSON configures a sink connector named example-sink-connector.

connector.class specifies the type of sink connector; here it writes to a file.

tasks.max sets how many parallel tasks run.

topics lists Kafka topics to read from.

file is the output file path where data will be saved.

key.converter and value.converter tell Connect how to deserialize record keys and values read from the topic; StringConverter treats them as plain strings.

Commands
This command creates the sink connector by sending the JSON config to Kafka Connect's REST API.
Terminal
curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://localhost:8083/connectors
Expected Output
{"name":"example-sink-connector","config":{...},"tasks":[],"type":"sink"}
-X POST - Specifies the HTTP method to create the connector.
-H "Content-Type: application/json" - Sets the content type to JSON for the request.
--data @sink-connector.json - Sends the connector configuration file as the request body.
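The same POST can be scripted with Python's standard library instead of curl. This is a sketch, not something the tutorial requires: it assumes Kafka Connect's REST API is at its default address, localhost:8083, so the actual network call is left commented out.

```python
import json
import urllib.request

# Assumes Kafka Connect's REST API at its default address, localhost:8083.
url = "http://localhost:8083/connectors"

# Trimmed-down config body; in practice, load sink-connector.json instead.
body = json.dumps({
    "name": "example-sink-connector",
    "config": {
        "connector.class": "FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "example-topic",
        "file": "/tmp/output.txt",
    },
}).encode("utf-8")

req = urllib.request.Request(
    url,
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",  # same as curl -X POST
)
# urllib.request.urlopen(req)  # uncomment when a Connect worker is running
```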
This command checks the status of the sink connector to verify it is running.
Terminal
curl http://localhost:8083/connectors/example-sink-connector/status
Expected Output
{"name":"example-sink-connector","connector":{"state":"RUNNING","worker_id":"worker-1"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"worker-1"}]}
This command shows the live output file where the sink connector writes data from the Kafka topic.
Terminal
tail -f /tmp/output.txt
Expected Output
example message 1
example message 2
example message 3
-f - Follows the file to show new data as it arrives.
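Beyond tailing, a quick scripted check can confirm that records actually landed in the file. A sketch that writes a stand-in output file and counts its lines (the temp path and messages here are substitutes for the real /tmp/output.txt):

```python
import tempfile
from pathlib import Path

# Stand-in for /tmp/output.txt; messages mirror the sample output above.
out = Path(tempfile.mkdtemp()) / "output.txt"
out.write_text("example message 1\nexample message 2\nexample message 3\n")

lines = out.read_text().splitlines()
print(f"{len(lines)} records exported")  # → 3 records exported
```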
Key Concept

Sink connectors automatically export Kafka topic data to external systems without manual coding.

Common Mistakes
Not specifying the correct topic name in the connector config. If the topic name is wrong or missing, the connector won't receive any data. Double-check that the topic name exactly matches the Kafka topic you want to export.

Using incompatible converters for key or value data. Data may not be properly serialized or deserialized, causing errors or empty output. Use converters that match your Kafka data format, such as StringConverter for plain text.

Not checking connector status after creation. You might think the connector is running when it has actually failed or is paused. Always check the connector status via the REST API to confirm it is RUNNING.
Summary
Create a sink connector by posting a JSON config to the Kafka Connect REST API.
Verify the connector is running by checking its status endpoint.
View the exported data in the external system, like a file, to confirm data flow.