How to Create a Custom Connector in Kafka: Step-by-Step Guide
To create a custom connector in Kafka, extend the SourceConnector or SinkConnector abstract class from the Kafka Connect API, then package and deploy the result. Your connector must define its configuration, its task class, and its data-handling logic so Kafka Connect can run it.

Syntax
A custom Kafka connector requires a class that extends either SourceConnector or SinkConnector. You must override the abstract methods start(), stop(), taskClass(), taskConfigs(), config(), and version(). The connector's configuration options are declared using ConfigDef.
```java
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;

import java.util.*;

public class MySourceConnector extends SourceConnector {

    @Override
    public void start(Map<String, String> props) {
        // Initialize the connector with its configuration properties
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Return one configuration map per task instance
        return Collections.singletonList(new HashMap<>());
    }

    @Override
    public void stop() {
        // Clean up resources
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("my.config", ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH, "My config description");
    }

    @Override
    public String version() {
        return "1.0";
    }
}
```
Example
This example shows a simple custom source connector that reads data from a static list and sends it to Kafka topics.
```java
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.common.config.ConfigDef;

import java.util.*;

public class SimpleSourceConnector extends SourceConnector {
    private String topic;

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        Map<String, String> config = new HashMap<>();
        config.put("topic", topic);
        return Collections.singletonList(config);
    }

    @Override
    public void stop() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("topic", ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH, "Topic to publish to");
    }

    @Override
    public String version() {
        return "1.0";
    }
}

class SimpleSourceTask extends SourceTask {
    private String topic;
    private Iterator<String> dataIterator;

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
        List<String> data = Arrays.asList("message1", "message2", "message3");
        dataIterator = data.iterator();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (!dataIterator.hasNext()) {
            Thread.sleep(1000);
            return Collections.emptyList();
        }
        String value = dataIterator.next();
        // Source partition and offset are left null for this static demo;
        // a real connector should supply them so Connect can resume after restarts.
        SourceRecord record = new SourceRecord(
            null, null, topic, Schema.STRING_SCHEMA, value);
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0";
    }
}
```
Output
When deployed in Kafka Connect, this connector sends three messages: "message1", "message2", and "message3" to the configured Kafka topic.
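Once the compiled connector JAR is placed on the Connect worker's plugin.path, the connector above could be registered in standalone mode with a properties file along these lines (the name value and file layout are illustrative; name, connector.class, and tasks.max are standard Connect keys, while topic comes from this connector's own ConfigDef):

```properties
name=simple-source
connector.class=SimpleSourceConnector
tasks.max=1
topic=my-topic
```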
Common Pitfalls
- Not implementing required abstract methods such as taskClass() or taskConfigs() prevents the connector class from compiling at all.
- Failing to configure connector properties correctly leads to misconfiguration and startup failure.
- Not releasing resources in stop() can cause memory or connection leaks.
- Returning empty or incorrect task configurations prevents tasks from running.
```java
/* Wrong: omitting taskConfigs() - it is abstract in Connector,
   so this class does not compile */
public class BadConnector extends SourceConnector {
    @Override
    public void start(Map<String, String> props) {}

    @Override
    public Class<? extends Task> taskClass() {
        return BadTask.class;
    }

    @Override
    public void stop() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "1.0";
    }
}

/* Right: implement taskConfigs() and return a config map per task */
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    Map<String, String> config = new HashMap<>();
    config.put("topic", "my-topic");
    return Collections.singletonList(config);
}
```
Quick Reference
- SourceConnector: For connectors that read data into Kafka.
- SinkConnector: For connectors that write data from Kafka to external systems.
- taskClass(): Returns the class that does the actual data work.
- taskConfigs(): Provides configs for each task instance.
- start() and stop(): Manage connector lifecycle.
- ConfigDef: Defines configuration options for your connector.
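The taskClass()/taskConfigs() pair is how Kafka Connect parallelizes work: the framework creates up to tasks.max instances of the task class, one per config map you return. A minimal, Kafka-free sketch of the partitioning logic a taskConfigs(int) implementation typically performs (the split helper and the resources key are illustrative, not part of the Connect API):

```java
import java.util.*;

public class TaskConfigSplitter {
    // Hypothetical helper: distributes a list of "resources" (e.g. tables
    // or files) round-robin across at most maxTasks task configurations,
    // the way a taskConfigs(int maxTasks) implementation often does.
    static List<Map<String, String>> split(List<String> resources, int maxTasks) {
        int numGroups = Math.min(resources.size(), maxTasks);
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < numGroups; i++) {
            configs.add(new HashMap<>());
        }
        for (int i = 0; i < resources.size(); i++) {
            // Append each resource to its group's comma-separated list
            configs.get(i % numGroups)
                   .merge("resources", resources.get(i), (a, b) -> a + "," + b);
        }
        return configs;
    }

    public static void main(String[] args) {
        List<Map<String, String>> configs =
            split(Arrays.asList("t1", "t2", "t3", "t4", "t5"), 2);
        System.out.println(configs.size());                   // 2
        System.out.println(configs.get(0).get("resources"));  // t1,t3,t5
        System.out.println(configs.get(1).get("resources"));  // t2,t4
    }
}
```

Note that returning fewer maps than maxTasks is fine; Connect simply starts fewer tasks.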
Key Takeaways
- Extend SourceConnector or SinkConnector and override the required methods to create a custom Kafka connector.
- Define connector configuration clearly using ConfigDef for easy setup and validation.
- Provide task configurations correctly to enable parallel data processing.
- Manage resources properly in start() and stop() to avoid leaks.
- Test your connector thoroughly in the Kafka Connect runtime before production use.