
Schema validation in Kafka producers - Commands & Configuration

Introduction
When sending data to Kafka, producers must ensure the data format is correct. Schema validation helps catch mistakes early by checking data against a defined structure before sending it.
When you want to prevent sending wrong or incomplete data to Kafka topics.
When multiple applications produce data to the same topic and need a consistent format.
When you want to avoid errors in consumers caused by unexpected data shapes.
When evolving data formats over time and need to maintain compatibility.
When using a schema registry to centrally manage data formats.
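All of these situations come down to one check: does the record match the declared schema before it leaves the producer? A minimal stdlib-Python sketch of that idea (a conceptual illustration only, not the actual Avro serializer; `validate` and `USER_SCHEMA` are hypothetical names):

```python
# Conceptual sketch of validate-before-send: a record is only handed to the
# producer if it matches the declared field/type schema.

USER_SCHEMA = {"name": str, "age": int}  # mirrors the User record used in this article

def validate(record: dict, schema: dict) -> None:
    """Raise ValueError if the record does not match the schema."""
    missing = schema.keys() - record.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    for field, expected_type in schema.items():
        if not isinstance(record[field], expected_type):
            raise ValueError(
                f"field '{field}' expected {expected_type.__name__}, "
                f"got {type(record[field]).__name__}"
            )

validate({"name": "Alice", "age": 30}, USER_SCHEMA)   # passes silently
try:
    validate({"name": "Bob", "age": "thirty"}, USER_SCHEMA)
except ValueError as err:
    print(err)  # field 'age' expected int, got str
```

The real Avro serializer does considerably more (binary encoding, schema registration, compatibility checks), but the reject-early principle is the same.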
Config File - producer.properties
producer.properties
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081

This configuration file sets up the Kafka producer to connect to the Kafka broker at localhost:9092.

It uses StringSerializer for keys and KafkaAvroSerializer for values, which enables schema validation using Avro format.

The schema.registry.url points to the Schema Registry service that stores and manages schemas.
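The properties file is just plain `key=value` lines; Java clients load it natively via `java.util.Properties`. As an illustration of the format, a small stdlib-Python sketch (the `parse_properties` helper is hypothetical, not part of any Kafka client):

```python
# Java properties files are plain key=value lines. This stdlib-only parser
# illustrates exactly what the producer reads from producer.properties.

PROPERTIES = """\
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
"""

def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blanks and '#' comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config

config = parse_properties(PROPERTIES)
# The Avro value serializer is the setting that turns schema validation on:
print(config["value.serializer"])    # io.confluent.kafka.serializers.KafkaAvroSerializer
print(config["schema.registry.url"]) # http://localhost:8081
```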

Commands
This command starts the Avro console producer (kafka-avro-console-producer, shipped with Confluent Schema Registry) and sends messages to 'example-topic'. Note that the plain kafka-console-producer does not perform schema validation; the Avro variant validates every message against the supplied schema, here a 'User' record with fields 'name' (string) and 'age' (int).
Terminal
kafka-avro-console-producer --broker-list localhost:9092 --topic example-topic --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
Expected Output
>
--broker-list - Specifies the Kafka broker address to connect to.
--topic - Specifies the Kafka topic to send messages to.
--property value.schema - Defines the Avro schema for validating message values.
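The value.schema string is ordinary JSON following the Avro schema declaration format, so a quick way to sanity-check it before pasting it into a command is to parse it (stdlib-only sketch):

```python
import json

# The exact schema string passed via --property value.schema above.
schema_json = (
    '{"type":"record","name":"User","fields":'
    '[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
)

# json.loads raises ValueError on malformed JSON, catching typos early.
schema = json.loads(schema_json)
print(schema["name"])  # User
for field in schema["fields"]:
    print(f'{field["name"]}: {field["type"]}')
# name: string
# age: int
```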
This command sends a valid JSON message matching the schema to the topic. The producer validates the message against the schema before sending.
Terminal
echo '{"name":"Alice","age":30}' | kafka-avro-console-producer --broker-list localhost:9092 --topic example-topic --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
Expected Output
No output (command runs silently)
This command tries to send a message where 'age' is a string instead of an int. Schema validation will reject this message.
Terminal
echo '{"name":"Bob","age":"thirty"}' | kafka-avro-console-producer --broker-list localhost:9092 --topic example-topic --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
Expected Output
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"name":"Bob","age":"thirty"} to Avro of schema ...
This command reads one message from the beginning of 'example-topic' to verify that only valid messages were stored. Because the values are Avro-encoded, it uses the Avro console consumer, which fetches the schema from the registry and prints each record as JSON.
Terminal
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic example-topic --from-beginning --max-messages 1 --property schema.registry.url=http://localhost:8081
Expected Output
{"name":"Alice","age":30}
--from-beginning - Reads messages from the start of the topic.
--max-messages - Limits the number of messages to read.
Key Concept

If you remember nothing else from this pattern, remember: schema validation in producers stops bad data before it enters Kafka.

Common Mistakes
Not configuring the producer to use the schema registry and serializer.
Without this, the producer cannot validate data and may send invalid messages.
Set 'value.serializer' to KafkaAvroSerializer and provide 'schema.registry.url' in producer config.
Sending messages that do not match the schema fields or types.
Schema validation will reject these messages, causing errors and failed sends.
Ensure message data matches the schema exactly in field names and data types.
Not running a schema registry service or pointing to the wrong URL.
Producer cannot fetch or register schemas, so validation fails.
Run schema registry on localhost:8081 or update 'schema.registry.url' to the correct address.
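The "exact match" requirement from the second mistake above can be sketched in a few lines of stdlib Python (a hypothetical helper for illustration, not Confluent code): field names are case-sensitive, and extra, missing, or renamed fields are all rejected.

```python
# Hypothetical check mirroring the "exact match" rule: field names in the
# record must line up one-to-one with the schema's declared fields.

SCHEMA_FIELDS = {"name", "age"}  # the User record's fields

def check_field_names(record: dict) -> list:
    """Return a list of problems; an empty list means the names line up."""
    problems = []
    for missing in sorted(SCHEMA_FIELDS - record.keys()):
        problems.append(f"missing field: {missing}")
    for extra in sorted(record.keys() - SCHEMA_FIELDS):
        problems.append(f"unknown field: {extra}")
    return problems

print(check_field_names({"name": "Alice", "age": 30}))  # []
print(check_field_names({"Name": "Bob", "age": 30}))    # ['missing field: name', 'unknown field: Name']
```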
Summary
Configure the Kafka producer with Avro serializer and schema registry URL for schema validation.
Send messages that match the defined schema to prevent invalid data from entering Kafka.
Use console producer and consumer commands to test schema validation and message flow.