Kafka · How-To · Beginner · 4 min read

How to Create a Custom Connector in Kafka: Step-by-Step Guide

To create a custom connector in Kafka, extend the SourceConnector or SinkConnector abstract class from the Kafka Connect API, then package and deploy the result. Your connector must define its configuration, a task class, and the data-handling logic that integrates with Kafka Connect.
📐

Syntax

A custom Kafka connector is a class that extends either SourceConnector or SinkConnector. You must override the abstract methods start(), stop(), taskClass(), taskConfigs(), config(), and version(). The connector's configuration options are declared using ConfigDef.

java
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
import java.util.*;

public class MySourceConnector extends SourceConnector {
    @Override
    public void start(Map<String, String> props) {
        // Initialize connector with props
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Return configurations for tasks
        return Collections.singletonList(new HashMap<>());
    }

    @Override
    public void stop() {
        // Cleanup resources
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("my.config", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "My config description");
    }

    @Override
    public String version() {
        return "1.0";
    }
}
💻

Example

This example shows a simple custom source connector that reads data from a static list and sends it to Kafka topics.

java
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.common.config.ConfigDef;

import java.util.*;

public class SimpleSourceConnector extends SourceConnector {
    private String topic;

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        Map<String, String> config = new HashMap<>();
        config.put("topic", topic);
        return Collections.singletonList(config);
    }

    @Override
    public void stop() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Topic to publish to");
    }

    @Override
    public String version() {
        return "1.0";
    }
}

class SimpleSourceTask extends SourceTask {
    private String topic;
    private Iterator<String> dataIterator;

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
        List<String> data = Arrays.asList("message1", "message2", "message3");
        dataIterator = data.iterator();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (!dataIterator.hasNext()) {
            Thread.sleep(1000);
            return Collections.emptyList();
        }
        String value = dataIterator.next();
        // No source partition/offset tracking and no schema for this static source
        SourceRecord record = new SourceRecord(null, null, topic, null, value);
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0";
    }
}
Output
When deployed in Kafka Connect, this connector sends three messages: "message1", "message2", and "message3" to the configured Kafka topic.
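One way to deploy the example, assuming the connector JAR is already on the worker's plugin path and the Connect worker's REST API listens on the default localhost:8083 (both are assumptions; adjust to your setup). The connector name and topic below are placeholders:

json
{
  "name": "simple-source",
  "config": {
    "connector.class": "SimpleSourceConnector",
    "tasks.max": "1",
    "topic": "my-topic"
  }
}

POST this JSON to http://localhost:8083/connectors (for example, curl -X POST -H "Content-Type: application/json" --data @simple-source.json http://localhost:8083/connectors), and Kafka Connect will instantiate the connector and start its task.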
⚠️

Common Pitfalls

  • Forgetting to override an abstract method like taskClass() or taskConfigs() is a compile-time error; returning null from them fails at runtime.
  • Missing or invalid required properties cause the connector to fail validation at startup.
  • Not releasing resources (connections, threads, file handles) in stop() can cause memory and connection leaks.
  • Returning an empty list from taskConfigs() means Kafka Connect starts zero tasks, so no data moves.
java
/* Wrong: omitting taskConfigs() — it is abstract, so this class will not even compile */
public class BadConnector extends SourceConnector {
    @Override
    public void start(Map<String, String> props) {}
    @Override
    public Class<? extends Task> taskClass() { return BadTask.class; }
    @Override
    public void stop() {}
    @Override
    public ConfigDef config() { return new ConfigDef(); }
    @Override
    public String version() { return "1.0"; }
}

/* Right: Implement taskConfigs properly */
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    Map<String, String> config = new HashMap<>();
    config.put("topic", "my-topic");
    return Collections.singletonList(config);
}
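The taskConfigs() contract is easiest to get right when the splitting logic is a plain function you can test in isolation. A minimal sketch (the table names and the "tables" key are hypothetical; no Kafka classes are needed) that round-robins a work list across at most maxTasks task configs:

java
import java.util.*;

public class TaskConfigPartitioner {
    // Splits a list of work items (e.g. table names) round-robin into
    // at most maxTasks groups, one config map per task.
    public static List<Map<String, String>> taskConfigs(List<String> tables, int maxTasks) {
        int numGroups = Math.min(tables.size(), maxTasks);
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numGroups; i++) {
            groups.add(new ArrayList<>());
        }
        // Assign item i to group i % numGroups
        for (int i = 0; i < tables.size(); i++) {
            groups.get(i % numGroups).add(tables.get(i));
        }
        // Each task receives its share as a comma-separated string
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>();
            config.put("tables", String.join(",", group));
            configs.add(config);
        }
        return configs;
    }
}

With three tables and maxTasks of 2, this yields two configs, so two tasks run in parallel; with one table and maxTasks of 4, it yields a single config rather than three empty ones.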
📊

Quick Reference

  • SourceConnector: For connectors that read data into Kafka.
  • SinkConnector: For connectors that write data from Kafka to external systems.
  • taskClass(): Returns the class that does the actual data work.
  • taskConfigs(): Provides configs for each task instance.
  • start() and stop(): Manage connector lifecycle.
  • ConfigDef: Defines configuration options for your connector.

Key Takeaways

  • Extend SourceConnector or SinkConnector and override the required methods to create a custom Kafka connector.
  • Define connector configuration clearly using ConfigDef for easy setup and validation.
  • Provide task configurations correctly to enable parallel data processing.
  • Always manage resources properly in start() and stop() methods to avoid leaks.
  • Test your connector thoroughly in the Kafka Connect runtime before production use.
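For the packaging step: a Connect worker discovers custom connectors through its plugin.path setting. A minimal worker-config sketch (the directory is an example; use your own):

properties
# connect-distributed.properties (or connect-standalone.properties)
plugin.path=/opt/kafka/connect-plugins

Copy the connector JAR, along with any non-Kafka dependencies, into its own subdirectory under that path and restart the worker so the new plugin is picked up.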