How to Use JDBC Sink Connector in Kafka for Database Integration
To use the JDBC Sink Connector in Kafka, configure it with your database connection details and specify the Kafka topics to sink data from. The connector reads data from those topics and writes it into the target database tables automatically.
Syntax
The JDBC Sink Connector configuration requires specifying the connector class, database connection URL, user credentials, Kafka topics, and table mapping. Key properties include:
- connector.class: The connector plugin class name.
- connection.url: JDBC URL to connect to the database.
- connection.user and connection.password: Database credentials.
- topics: Kafka topics to sink.
- auto.create: Whether to create tables automatically.
- insert.mode: How to insert data (e.g., insert, upsert).
```json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "dbuser",
    "connection.password": "dbpassword",
    "topics": "my_topic",
    "auto.create": "true",
    "insert.mode": "insert"
  }
}
```
Example
This example shows a complete configuration for the JDBC Sink Connector that writes data from the Kafka topic orders into a PostgreSQL database table. It enables automatic table creation, uses insert mode, and sets pk.mode to none, so no primary key is defined on the target table.
```json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/shopdb",
    "connection.user": "shopuser",
    "connection.password": "shoppass",
    "topics": "orders",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}
```
Output
Connector jdbc-sink-connector started and writing data from topic 'orders' to PostgreSQL table 'orders'.
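The example above appends a new row for every record. Where repeated records for the same order should update one row instead, a variant using upsert mode might look like the sketch below. It assumes the record value contains a field named order_id (a hypothetical field name, not taken from the original example) and a hypothetical connector name. Upsert mode needs a primary key, so pk.mode cannot stay at none.

```json
{
  "name": "jdbc-sink-orders-upsert",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/shopdb",
    "connection.user": "shopuser",
    "connection.password": "shoppass",
    "topics": "orders",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "order_id"
  }
}
```

With pk.mode set to record_value, the connector takes the primary key columns named in pk.fields from the record value, so a table created through auto.create would use order_id as its primary key.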
Common Pitfalls
Common mistakes when using the JDBC Sink Connector include:
- Incorrect connection.url causing connection failures.
- Missing or wrong database credentials.
- Not setting auto.create to true when the target table does not exist.
- Using incompatible insert.mode for your use case.
- Not matching Kafka message schema with database table schema.
Always verify your database connectivity and schema compatibility before running the connector.
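The schema pitfall in the list above often starts with records that carry no schema at all, since the connector needs a schema to map fields to columns. As a minimal sketch, assuming the topic holds plain JSON messages and the stock JsonConverter from Apache Kafka is used (other setups, such as Avro with a schema registry, need different converter settings), the connector configuration could enable schemas on the value converter:

```json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "dbuser",
    "connection.password": "dbpassword",
    "topics": "my_topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "insert.mode": "insert"
  }
}
```

With schemas.enable set to true, each message is expected to wrap its data in a schema and payload envelope. The field names id and product below are illustrative only:

```json
{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "product", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "product": "widget" }
}
```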
A configuration likely to fail (unreachable database host, automatic table creation disabled):
```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://wronghost:3306/db",
  "connection.user": "user",
  "connection.password": "pass",
  "topics": "my_topic",
  "auto.create": "false"
}
```
A corrected configuration:
```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://localhost:3306/db",
  "connection.user": "user",
  "connection.password": "pass",
  "topics": "my_topic",
  "auto.create": "true"
}
```
Quick Reference
| Property | Description | Example Value |
|---|---|---|
| connector.class | Connector plugin class | io.confluent.connect.jdbc.JdbcSinkConnector |
| connection.url | JDBC URL for database | jdbc:postgresql://localhost:5432/mydb |
| connection.user | Database username | dbuser |
| connection.password | Database password | dbpassword |
| topics | Kafka topics to sink | orders |
| auto.create | Create table if missing | true |
| insert.mode | Insert mode (insert, upsert, update) | insert |
| pk.mode | Primary key mode (none, record_key, etc.) | none |
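The Syntax section mentions table mapping, which none of the properties in the table above control. By default the connector writes each topic to a table named after the topic. As a sketch, assuming the connector's table.name.format property (not used elsewhere in this article) and an illustrative kafka_ prefix, the target table name can be changed like this:

```json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "dbuser",
    "connection.password": "dbpassword",
    "topics": "orders",
    "table.name.format": "kafka_${topic}",
    "auto.create": "true",
    "insert.mode": "insert"
  }
}
```

Here the ${topic} placeholder is replaced with the topic name, so records from the orders topic would land in a table named kafka_orders.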
Key Takeaways
Configure the JDBC Sink Connector with the correct database URL, user, password, and Kafka topics.
Set auto.create to true to let the connector create tables automatically if needed.
Match Kafka message schema with the database table schema to avoid errors.
Choose the right insert.mode based on how you want data inserted into the database.
Test database connectivity before starting the connector to prevent connection issues.