How to Use Avro with Kafka: Simple Guide for Beginners
To use Avro with Kafka, serialize your messages with Avro schemas and send them to Kafka topics. Use the KafkaAvroSerializer and KafkaAvroDeserializer from the Confluent Schema Registry client to handle schema management and message encoding automatically.

Syntax
Using Avro with Kafka involves these parts:
- Avro Schema: Defines the structure of your data in JSON format.
- Schema Registry: Stores and manages Avro schemas centrally.
- Kafka Producer: Serializes data using KafkaAvroSerializer before sending.
- Kafka Consumer: Deserializes data using KafkaAvroDeserializer when receiving.
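For reference, the User schema used in the example later in this guide can be kept as a standalone .avsc file (Avro schemas are plain JSON):

```json
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
```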
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// Producer sends GenericRecord or SpecificRecord values serialized with Avro
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
```
Example
This example shows how to produce and consume Avro messages with Kafka using the Confluent Schema Registry.
```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroKafkaExample {
    public static void main(String[] args) {
        String schemaString = "{\"namespace\":\"example.avro\",\"type\":\"record\","
            + "\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}";
        Schema schema = new Schema.Parser().parse(schemaString);

        // Producer configuration
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", KafkaAvroSerializer.class.getName());
        producerProps.put("schema.registry.url", "http://localhost:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(producerProps);
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alice");
        user.put("age", 30);
        producer.send(new ProducerRecord<>("users", "user1", user));
        producer.flush();
        producer.close();

        // Consumer configuration
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "avro-consumer-group");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("specific.avro.reader", "false"); // deliver GenericRecord
        // Read from the beginning so a brand-new group sees the message just produced
        consumerProps.put("auto.offset.reset", "earliest");

        Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList("users"));
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, GenericRecord> rec : records) {
            System.out.println("Received user: " + rec.value());
        }
        consumer.close();
    }
}
```
Output
Received user: {"name": "Alice", "age": 30}
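Under the hood, the Confluent serializers frame every message with a small header: a magic byte (0x0) followed by the 4-byte, big-endian schema ID the Schema Registry assigned, then the Avro binary payload. A minimal, stdlib-only sketch of that framing is shown below; the schema ID 42 and the placeholder payload are assumptions for illustration, since real IDs come from the registry:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Hypothetical schema ID; in practice the Schema Registry assigns it.
    static final int SCHEMA_ID = 42;

    // Mimic the header KafkaAvroSerializer writes: magic byte 0x0,
    // then the 4-byte schema ID (big-endian), then the Avro binary payload.
    static byte[] frame(byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0x0);
        buf.putInt(SCHEMA_ID);
        buf.put(avroPayload);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[] {1, 2, 3});
        ByteBuffer buf = ByteBuffer.wrap(framed);
        byte magic = buf.get();
        int schemaId = buf.getInt();
        System.out.println("magic=" + magic + " schemaId=" + schemaId); // magic=0 schemaId=42
    }
}
```

This framing is why a plain StringDeserializer or ByteArrayDeserializer shows garbage for Avro-encoded topics: the first five bytes are registry metadata, not data.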
Common Pitfalls
Common mistakes when using Avro with Kafka include:
- Not running or connecting to the Schema Registry, causing serialization errors.
- Using mismatched schemas between producer and consumer, leading to deserialization failures.
- Forgetting to set schema.registry.url in producer or consumer configs.
- Not using the correct serializer/deserializer classes (KafkaAvroSerializer and KafkaAvroDeserializer).
```java
/* Wrong: missing schema.registry.url */
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Missing schema.registry.url causes a configuration error

/* Right: include schema.registry.url */
props.put("schema.registry.url", "http://localhost:8081");
```
Quick Reference
- Avro Schema: JSON file defining data structure.
- Schema Registry URL: Must be set in configs.
- Producer Serializer: KafkaAvroSerializer
- Consumer Deserializer: KafkaAvroDeserializer
- Data Format: Use GenericRecord or generated classes.
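To consume into Avro-generated classes instead of GenericRecord, flip the specific.avro.reader flag on the consumer. A small sketch of that configuration (broker and registry addresses are the same placeholders used throughout this guide):

```java
import java.util.Properties;

public class SpecificReaderConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // Deserialize into generated SpecificRecord classes instead of GenericRecord
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("specific.avro.reader")); // true
    }
}
```

With this flag set, you would declare the consumer as Consumer<String, User> (where User is the class generated from the schema) rather than Consumer<String, GenericRecord>.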
Key Takeaways
- Always set the schema registry URL in Kafka producer and consumer configurations.
- Use KafkaAvroSerializer and KafkaAvroDeserializer to handle Avro data automatically.
- Define and register your Avro schema before producing messages.
- Ensure producer and consumer use compatible schemas to avoid errors.
- Test your setup with simple GenericRecord messages before using generated classes.
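You can check the "compatible schemas" takeaway offline with Avro's own SchemaCompatibility utility, before involving the registry at all. A sketch, assuming an evolved consumer schema that adds an email field with a default value (the field name is an illustration, not part of the guide's schema):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class SchemaCompatCheck {
    /** True if data written with writerSchema can be read with readerSchema. */
    public static boolean canRead(Schema readerSchema, Schema writerSchema) {
        return SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema)
                .getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
    }

    public static void main(String[] args) {
        Schema writer = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}");
        // Evolved reader schema: adds an optional field with a default value,
        // so records written with the old schema still decode.
        Schema reader = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},"
          + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");
        System.out.println(canRead(reader, writer)); // true: the default fills the gap
    }
}
```

Had the new field lacked a default, the check would report INCOMPATIBLE, which is exactly the mismatch that surfaces as a deserialization failure in a running consumer.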