KafkaDebug / FixIntermediate · 4 min read

How to Handle Schema Compatibility in Kafka: Fix and Best Practices

To handle schema compatibility in Kafka, use the Confluent Schema Registry to manage Avro schemas and set compatibility rules such as BACKWARD or FORWARD. This keeps new schema versions readable against existing data and prevents serialization errors in producers and consumers.
🔍 Why This Happens

Schema compatibility errors occur when a new schema version is incompatible with the previous versions used in Kafka messages. This breaks serialization or deserialization, causing errors in producers or consumers.

For example, removing a required field or changing a field's type without a compatible evolution path breaks the data format for existing producers and consumers.

java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.Schema;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

// The subject already holds a schema with a required 'name' field.
// This new version removes 'name'; under FORWARD or FULL compatibility
// the registry rejects the registration, and the serializer throws.
Schema newSchema = new Schema.Parser().parse("""
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
""");

Object record = new GenericRecordBuilder(newSchema)
    .set("id", 1)
    .build();

producer.send(new ProducerRecord<>("test-topic", record));
Output
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema
🔧 The Fix

Fix schema compatibility errors by setting the Schema Registry compatibility mode for the subject (BACKWARD, FORWARD, or FULL) and evolving schemas within that mode's rules. Adding new fields with default values, or making fields optional, is safe under all three modes.
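
The compatibility mode is set per subject through the Schema Registry's REST API (PUT /config/{subject}). Here is a minimal sketch using the JDK's built-in HTTP client; the registry URL and subject name are placeholders for a typical local setup, and the actual send is left commented out so the sketch reads without a running registry:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SetCompatibility {
    // Builds the REST request that sets a subject's compatibility mode.
    static HttpRequest buildRequest(String registryUrl, String subject, String mode) {
        String body = "{\"compatibility\": \"" + mode + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder registry URL and subject; adjust to your environment.
        HttpRequest req = buildRequest("http://localhost:8081", "test-topic-value", "BACKWARD");
        System.out.println(req.method() + " " + req.uri());
        // Send against a live registry with:
        // HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());
    }
}
```

The same endpoint without a subject (PUT /config) changes the registry-wide default.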

Update your schema by adding new fields with default values or making fields optional instead of removing or changing types.

java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.Schema;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);

// Compatible version: 'name' carries a default value, so records
// built without it still serialize and old data remains readable
Schema newSchema = new Schema.Parser().parse("""
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string", "default": "unknown"}
  ]
}
""");

Object record = new GenericRecordBuilder(newSchema)
    .set("id", 1)
    .build();

producer.send(new ProducerRecord<>("test-topic", record));
Output
Message sent successfully without serialization errors
🛡️ Prevention

To avoid schema compatibility issues in the future:

  • Always use a Schema Registry to manage schemas centrally.
  • Set compatibility mode to BACKWARD, FORWARD, or FULL depending on your use case.
  • When evolving schemas, add new fields with default values or make fields optional instead of removing or changing existing fields.
  • Test schema changes in a staging environment before production.
  • Use tools like avro-tools or the Schema Registry API to validate compatibility before deployment.
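
The field-level rules behind these modes can be illustrated with a toy check. This is an illustrative sketch, not the registry's real checker: each schema is reduced to a map from field name to "has a default value":

```java
import java.util.Map;

public class CompatSketch {
    // BACKWARD: new readers must handle old data, so any field added in the
    // new schema needs a default; deleted fields are always safe.
    static boolean backward(Map<String, Boolean> oldS, Map<String, Boolean> newS) {
        return newS.entrySet().stream()
                .filter(e -> !oldS.containsKey(e.getKey()))
                .allMatch(Map.Entry::getValue);
    }

    // FORWARD: old readers must handle new data, so any field deleted from
    // the old schema needs a default there; added fields are always safe.
    static boolean forward(Map<String, Boolean> oldS, Map<String, Boolean> newS) {
        return oldS.entrySet().stream()
                .filter(e -> !newS.containsKey(e.getKey()))
                .allMatch(Map.Entry::getValue);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = Map.of("id", false, "name", false);
        Map<String, Boolean> dropName = Map.of("id", false);
        Map<String, Boolean> addEmail = Map.of("id", false, "name", false, "email", false);
        Map<String, Boolean> addEmailDefault = Map.of("id", false, "name", false, "email", true);

        System.out.println(backward(v1, dropName));        // true: deletions are backward-safe
        System.out.println(forward(v1, dropName));         // false: 'name' had no default
        System.out.println(backward(v1, addEmail));        // false: added field lacks a default
        System.out.println(backward(v1, addEmailDefault)); // true: added field has a default
    }
}
```

FULL compatibility is simply both checks at once, which is why "add fields only with defaults" satisfies every mode.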
⚠️ Related Errors

Other common errors related to schema compatibility include:

  • SerializationException: the producer's schema is rejected as incompatible by the Schema Registry.
  • DeserializationException: the consumer cannot read incoming data because of incompatible schema changes.
  • Schema not found: the schema was never registered, or the Schema Registry URL is incorrect.

Quick fixes involve checking schema versions, ensuring Schema Registry connectivity, and validating schema compatibility settings.
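
These checks map directly onto two read-only Schema Registry endpoints: GET /subjects/{subject}/versions lists the registered versions, and GET /config/{subject} shows the compatibility mode in force. A small sketch building those requests (registry URL and subject are placeholders; sending requires a live registry):

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RegistryDiagnostics {
    // Builds a GET request against a Schema Registry diagnostic endpoint.
    static HttpRequest get(String registryUrl, String path) {
        return HttpRequest.newBuilder().uri(URI.create(registryUrl + path)).GET().build();
    }

    public static void main(String[] args) {
        String registry = "http://localhost:8081"; // placeholder local registry
        // Which schema versions exist for the subject?
        System.out.println(get(registry, "/subjects/test-topic-value/versions").uri());
        // Which compatibility mode applies to it?
        System.out.println(get(registry, "/config/test-topic-value").uri());
    }
}
```

Note that GET /config/{subject} returns 404 when the subject has no override; the registry-wide default then lives at GET /config.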

Key Takeaways

  • Use Confluent Schema Registry to manage Kafka schemas centrally.
  • Set the schema compatibility mode to BACKWARD or FORWARD to allow safe schema evolution.
  • Add new fields with default values or make them optional to maintain compatibility.
  • Test schema changes before deploying to production to avoid runtime errors.
  • Validate schemas using the Schema Registry API or avro-tools before applying changes.