How to Use Confluent Schema Registry in Kafka: Simple Guide
To use Confluent Schema Registry in Kafka, configure your Kafka producers and consumers to register and retrieve schemas from the registry by setting the schema.registry.url property. This allows Kafka to serialize and deserialize messages using Avro or other supported formats, ensuring data compatibility and schema evolution.
Syntax
To use Confluent Schema Registry with Kafka, you need to configure your producer and consumer clients with the following key properties:
- schema.registry.url: URL of the running Schema Registry service.
- key.serializer and value.serializer: Use Avro serializers that integrate with the Schema Registry.
- key.deserializer and value.deserializer: Use Avro deserializers that fetch schemas from the registry.
This setup enables automatic schema registration and retrieval during message production and consumption.
java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

// For the consumer
props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
Example
This example shows a simple Kafka producer and consumer using Confluent Schema Registry with Avro serialization.
java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;
import java.util.*;

public class AvroExample {
    public static void main(String[] args) {
        String topic = "test-topic";

        // Producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("schema.registry.url", "http://localhost:8081");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", KafkaAvroSerializer.class.getName());
        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerProps);

        // Create Avro schema and record
        String userSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alice");

        // Send record
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "key1", user);
        producer.send(record);
        producer.flush();
        producer.close();

        // Consumer properties
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "test-group");
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "earliest");
        Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps);

        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<String, GenericRecord> records = consumer.poll(java.time.Duration.ofSeconds(5));
        for (ConsumerRecord<String, GenericRecord> rec : records) {
            System.out.println("Received: key=" + rec.key() + ", value=" + rec.value());
        }
        consumer.close();
    }
}
Output
Received: key=key1, value={"name": "Alice"}
Common Pitfalls
Common mistakes when using Confluent Schema Registry with Kafka include:
- Not setting schema.registry.url correctly, causing serialization errors.
- Using incompatible serializers/deserializers that do not integrate with the Schema Registry.
- Not handling schema evolution properly, leading to compatibility issues.
- Forgetting to start the Schema Registry service before running producers or consumers.
Always ensure the Schema Registry is running and accessible, and use the Confluent Avro serializers/deserializers.
java
/* Wrong: missing schema.registry.url */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

// Right: include schema.registry.url
props.put("schema.registry.url", "http://localhost:8081");
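The schema-evolution pitfall above can be caught before deployment. As a sketch (assuming the same org.apache.avro dependency the main example already uses), Avro's SchemaCompatibility utility reports whether a new reader schema can still decode data written with the old one; adding a field with a default value keeps the change backward compatible:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class EvolutionCheck {
    public static void main(String[] args) {
        // v1: the original schema from the example above
        Schema v1 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // v2: adds an "email" field WITH a default, so old records remain readable
        // (a separate Parser is needed because a Parser refuses to redefine "User")
        Schema v2 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");

        // Backward compatibility: a consumer using v2 (reader) can read
        // data produced with v1 (writer)
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(v2, v1);
        System.out.println(result.getType()); // expected: COMPATIBLE
    }
}
```

Dropping the "default" from the new field would make the pair incompatible under BACKWARD compatibility, which is exactly what the registry would reject at registration time.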
Quick Reference
Summary tips for using Confluent Schema Registry with Kafka:
- Always set schema.registry.url in producer and consumer configs.
- Use KafkaAvroSerializer and KafkaAvroDeserializer for Avro data.
- Start the Schema Registry service before Kafka clients.
- Manage schema versions carefully to avoid compatibility issues.
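Schema versions are managed per subject through the registry's REST API. The sketch below builds the request that sets BACKWARD compatibility for one subject; the subject name test-topic-value (the default topic-name strategy for message values) and the registry URL are assumptions. It uses only the JDK's java.net.http client and prints the request rather than sending it, since sending requires a running registry:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SetCompatibility {
    public static void main(String[] args) {
        String registry = "http://localhost:8081";          // assumed registry address
        String subject = "test-topic-value";                // assumed subject name

        // PUT /config/{subject} updates the compatibility level for one subject
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(registry + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"compatibility\": \"BACKWARD\"}"))
                .build();

        System.out.println(request.method() + " " + request.uri());

        // Against a live registry, send it with:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

With this set, the registry itself rejects any new schema version for the subject that would break backward compatibility, independent of client-side checks.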
Key Takeaways
- Configure Kafka clients with the correct schema.registry.url to connect to Schema Registry.
- Use Confluent's Avro serializers and deserializers to enable schema-based message serialization.
- Ensure the Schema Registry service is running before producing or consuming messages.
- Handle schema evolution carefully to maintain compatibility between producers and consumers.
- Test serialization and deserialization locally to catch configuration errors early.