Kafka · How-To · Beginner · 4 min read

How to Use Confluent Schema Registry in Kafka: Simple Guide

To use Confluent Schema Registry in Kafka, configure your Kafka producers and consumers to register and retrieve schemas from the registry by setting the schema.registry.url property. This allows your clients to serialize and deserialize messages using Avro (or other supported formats such as JSON Schema and Protobuf), ensuring data compatibility and controlled schema evolution.
📐

Syntax

To use Confluent Schema Registry with Kafka, you need to configure your producer and consumer clients with the following key properties:

  • schema.registry.url: URL of the running Schema Registry service.
  • key.serializer and value.serializer: Use Avro serializers that integrate with the Schema Registry.
  • key.deserializer and value.deserializer: Use Avro deserializers that fetch schemas from the registry.

This setup enables automatic schema registration and retrieval during message production and consumption.

java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
// For producer
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

// For consumer
props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
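Under the hood, the Confluent serializers do not embed the full schema in each message. Instead, they prepend a five-byte header: a magic byte (0) followed by the 4-byte big-endian schema ID, which the deserializer uses to fetch the matching schema from the registry. A minimal sketch of decoding that header (the class name and sample ID are illustrative):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: magic byte 0x0, 4-byte big-endian schema ID, then the Avro payload
    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // Fake message with schema ID 42 and no payload, for illustration only
        byte[] header = ByteBuffer.allocate(5).put((byte) 0).putInt(42).array();
        System.out.println("Schema ID: " + schemaIdOf(header)); // prints "Schema ID: 42"
    }
}
```

By default the serializer registers the value schema under the subject `<topic>-value` (the TopicNameStrategy), so each topic's value schema can evolve independently across versions.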
💻

Example

This example shows a simple Kafka producer and consumer using Confluent Schema Registry with Avro serialization.

java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;
import java.util.*;

public class AvroExample {
    public static void main(String[] args) {
        String topic = "test-topic";

        // Producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("schema.registry.url", "http://localhost:8081");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", KafkaAvroSerializer.class.getName());

        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerProps);

        // Create Avro schema and record
        String userSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alice");

        // Send record
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "key1", user);
        producer.send(record);
        producer.flush();
        producer.close();

        // Consumer properties
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "test-group");
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "earliest");

        Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(topic));

        // Single poll for brevity; real consumers poll in a loop
        ConsumerRecords<String, GenericRecord> records = consumer.poll(java.time.Duration.ofSeconds(5));
        for (ConsumerRecord<String, GenericRecord> rec : records) {
            System.out.println("Received: key=" + rec.key() + ", value=" + rec.value());
        }
        consumer.close();
    }
}
Output
Received: key=key1, value={"name": "Alice"}
⚠️

Common Pitfalls

Common mistakes when using Confluent Schema Registry with Kafka include:

  • Not setting schema.registry.url correctly, causing serialization errors.
  • Using incompatible serializers/deserializers that do not integrate with the Schema Registry.
  • Not handling schema evolution properly, leading to compatibility issues.
  • Forgetting to start the Schema Registry service before running producers or consumers.

Always ensure the Schema Registry is running and accessible, and use the Confluent Avro serializers/deserializers.

java
// Wrong: Missing schema.registry.url
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Right: Include schema.registry.url
props.put("schema.registry.url", "http://localhost:8081");
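Schema evolution is where most compatibility problems appear. Under the registry's default BACKWARD compatibility mode, a new field must carry a default so that consumers on the new schema can still read records written with the old one. A sketch of a safe evolution of the `User` schema from the example above (the `email` field is illustrative):

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string", "default": ""}
  ]
}
```

By contrast, adding a required field without a default, or changing a field's type, breaks BACKWARD compatibility, and the registry will reject the new schema version when the producer tries to register it.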
📊

Quick Reference

Summary tips for using Confluent Schema Registry with Kafka:

  • Always set schema.registry.url in producer and consumer configs.
  • Use KafkaAvroSerializer and KafkaAvroDeserializer for Avro data.
  • Start Schema Registry service before Kafka clients.
  • Manage schema versions carefully to avoid compatibility issues.

Key Takeaways

  • Configure Kafka clients with the correct schema.registry.url to connect to Schema Registry.
  • Use Confluent's Avro serializers and deserializers to enable schema-based message serialization.
  • Ensure the Schema Registry service is running before producing or consuming messages.
  • Handle schema evolution carefully to maintain compatibility between producers and consumers.
  • Test serialization and deserialization locally to catch configuration errors early.