How to Use Elasticsearch Sink Connector in Kafka
To use the Elasticsearch Sink Connector in Kafka, you configure it through the Kafka Connect framework by specifying the connector class, the Kafka topics, and the Elasticsearch connection details in a JSON config file. Then deploy that config to Kafka Connect to stream data from Kafka topics directly into Elasticsearch indexes.

Syntax
The Elasticsearch sink connector configuration requires specifying key properties:
- connector.class: The connector plugin class name.
- topics: Kafka topics to read from.
- connection.url: Elasticsearch server URL.
- type.name: Elasticsearch document type (deprecated; ignored by Elasticsearch 7 and later).
- key.ignore: Whether to ignore Kafka message keys.
- schema.ignore: Whether to ignore schemas in messages.
```json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "your-kafka-topic",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
```

Example
This example shows a complete JSON configuration that streams data from the Kafka topic users into the index users on a locally running Elasticsearch.
```json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "users",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete"
  }
}
```

Output
Connector elasticsearch-sink-connector created and running, streaming data from topic 'users' to Elasticsearch index 'users'.
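Deploying the config means POSTing it to the Kafka Connect REST API, which listens on port 8083 by default. A minimal sketch using only the Python standard library, assuming a Connect worker on localhost:

```python
import json
import urllib.request

# The connector config from the example above.
config = {
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "users",
        "connection.url": "http://localhost:9200",
        "key.ignore": "true",
        "schema.ignore": "true",
        "behavior.on.null.values": "delete",
    },
}

def deploy(connect_url: str = "http://localhost:8083") -> bytes:
    """POST the config to the Kafka Connect REST API (default port 8083 assumed)."""
    req = urllib.request.Request(
        connect_url + "/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()  # Connect echoes the created connector as JSON
```

The same request can be made with `curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors`.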
Common Pitfalls
- Not setting connection.url correctly causes connection failures.
- Forgetting to set key.ignore to true when keys are not needed can cause indexing issues.
- Using the deprecated type.name incorrectly with newer Elasticsearch versions.
- Not matching Kafka topic names exactly in the topics property.
- Ignoring schema compatibility when schema.ignore is false.
```json
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "users",
  "connection.url": "http://wrong-url:9200",
  "key.ignore": "false"
}
```

Corrected version:

```json
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "users",
  "connection.url": "http://localhost:9200",
  "key.ignore": "true"
}
```

Quick Reference
Remember these key points when configuring the Elasticsearch sink connector:
- connector.class: Always io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
- topics: Kafka topics to stream
- connection.url: Elasticsearch URL, e.g., http://localhost:9200
- key.ignore: Set to true if Kafka keys are not used for indexing
- schema.ignore: Set to true to ignore schemas if messages are plain JSON
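The required properties in this checklist can be sanity-checked before deploying. A small sketch; validate_config is a hypothetical helper, not part of Kafka Connect:

```python
# Properties every Elasticsearch sink config needs (per the checklist above).
REQUIRED_KEYS = {"connector.class", "topics", "connection.url"}

def validate_config(config: dict) -> list:
    """Return the required properties missing from `config`, sorted by name."""
    return sorted(REQUIRED_KEYS - config.keys())
```

For example, `validate_config({"topics": "users"})` returns `["connection.url", "connector.class"]`, while a complete config returns an empty list.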
Key Takeaways
- Configure the Elasticsearch sink connector in Kafka Connect with the correct connector class and connection URL.
- Set the Kafka topic names correctly in the connector configuration to stream data.
- Use key.ignore=true if Kafka message keys are not needed for Elasticsearch indexing.
- Ensure Elasticsearch is reachable at the specified connection URL before starting the connector.
- Check the schema.ignore setting based on whether your Kafka messages include schemas.
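The reachability check can be done up front. A minimal sketch using only the Python standard library; the helper name is an assumption, not a Kafka Connect API:

```python
import urllib.error
import urllib.request

def elasticsearch_reachable(url: str = "http://localhost:9200", timeout: float = 5.0) -> bool:
    """Return True if an HTTP GET to `url` succeeds within `timeout` seconds."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError, ValueError):
        return False
```

Run this before deploying the connector config; if it returns False, fix connection.url (or start Elasticsearch) first.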