Kafka · How-To · Beginner · 4 min read

How to Use JDBC Source Connector in Kafka for Database Integration

To use the JDBC Source Connector in Kafka, configure it with your database connection details and specify the tables to import. The connector then polls those tables and streams new or updated rows into Kafka topics automatically.
📐 Syntax

The JDBC Source Connector configuration requires these key properties:

  • name: Unique connector name.
  • connector.class: Set to io.confluent.connect.jdbc.JdbcSourceConnector.
  • connection.url: JDBC URL to connect to your database.
  • connection.user and connection.password: Database credentials.
  • table.whitelist: Comma-separated list of tables to import.
  • mode: How to detect new or updated rows (e.g., incrementing, timestamp, or bulk).
  • topic.prefix: Prefix for Kafka topics where data will be published.
```json
{
  "name": "jdbc-source-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://localhost:5432/mydb",
  "connection.user": "dbuser",
  "connection.password": "dbpassword",
  "table.whitelist": "my_table",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "topic.prefix": "mydb-"
}
```
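
If you register the connector through the Kafka Connect REST API rather than a standalone worker properties file, the POST /connectors endpoint expects the same settings wrapped in a config object. A minimal sketch of the request body, reusing the values above:

```json
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "dbuser",
    "connection.password": "dbpassword",
    "table.whitelist": "my_table",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mydb-"
  }
}
```
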
💻 Example

This example shows a complete configuration for the JDBC Source Connector to import data from a PostgreSQL table named customers. It uses the incrementing mode to track new rows by the customer_id column and publishes data to Kafka topics prefixed with pg-.

```json
{
  "name": "jdbc-source-postgres",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://localhost:5432/shopdb",
  "connection.user": "shopuser",
  "connection.password": "shoppass",
  "table.whitelist": "customers",
  "mode": "incrementing",
  "incrementing.column.name": "customer_id",
  "topic.prefix": "pg-"
}
```
Output:
```text
Connector started and streaming new rows from 'customers' table into Kafka topic 'pg-customers'.
```
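
To confirm the connector and its task are healthy, Kafka Connect's REST API exposes a status endpoint (GET /connectors/jdbc-source-postgres/status on the worker, which listens on port 8083 by default). A healthy response looks roughly like this; the worker_id value is illustrative:

```json
// example response; worker_id reflects your own Connect worker
{
  "name": "jdbc-source-postgres",
  "connector": { "state": "RUNNING", "worker_id": "127.0.0.1:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083" }
  ],
  "type": "source"
}
```
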
⚠️ Common Pitfalls

  • Incorrect JDBC URL: Make sure the JDBC URL matches your database type and host.
  • Missing incrementing or timestamp column: When using incrementing or timestamp mode, specify the correct column to track new data.
  • Table whitelist vs blacklist: Use table.whitelist to include tables or table.blacklist to exclude, but not both.
  • Connector not resuming cleanly after restarts: Kafka Connect persists source offsets keyed by the connector name, so a recreated connector with the same name resumes from the old offsets; manage offsets deliberately to avoid duplicate or missed rows.
```json
// Incorrect: "incrementing" mode without "incrementing.column.name" causes the connector to fail
{
  "mode": "incrementing"
}

// Correct
{
  "mode": "incrementing",
  "incrementing.column.name": "id"
}
```
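
The same rule applies to timestamp mode. The connector also supports a combined timestamp+incrementing mode that uses both columns, capturing updates to existing rows as well as new inserts; a minimal sketch, where the last_modified column name is illustrative:

```json
// assumes each whitelisted table has a "last_modified" audit column (illustrative name)
{
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "last_modified",
  "incrementing.column.name": "id"
}
```
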
📊 Quick Reference

| Property | Description | Example |
| --- | --- | --- |
| name | Unique name for the connector | "jdbc-source-connector" |
| connector.class | Connector class to use | "io.confluent.connect.jdbc.JdbcSourceConnector" |
| connection.url | JDBC URL for the database | "jdbc:mysql://localhost:3306/mydb" |
| connection.user | Database username | "user" |
| connection.password | Database password | "password" |
| table.whitelist | Tables to import | "orders,customers" |
| mode | Data import mode (bulk, incrementing, timestamp) | "incrementing" |
| incrementing.column.name | Column that tracks new rows in incrementing mode | "id" |
| timestamp.column.name | Column that tracks updates in timestamp mode | "last_modified" |
| topic.prefix | Prefix for Kafka topics | "db-" |
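
Of the three modes, bulk re-reads the entire table on every poll, which suits small lookup tables or one-off loads; poll.interval.ms controls how often each table is polled and defaults to 5000 ms. A minimal sketch, where the country_codes table name is illustrative:

```json
// "country_codes" is an illustrative lookup table; bulk mode republishes all of its rows on every poll
{
  "name": "jdbc-source-bulk",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://localhost:5432/mydb",
  "connection.user": "dbuser",
  "connection.password": "dbpassword",
  "table.whitelist": "country_codes",
  "mode": "bulk",
  "poll.interval.ms": "60000",
  "topic.prefix": "mydb-"
}
```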

Key Takeaways

  • Configure the JDBC Source Connector with the correct database URL, credentials, and tables to import.
  • Choose the right mode (incrementing, timestamp, or bulk) to track new or updated data.
  • Always specify the incrementing or timestamp column when required to avoid connector errors.
  • Use topic prefixes to organize the Kafka topics created by the connector.
  • Monitor connector offsets to prevent data duplication or loss during restarts.