Kafka · How-To · Beginner · 4 min read

How to Use JDBC Sink Connector in Kafka for Database Integration

To use the JDBC Sink Connector in Kafka, configure it with your database connection details and specify the Kafka topic to sink data from. The connector reads data from Kafka topics and writes it into the target database tables automatically.
📐 Syntax

The JDBC Sink Connector configuration requires specifying the connector class, database connection URL, user credentials, Kafka topics, and table mapping. Key properties include:

  • connector.class: The connector plugin class name.
  • connection.url: JDBC URL to connect to the database.
  • connection.user and connection.password: Database credentials.
  • topics: Kafka topics to sink.
  • auto.create: Whether to create tables automatically.
  • insert.mode: How to insert data (e.g., insert, upsert).
json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "dbuser",
    "connection.password": "dbpassword",
    "topics": "my_topic",
    "auto.create": "true",
    "insert.mode": "insert"
  }
}
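With a Connect worker running, the configuration above can be submitted through the Kafka Connect REST API. A minimal sketch using only the Python standard library, assuming the worker listens on the default localhost:8083:

```python
import json
import urllib.error
import urllib.request

# The connector configuration from the Syntax section above.
config = {
    "name": "jdbc-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/mydb",
        "connection.user": "dbuser",
        "connection.password": "dbpassword",
        "topics": "my_topic",
        "auto.create": "true",
        "insert.mode": "insert",
    },
}

def register_connector(worker_url: str, config: dict) -> bytes:
    """POST the connector config to the Connect REST API's /connectors endpoint."""
    req = urllib.request.Request(
        f"{worker_url}/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()  # Connect replies 201 Created with the stored config

try:
    # localhost:8083 is the Connect worker's default REST port; adjust for your deployment.
    print(register_connector("http://localhost:8083", config))
except urllib.error.URLError as exc:
    print(f"Connect worker not reachable: {exc}")
```

The same request can be made with curl; the REST API is the standard way to create, inspect, and delete connectors on a running worker.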
💻 Example

This example shows a complete configuration for the JDBC Sink Connector that writes data from the Kafka topic orders into a PostgreSQL table. It enables automatic table creation and uses insert mode with no primary key (pk.mode set to none).

json
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/shopdb",
    "connection.user": "shopuser",
    "connection.password": "shoppass",
    "topics": "orders",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}
Output
Connector jdbc-sink-connector started and writing data from topic 'orders' to PostgreSQL table 'orders'.
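For the connector to map record fields to table columns, each message must carry a schema. When the worker uses the JsonConverter with schemas enabled, records are written as a schema/payload envelope like the one sketched below; the field names (order_id, product, amount) are illustrative, not part of the example above:

```python
import json

# Schema/payload envelope expected by the sink connector when the worker's
# value converter is JsonConverter with schemas enabled. The connector derives
# column names and types from the struct fields.
record = {
    "schema": {
        "type": "struct",
        "name": "orders",
        "optional": False,
        "fields": [
            {"field": "order_id", "type": "int32", "optional": False},
            {"field": "product", "type": "string", "optional": True},
            {"field": "amount", "type": "double", "optional": True},
        ],
    },
    "payload": {"order_id": 1001, "product": "widget", "amount": 19.99},
}

# This JSON string is what a producer would write to the 'orders' topic.
message_value = json.dumps(record)
print(message_value)
```

Plain schemaless JSON (no envelope) will cause the connector to fail at write time, since it cannot infer column types; Avro with Schema Registry is the common alternative to this envelope.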
⚠️ Common Pitfalls

Common mistakes when using the JDBC Sink Connector include:

  • Incorrect connection.url causing connection failures.
  • Missing or wrong database credentials.
  • Not setting auto.create to true when the target table does not exist.
  • Using incompatible insert.mode for your use case.
  • Not matching Kafka message schema with database table schema.

Always verify your database connectivity and schema compatibility before running the connector.

Misconfigured example (unreachable host, and auto.create left false while the target table does not exist):

json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://wronghost:3306/db",
  "connection.user": "user",
  "connection.password": "pass",
  "topics": "my_topic",
  "auto.create": "false"
}

Corrected configuration:

json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:mysql://localhost:3306/db",
  "connection.user": "user",
  "connection.password": "pass",
  "topics": "my_topic",
  "auto.create": "true"
}
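The connectivity check recommended above can be scripted as a preflight step before deploying the connector. A minimal sketch that pulls the host and port out of a JDBC URL and attempts a TCP connection (the helper names are illustrative):

```python
import socket
from urllib.parse import urlparse

def jdbc_host_port(jdbc_url: str, default_port: int = 5432) -> tuple:
    """Extract (host, port) from a JDBC URL such as jdbc:postgresql://host:5432/db."""
    parsed = urlparse(jdbc_url[len("jdbc:"):])  # drop the jdbc: prefix
    return parsed.hostname, parsed.port or default_port

def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Preflight check against the corrected MySQL URL from the pitfalls section.
host, port = jdbc_host_port("jdbc:mysql://localhost:3306/db", default_port=3306)
print(f"{host}:{port} reachable: {can_connect(host, port)}")
```

A failed check here points at a network or URL problem rather than a connector problem, which narrows down the first two pitfalls in the list above.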
📊 Quick Reference

Property             Description                                Example Value
connector.class      Connector plugin class                     io.confluent.connect.jdbc.JdbcSinkConnector
connection.url       JDBC URL for the database                  jdbc:postgresql://localhost:5432/mydb
connection.user      Database username                          dbuser
connection.password  Database password                          dbpassword
topics               Kafka topics to sink                       orders
auto.create          Create table if missing                    true
insert.mode          Insert mode (insert, upsert, update)       insert
pk.mode              Primary key mode (none, record_key, etc.)  none

Key Takeaways

  • Configure the JDBC Sink Connector with the correct database URL, user, password, and Kafka topics.
  • Set auto.create to true to let the connector create tables automatically if needed.
  • Match the Kafka message schema with the database table schema to avoid errors.
  • Choose the right insert.mode based on how you want data inserted into the database.
  • Test database connectivity before starting the connector to prevent connection issues.