How to Handle Schema Compatibility in Kafka: Fix and Best Practices
To handle schema compatibility in Kafka, use the Confluent Schema Registry to manage Avro schemas and set compatibility rules such as BACKWARD or FORWARD. This ensures new schema versions still work with existing data and prevents serialization errors in producers and consumers.
Why This Happens
Schema compatibility errors occur when a new schema version is incompatible with the previous versions used in Kafka messages. This breaks serialization or deserialization, causing errors in producers or consumers.
For example, removing a required field or changing a field's type without a compatible evolution path breaks the data format.
```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.Schema;

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

// Old schema: 'name' is a required field with no default.
// A later schema version that removes 'name' (or changes its type)
// is incompatible: old records can no longer be read.
Schema oldSchema = new Schema.Parser().parse("""
    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"}
      ]
    }
    """);

// 'name' must be set explicitly: it has no default, so
// GenericRecordBuilder.build() would fail if it were left unset.
Object record = new GenericRecordBuilder(oldSchema)
    .set("id", 1)
    .set("name", "Alice")
    .build();

producer.send(new ProducerRecord<>("test-topic", record));
```
The Fix
Fix schema compatibility by setting the Schema Registry compatibility mode to BACKWARD or FORWARD. This allows adding new optional fields or default values without breaking existing data.
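For example, the compatibility mode can be set per subject through the Schema Registry REST API. This is a sketch assuming the registry runs at `localhost:8081` as in the earlier examples, and that the default subject naming (`<topic>-value` for value schemas) applies:

```shell
# Set BACKWARD compatibility for the value schema of test-topic.
# The subject name "test-topic-value" assumes the default
# TopicNameStrategy subject naming.
curl -X PUT \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://localhost:8081/config/test-topic-value
```

Omitting the subject (`PUT /config`) sets the global default instead, which individual subjects can still override.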
Update your schema by adding new fields with default values or making fields optional instead of removing or changing types.
```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.Schema;

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

// New schema adds 'name' with a default value, so records written
// without it can still be read: this change is compatible.
Schema newSchema = new Schema.Parser().parse("""
    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string", "default": "unknown"}
      ]
    }
    """);

// 'name' is left unset here; the builder fills in the default "unknown".
Object record = new GenericRecordBuilder(newSchema)
    .set("id", 1)
    .build();

producer.send(new ProducerRecord<>("test-topic", record));
```
Prevention
To avoid schema compatibility issues in the future:
- Always use a Schema Registry to manage schemas centrally.
- Set the compatibility mode to BACKWARD, FORWARD, or FULL depending on your use case.
- When evolving schemas, add new fields with default values or make fields optional instead of removing or changing existing fields.
- Test schema changes in a staging environment before production.
- Use tools like avro-tools or the Schema Registry API to validate compatibility before deployment.
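The last point can be automated with the Schema Registry's compatibility-check endpoint. The sketch below assumes the same local registry and subject name as the earlier examples, and tests the candidate schema from the fix above against the latest registered version:

```shell
# Ask the registry whether a candidate schema is compatible with the
# latest registered version of the subject, without registering it.
# Responds with {"is_compatible": true} or {"is_compatible": false}.
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\",\"default\":\"unknown\"}]}"}' \
  http://localhost:8081/compatibility/subjects/test-topic-value/versions/latest
```

Running this check in CI before deployment catches incompatible changes before any producer ships them.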
Related Errors
Other common errors related to schema compatibility include:
- SerializationException: Happens when the producer's schema is rejected by, or incompatible with, the Schema Registry.
- Deserialization errors: Happen when a consumer cannot read data because the writer's schema changed incompatibly.
- Schema not found: Occurs when the schema is not registered or the Schema Registry URL is misconfigured.
Quick fixes involve checking schema versions, ensuring Schema Registry connectivity, and validating schema compatibility settings.
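Those checks can all be done against the registry's REST API; this sketch assumes the same local registry and subject as above:

```shell
# Verify registry connectivity and list registered subjects.
curl http://localhost:8081/subjects

# List the registered versions for the subject.
curl http://localhost:8081/subjects/test-topic-value/versions

# Fetch the latest schema to compare with what clients expect.
curl http://localhost:8081/subjects/test-topic-value/versions/latest

# Inspect the subject's effective compatibility setting.
curl http://localhost:8081/config/test-topic-value
```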