How to Use Debezium with Kafka Connect: Step-by-Step Guide
To use Debezium with Kafka Connect, install the Debezium connector as a Kafka Connect plugin, then register a connector that monitors your database and streams its change events to Kafka topics. Configure the connector with your database details and submit the configuration to a running Kafka Connect worker. The connector then captures and streams changes automatically.
Syntax
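The basic syntax for a Debezium connector in Kafka Connect is a JSON object that specifies the connector class, the database connection, and topic settings. You can submit a flat object like the one below to the Kafka Connect REST API with PUT /connectors/<name>/config, where the "name" value must match <name>. Alternatively, wrap the properties as {"name": ..., "config": {...}} and POST them to /connectors.
Key parts include:
- "name": Unique connector name.
- "connector.class": Debezium connector class for your database (e.g., MySQL, PostgreSQL).
- "database.hostname", "database.port", "database.user", "database.password": Database connection details.
- "database.server.id" (MySQL): Numeric client ID. It must be unique among all servers and clients reading the MySQL binlog.
- "database.server.name": Logical name for the database server, used as the Kafka topic prefix.
- "table.include.list": Comma-separated regular expressions matched against fully qualified databaseName.tableName identifiers. If you omit it, the connector captures all non-system tables.
- "database.history.kafka.bootstrap.servers", "database.history.kafka.topic": Kafka cluster and topic where the connector stores the database schema history.
These property names come from Debezium 1.x. In Debezium 2.0 and later, "database.server.name" is replaced by "topic.prefix", and the "database.history.kafka.*" properties are renamed to "schema.history.internal.kafka.*".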
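The following Python sketch shows how "database.server.name" and "table.include.list" work together to decide which tables are captured and where their events go. It is a minimal illustration, not Debezium's code: the helpers is_captured and topic_for are hypothetical. It assumes Debezium's documented behavior of matching each include pattern as an anchored regular expression against databaseName.tableName, and of naming topics <server name>.<database>.<table>.

```python
import re

# Hypothetical helpers (a sketch, not Debezium code) that mimic how a
# Debezium MySQL connector filters tables and names change-event topics.


def is_captured(include_list: str, database: str, table: str) -> bool:
    """Return True if "database.table" fully matches any include pattern.

    An empty include list matches every table here. Debezium additionally
    skips system tables, which this sketch does not model.
    """
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    if not patterns:
        return True
    qualified = f"{database}.{table}"
    # Anchored match: the pattern must cover the whole qualified name.
    return any(re.fullmatch(p, qualified) for p in patterns)


def topic_for(server_name: str, database: str, table: str) -> str:
    """Build the topic name that receives change events for one table."""
    return f"{server_name}.{database}.{table}"


if __name__ == "__main__":
    config = {
        "database.server.name": "mydbserver",
        "table.include.list": "mydb.mytable",
    }
    for db, tbl in [("mydb", "mytable"), ("mydb", "othertable")]:
        if is_captured(config["table.include.list"], db, tbl):
            print(topic_for(config["database.server.name"], db, tbl))
        else:
            print(f"{db}.{tbl} is not captured")
```

With the example values, only mydb.mytable is captured, and its events go to mydbserver.mydb.mytable. A bare pattern such as "mytable" matches nothing, because the anchored match must cover the database part too.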
json
{
"name": "my-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "dbuser",
"database.password": "dbpass",
"database.server.id": "223344",
"database.server.name": "mydbserver",
"table.include.list": "mydb.mytable",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mydb"
}
Example
This example shows how to configure a Debezium MySQL connector for Kafka Connect to capture changes from a single table and stream them to Kafka.
json
{
"name": "inventory-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "inventory",
"table.include.list": "inventory.products",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
Output
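- Connector inventory-connector is created and started.
- Change events for inventory.products are written to the Kafka topic inventory.inventory.products (server name, then database name, then table name).
- Schema changes are recorded in the schema-changes.inventory topic.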
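You can confirm that the connector was created and started with a short script against the Kafka Connect REST API. The sketch below uses only the Python standard library and makes several assumptions. The REST API listens on http://localhost:8083, which is Connect's default listener. The flat JSON object from the example is loaded as a dict. The helper names build_put_request, connector_is_running, and register_and_wait are hypothetical. The endpoints PUT /connectors/<name>/config and GET /connectors/<name>/status are part of Kafka Connect's REST API.

```python
import json
import time
import urllib.error
import urllib.request

# Sketch (assumption): Kafka Connect REST API at http://localhost:8083 and a
# flat Debezium connector config dict that includes a "name" key.


def build_put_request(connect_url: str, config: dict) -> urllib.request.Request:
    """PUT /connectors/<name>/config creates the connector or updates it."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{config['name']}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def connector_is_running(status: dict) -> bool:
    """True if a /status body shows the connector and every task RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


def register_and_wait(connect_url: str, config: dict,
                      attempts: int = 10, delay_s: float = 2.0) -> bool:
    """Submit the config, then poll the status endpoint until it is RUNNING."""
    with urllib.request.urlopen(build_put_request(connect_url, config)) as resp:
        resp.read()  # response body describes the stored connector
    status_url = f"{connect_url}/connectors/{config['name']}/status"
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(status_url) as resp:
                if connector_is_running(json.load(resp)):
                    return True
        except urllib.error.HTTPError:
            pass  # status may not be available right after creation
        time.sleep(delay_s)
    return False
```

A typical call would be register_and_wait("http://localhost:8083", config). In that call, config is the example JSON read from a file of your choosing, for instance a hypothetical inventory-connector.json. PUT is used instead of POST because it is idempotent: rerunning the script updates the existing connector rather than failing. If the function returns False, look at a FAILED task in the status body. Its "trace" field usually explains the cause, such as bad credentials.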
Common Pitfalls
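- Not setting database.history.kafka.topic: the connector has nowhere to record the schema history and fails validation.
- Incorrect database credentials prevent the connector from starting.
- Leaving database.server.name empty: the property is required because it prefixes every topic name, so the connector fails validation.
- A table.include.list entry that does not match the fully qualified databaseName.tableName (for example, products instead of inventory.products) means no data is captured for that table. Omitting the property entirely captures every non-system table.
- Not running a Kafka Connect worker (standalone or distributed) with the Debezium plugin on its plugin.path: the configuration cannot be submitted, or the connector class cannot be found.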
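Several of the pitfalls above can be caught before the configuration is ever submitted. The sketch below is a hypothetical pre-flight check written for this guide, not a feature of Debezium or Kafka Connect. It assumes a Debezium 1.x MySQL connector config as a Python dict. It cannot detect wrong credentials, because those only surface when the connector connects to MySQL.

```python
from typing import List

# Hypothetical pre-flight check (a sketch, not part of Debezium or Kafka
# Connect) for a Debezium 1.x MySQL connector config dict.

REQUIRED_KEYS = [
    "connector.class",
    "database.hostname",
    "database.user",
    "database.password",
    "database.server.name",
    "database.history.kafka.bootstrap.servers",
    "database.history.kafka.topic",
]


def preflight_problems(config: dict) -> List[str]:
    """Return human-readable problems; an empty list means nothing was flagged."""
    problems = []
    for key in REQUIRED_KEYS:
        if not str(config.get(key, "")).strip():
            problems.append(f"{key} is missing or empty")

    # Debezium's MySQL connector defaults the port to 3306 when it is omitted.
    port = str(config.get("database.port", "3306")).strip()
    if not port.isdigit():
        problems.append(f"database.port {port!r} is not a number")

    for pattern in str(config.get("table.include.list", "")).split(","):
        pattern = pattern.strip()
        if not pattern:
            continue
        # Heuristic: patterns are matched against "databaseName.tableName",
        # so an entry without a dot (e.g. "products") usually matches nothing.
        if "." not in pattern:
            problems.append(f"table.include.list entry {pattern!r} has no database part")
    return problems
```

Run against the bad-connector configuration, this check reports the empty database.server.name and the missing database.history.kafka.topic. The dot check on table.include.list is only a heuristic, since a regular expression can match a dot without containing a literal one.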
json
{
"name": "bad-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": 3306,
"database.user": "wronguser",
"database.password": "wrongpass",
"database.server.name": "",
"table.include.list": "",
"database.history.kafka.bootstrap.servers": "localhost:9092"
}
Corrected version:
json
{
"name": "good-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "inventory",
"table.include.list": "inventory.products",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
Quick Reference
Remember these key points when using Debezium with Kafka Connect:
- Use the correct Debezium connector class for your database.
- Set database.server.name to prefix Kafka topics.
- Configure database.history.kafka.topic to track schema changes.
- List the tables to capture with table.include.list.
- Run Kafka Connect with the Debezium plugins installed.
Key Takeaways
Install Debezium connectors as plugins in Kafka Connect before configuring.
Configure database connection and specify tables to capture with JSON config.
Set database history topic to track schema changes and avoid errors.
Submit the Debezium connector configuration to a running Kafka Connect worker to start streaming changes.
Check logs for connection errors and verify Kafka topics receive change events.
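Once messages arrive on a topic such as inventory.inventory.products, it helps to know what a change event looks like. The sketch below summarizes one message value and makes several assumptions. It assumes the connector uses Kafka Connect's JsonConverter and that you read raw message values with a Kafka client of your choice, which is not shown. The helper describe_change_event is hypothetical. The envelope fields it reads ("payload", "before", "after", "op") are part of Debezium's change-event format.

```python
import json
from typing import Any, Optional, Tuple

# Sketch (assumption): message values are JSON produced by Kafka Connect's
# JsonConverter; reading them from Kafka is left to your client library.

OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change_event(raw_value: Optional[bytes]) -> Tuple[str, Any]:
    """Return (operation, row) for one Debezium change-event message value."""
    if raw_value is None:
        # By default Debezium follows each delete event with a tombstone
        # (a message whose value is null) so log compaction can drop the key.
        return ("tombstone", None)
    event = json.loads(raw_value)
    # With schemas enabled the event is {"schema": ..., "payload": ...};
    # with schemas disabled the payload fields sit at the top level.
    payload = event.get("payload", event)
    op = payload.get("op")
    kind = OPERATIONS.get(op, f"other ({op})")
    # Deletes carry the old row in "before"; other operations carry "after".
    row = payload.get("before") if op == "d" else payload.get("after")
    return (kind, row)
```

Events with op "r" come from the connector's initial snapshot. Events with "c", "u", and "d" come from the binlog after the snapshot completes.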