
Source connectors in Kafka - Commands & Configuration

Introduction
Source connectors move data from external systems into Kafka automatically. Instead of writing custom import code, you use ready-made connectors that link databases, files, and other sources to Kafka topics.
  • When you want to stream data from a database into Kafka for real-time processing.
  • When you need to import log files continuously into Kafka for analysis.
  • When you want to capture changes from a source system and send them to Kafka topics.
  • When you want to integrate external data sources with Kafka without custom coding.
  • When you want to automate data ingestion pipelines into Kafka.
Config File - source-connector.json
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "customers",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mydb-",
    "poll.interval.ms": "5000"
  }
}

This JSON configures a JDBC source connector to import data from a PostgreSQL database.

  • name: Unique name for the connector.
  • connector.class: The connector plugin to use (JDBC source).
  • tasks.max: Number of parallel tasks to run.
  • connection.url, connection.user, connection.password: Database connection details.
  • table.whitelist: Which tables to import data from.
  • mode: How to detect new rows (incrementing column).
  • incrementing.column.name: Column used to track new rows.
  • topic.prefix: Prefix added to Kafka topic names.
  • poll.interval.ms: How often to check for new data.
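A malformed config file is one of the most common reasons the POST in the next section fails, so it is worth validating the JSON locally first. A minimal sketch, assuming the file contents shown above, using only Python's standard library (no extra tools):

```shell
# Recreate the config shown above and check that it parses as valid JSON.
cat > source-connector.json <<'EOF'
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "table.whitelist": "customers",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mydb-",
    "poll.interval.ms": "5000"
  }
}
EOF

# json.tool exits non-zero on a syntax error, so "config OK" only
# prints when the file is well-formed JSON.
python3 -m json.tool source-connector.json > /dev/null && echo "config OK"
```

This catches missing commas or quotes before they turn into an opaque REST API error.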
Commands
This command creates the source connector by sending the JSON config to Kafka Connect's REST API.
Terminal
curl -X POST -H "Content-Type: application/json" --data @source-connector.json http://localhost:8083/connectors
Expected Output
{"name":"jdbc-source-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","connection.url":"jdbc:postgresql://localhost:5432/mydb","connection.user":"myuser","connection.password":"mypassword","table.whitelist":"customers","mode":"incrementing","incrementing.column.name":"id","topic.prefix":"mydb-","poll.interval.ms":"5000"},"tasks":[],"type":"source"}
-X POST - Specifies the HTTP method to create the connector
-H "Content-Type: application/json" - Sets the content type header for JSON data
--data @source-connector.json - Sends the connector configuration from the file
This command checks the status of the source connector to verify it is running.
Terminal
curl http://localhost:8083/connectors/jdbc-source-connector/status
Expected Output
{"name":"jdbc-source-connector","connector":{"state":"RUNNING","worker_id":"worker-1"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"worker-1"}]}
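In a script, the state field in that response can be checked automatically, and Kafka Connect's REST API offers POST /connectors/{name}/restart when a connector has failed. A hedged sketch that parses the sample response above with Python's standard library (in a real pipeline, STATUS would come from the curl status call shown earlier, and the restart requires a live Connect worker):

```shell
# Status response as returned above (sample data); in practice:
#   STATUS=$(curl -s http://localhost:8083/connectors/jdbc-source-connector/status)
STATUS='{"name":"jdbc-source-connector","connector":{"state":"RUNNING","worker_id":"worker-1"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"worker-1"}]}'

# Extract the connector state without needing jq installed.
STATE=$(echo "$STATUS" | python3 -c 'import json,sys; print(json.load(sys.stdin)["connector"]["state"])')
echo "connector state: $STATE"

# If the connector is not RUNNING, a restart request is the usual first fix.
if [ "$STATE" != "RUNNING" ]; then
  curl -X POST http://localhost:8083/connectors/jdbc-source-connector/restart
fi
```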
This command reads the first 5 messages from the Kafka topic where the source connector sends data, to verify data ingestion.
Terminal
kafka-console-consumer --bootstrap-server localhost:9092 --topic mydb-customers --from-beginning --max-messages 5
Expected Output
{"id":1,"name":"Alice","email":"alice@example.com"}
{"id":2,"name":"Bob","email":"bob@example.com"}
{"id":3,"name":"Carol","email":"carol@example.com"}
{"id":4,"name":"Dave","email":"dave@example.com"}
{"id":5,"name":"Eve","email":"eve@example.com"}
--from-beginning - Reads messages from the start of the topic
--max-messages 5 - Limits output to 5 messages
Key Concept

If you remember nothing else from this pattern, remember: source connectors automate importing data from external systems into Kafka topics without custom coding.

Common Mistakes
Using incorrect database connection details in the connector config.
The connector cannot connect to the source database and fails to start.
Double-check and test the database URL, username, and password before creating the connector.
Not specifying the correct mode and incrementing column for incremental data import.
The connector may import duplicate data or miss new rows.
Set mode to 'incrementing' or 'timestamp+incrementing' and specify the correct column to track new data.
Not verifying connector status after creation.
You may think the connector is running when it has failed or is paused.
Always check the connector status via the REST API to confirm it is running.
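For the second mistake above: plain 'incrementing' mode only sees newly inserted rows, so updates to existing rows are missed. 'timestamp+incrementing' tracks both, at the cost of needing a timestamp column that is updated on every change. A hedged config fragment (the column names updated_at and id are assumptions about your table's schema):

```json
{
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "updated_at",
  "incrementing.column.name": "id"
}
```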
Summary
  • Create a source connector by posting a JSON config to the Kafka Connect REST API.
  • Check the connector status to ensure it is running properly.
  • Consume messages from the Kafka topic to verify data is flowing from the source system.