Kafka · How-To · Beginner · 4 min read

How to Use Elasticsearch Sink Connector in Kafka

To use the Elasticsearch Sink Connector in Kafka, you configure it with the Kafka Connect framework by specifying the connector class, Kafka topic, and Elasticsearch details in a JSON config file. Then, deploy this config to Kafka Connect to stream data from Kafka topics directly into Elasticsearch indexes.
📐

Syntax

The Elasticsearch sink connector is configured through a handful of key properties:

  • connector.class: The connector plugin class name.
  • topics: Kafka topics to read from.
  • connection.url: Elasticsearch server URL.
  • type.name: Document type in Elasticsearch (deprecated in newer versions).
  • key.ignore: Whether to ignore Kafka message keys.
  • schema.ignore: Whether to ignore schemas in messages.
json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "your-kafka-topic",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
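The deployment step mentioned above can be sketched as follows. This assumes a Kafka Connect worker with its REST API on the default localhost:8083 (an assumption, since the port is not stated in this article): save the config to a file, validate it, then POST it to the /connectors endpoint.

```shell
# Save the connector config from the Syntax section to a file.
cat > elasticsearch-sink.json <<'EOF'
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "your-kafka-topic",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
EOF

# Sanity-check that the file is well-formed JSON before submitting it.
python3 -m json.tool elasticsearch-sink.json > /dev/null && echo "config OK"

# Deploy to Kafka Connect's REST API (assumes a worker on localhost:8083;
# uncomment once a worker is actually running).
# curl -X POST -H "Content-Type: application/json" \
#      --data @elasticsearch-sink.json http://localhost:8083/connectors
```

Kafka Connect replies with the stored connector configuration as JSON when creation succeeds.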
💻

Example

This example shows a complete JSON configuration that streams data from the Kafka topic 'users' into an Elasticsearch index named 'users' on a locally running cluster.

json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "users",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete"
  }
}
Output
Connector elasticsearch-sink-connector is created and running, streaming data from the topic 'users' into the Elasticsearch index 'users'. Because behavior.on.null.values is set to delete, a record with a null value removes the matching document from the index.
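To confirm the connector is actually running and documents are arriving, the following verification sketch can help. The endpoints are assumptions: Kafka Connect's REST API on localhost:8083 and Elasticsearch on localhost:9200, matching the defaults used in this article.

```shell
# Ask Kafka Connect for the connector's status; a healthy connector
# reports "state": "RUNNING" in the returned JSON.
STATUS=$(curl -s --max-time 5 \
  http://localhost:8083/connectors/elasticsearch-sink-connector/status \
  2>/dev/null || echo "kafka-connect-unreachable")
echo "$STATUS"

# Search the target index to confirm documents arrived from the 'users' topic.
DOCS=$(curl -s --max-time 5 "http://localhost:9200/users/_search?pretty" \
  2>/dev/null || echo "elasticsearch-unreachable")
echo "$DOCS"
```

If either command reports unreachable, fix connectivity before debugging the connector config itself.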
⚠️

Common Pitfalls

  • Not setting connection.url correctly causes connection failures.
  • Leaving key.ignore at false when record keys are null or unusable as document IDs causes indexing failures, since the connector then uses the Kafka key as the Elasticsearch document ID.
  • Setting the deprecated type.name property against newer Elasticsearch versions (7+ no longer uses mapping types).
  • Not matching Kafka topic names exactly in the topics property.
  • Leaving schema.ignore at false for plain, schemaless JSON messages, which makes the connector fail when it tries to derive index mappings from a schema that is not there.
json
// Misconfigured version:
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "users",
  "connection.url": "http://wrong-url:9200",
  "key.ignore": "false"
}

// Corrected version:
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "users",
  "connection.url": "http://localhost:9200",
  "key.ignore": "true"
}
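When records do fail to index despite a correct connection, Kafka Connect's standard sink error-handling properties can route bad records to a dead letter queue topic instead of stopping the connector. A hedged sketch of the extra properties to add to the config (the topic name dlq-users is a hypothetical choice, not from this article):

```json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-users",
  "errors.deadletterqueue.context.headers.enable": "true"
}
```

With errors.tolerance set to all, failed records are skipped and written to the dead letter queue topic for later inspection rather than failing the task.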
📊

Quick Reference

Remember these key points when configuring the Elasticsearch sink connector:

  • connector.class: Always io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  • topics: Kafka topics to stream
  • connection.url: Elasticsearch URL, e.g., http://localhost:9200
  • key.ignore: Set to true if Kafka keys are not used for indexing
  • schema.ignore: Set to true to ignore schemas if messages are plain JSON

Key Takeaways

  • Configure the Elasticsearch sink connector in Kafka Connect with the correct connector class and connection URL.
  • Set the Kafka topic names exactly in the topics property of the connector configuration.
  • Use key.ignore=true if Kafka message keys are not needed for Elasticsearch document IDs.
  • Ensure Elasticsearch is reachable at the specified connection URL before starting the connector.
  • Check the schema.ignore setting based on whether your Kafka messages include schemas.