# How to Use the JDBC Source Connector in Kafka for Database Integration
To use the JDBC Source Connector in Kafka, configure it with your database connection details and specify the tables to import. The connector reads data from those tables and streams it into Kafka topics automatically.

## Syntax
The JDBC Source Connector configuration requires these key properties:
- `name`: Unique connector name.
- `connector.class`: Set to `io.confluent.connect.jdbc.JdbcSourceConnector`.
- `connection.url`: JDBC URL used to connect to your database.
- `connection.user` and `connection.password`: Database credentials.
- `table.whitelist`: Comma-separated list of tables to import.
- `mode`: How to detect new or updated rows (`incrementing`, `timestamp`, or `bulk`).
- `topic.prefix`: Prefix for the Kafka topics where data will be published.
```json
{
"name": "jdbc-source-connector",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "dbuser",
"connection.password": "dbpassword",
"table.whitelist": "my_table",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mydb-"
}
```

## Example
This example shows a complete configuration for the JDBC Source Connector to import data from a PostgreSQL table named `customers`. It uses `incrementing` mode to track new rows by the `customer_id` column and publishes data to Kafka topics prefixed with `pg-`.
```json
{
"name": "jdbc-source-postgres",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost:5432/shopdb",
"connection.user": "shopuser",
"connection.password": "shoppass",
"table.whitelist": "customers",
"mode": "incrementing",
"incrementing.column.name": "customer_id",
"topic.prefix": "pg-"
}
```

## Output
```text
Connector started and streaming new rows from 'customers' table into Kafka topic 'pg-customers'.
```
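In practice, a configuration like the one above is usually submitted to a Kafka Connect worker's REST API as a JSON payload with the properties nested under a `config` key. Here is a minimal Python sketch of building that payload; the worker URL `http://localhost:8083` is an assumption, and actually sending the request requires a running Connect cluster:

```python
import json

# Connector properties from the example above.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/shopdb",
    "connection.user": "shopuser",
    "connection.password": "shoppass",
    "table.whitelist": "customers",
    "mode": "incrementing",
    "incrementing.column.name": "customer_id",
    "topic.prefix": "pg-",
}

def build_create_request(name, config, worker_url="http://localhost:8083"):
    """Build the URL and JSON body for POST /connectors on a Connect worker.

    The REST API expects {"name": ..., "config": {...}} rather than the
    flat standalone-properties format.
    """
    body = json.dumps({"name": name, "config": config})
    return f"{worker_url}/connectors", body

url, body = build_create_request("jdbc-source-postgres", config)
print(url)  # http://localhost:8083/connectors
```

The resulting body could then be sent with any HTTP client, for example `curl -X POST -H "Content-Type: application/json" --data @payload.json http://localhost:8083/connectors`.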
## Common Pitfalls
- Incorrect JDBC URL: Make sure the JDBC URL matches your database type and host.
- Missing incrementing or timestamp column: When using `incrementing` or `timestamp` mode, specify the correct column to track new data.
- Table whitelist vs. blacklist: Use `table.whitelist` to include tables or `table.blacklist` to exclude them, but not both.
- Connector not restarting properly: Offsets must be managed correctly to avoid duplicate or missed data.
For example, a configuration that omits the tracking column fails, while one that names it works:

```json
{
  "mode": "incrementing"
  // missing "incrementing.column.name" causes connector failure
}
```

```json
{
  "mode": "incrementing",
  "incrementing.column.name": "id"
}
```

## Quick Reference
| Property | Description | Example |
|---|---|---|
| `name` | Unique name for the connector | "jdbc-source-connector" |
| `connector.class` | Connector class to use | "io.confluent.connect.jdbc.JdbcSourceConnector" |
| `connection.url` | JDBC URL for the database | "jdbc:mysql://localhost:3306/mydb" |
| `connection.user` | Database username | "user" |
| `connection.password` | Database password | "password" |
| `table.whitelist` | Tables to import | "orders,customers" |
| `mode` | Data import mode (`bulk`, `incrementing`, `timestamp`) | "incrementing" |
| `incrementing.column.name` | Column to track new rows in incrementing mode | "id" |
| `timestamp.column.name` | Column to track updates in timestamp mode | "last_modified" |
| `topic.prefix` | Prefix for Kafka topics | "db-" |
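As a rough mental model of what the modes in the table above do, the connector repeatedly polls each table with a query that filters on the tracked column. The sketch below is a simplification under that assumption; the real connector also handles identifier quoting, offset persistence, and a combined timestamp+incrementing mode:

```python
def poll_query(table, mode, column=None):
    """Simplified shape of the query issued per poll for each mode."""
    if mode == "bulk":
        # Full table every poll; no tracking column needed.
        return f"SELECT * FROM {table}"
    if mode == "incrementing":
        # ? is bound to the largest id seen so far.
        return (f"SELECT * FROM {table} WHERE {column} > ? "
                f"ORDER BY {column} ASC")
    if mode == "timestamp":
        # A window between the last seen timestamp and "now".
        return (f"SELECT * FROM {table} WHERE {column} > ? AND {column} < ? "
                f"ORDER BY {column} ASC")
    raise ValueError(f"unknown mode: {mode}")

print(poll_query("customers", "incrementing", "customer_id"))
# SELECT * FROM customers WHERE customer_id > ? ORDER BY customer_id ASC
```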
## Key Takeaways
- Configure the JDBC Source Connector with the correct database URL, credentials, and tables to import.
- Choose the right mode (`incrementing`, `timestamp`, or `bulk`) to track new or updated data.
- Always specify the incrementing or timestamp column when required to avoid connector errors.
- Use topic prefixes to organize the Kafka topics created by the connector.
- Monitor connector offsets to prevent data duplication or loss during restarts.