Consider a Kafka producer configured with Avro schema validation. The producer tries to send a message that does not conform to the registered schema. What will happen?
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}'''
value_schema = avro.loads(schema_str)

producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}
producer = AvroProducer(producer_config, default_value_schema=value_schema)

try:
    producer.produce(topic='users', value={'name': 'Alice', 'age': 'twenty'})
    producer.flush()
    print('Message sent successfully')
except Exception as e:
    print(f'Error: {e}')
Check the data types in the message against the schema fields.
The schema declares the field 'age' as an int, but the message supplies the string 'twenty'. Serialization therefore fails: the AvroProducer validates each record against the schema inside produce(), so the exception is raised and caught before anything reaches the broker, and the except branch prints the error.
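The check that rejects the record can be simulated offline. The sketch below is a deliberately simplified stand-in for what the Avro serializer does (it is not the confluent-kafka implementation): Avro's "string" maps to Python str and "int" to Python int, so 'twenty' fails the type check for 'age'.

```python
# Simplified mapping of Avro primitive types to Python types.
AVRO_TO_PY = {"string": str, "int": int}

def conforms(record, fields):
    """Return True if every schema field is present with the expected Python type."""
    return all(
        f["name"] in record and isinstance(record[f["name"]], AVRO_TO_PY[f["type"]])
        for f in fields
    )

fields = [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}]
print(conforms({"name": "Alice", "age": "twenty"}, fields))  # False: 'twenty' is not an int
print(conforms({"name": "Alice", "age": 30}, fields))        # True
```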
What is the main purpose of schema validation in Kafka producers?
Think about why schemas are used in data pipelines.
Schema validation ensures that the data sent by producers matches the expected format, preventing corrupt or incompatible data from entering Kafka topics.
Identify the error raised by this Kafka producer code snippet when schema validation is enabled.
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}
value_schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}'''
value_schema = avro.loads(value_schema_str)
producer = AvroProducer(producer_config, default_value_schema=value_schema)
producer.produce(topic='users', value={'name': 'Bob'})
producer.flush()
Check if all required fields in the schema are present in the message.
The schema requires both 'name' and 'age' (neither field has a default). The message is missing 'age', so schema validation fails and produce() raises a serialization error; the record is never sent.
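If the pipeline genuinely allows records without an age, the registered schema can make the field optional instead. The sketch below shows the standard Avro idiom, a union with "null" plus a default, so a record omitting 'age' would serialize (this assumes downstream consumers can handle null ages):

```python
import json

# Amended schema in which 'age' is optional: the ["null", "int"] union
# with a "default" of null lets records omit the field.
schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}'''
schema = json.loads(schema_str)
print(schema["fields"][1]["default"])  # None
```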
Choose the correct Python code snippet to configure an AvroProducer with schema validation.
Check the required keys in the configuration dictionary and parameter names.
The AvroProducer configuration must include both 'bootstrap.servers' and 'schema.registry.url', and the value schema must be passed via the default_value_schema keyword argument.
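Putting those pieces together, a correctly configured AvroProducer looks like the sketch below. The broker and registry URLs are placeholders for a local setup; adjust them for your environment.

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.loads('''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}''')

producer = AvroProducer(
    {
        'bootstrap.servers': 'localhost:9092',          # Kafka broker(s)
        'schema.registry.url': 'http://localhost:8081'  # Schema Registry
    },
    default_value_schema=value_schema,
)
producer.produce(topic='users', value={'name': 'Alice', 'age': 30})
producer.flush()
```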
Given the following code, how many messages will be sent successfully to the 'users' topic?
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}'''
value_schema = avro.loads(schema_str)

producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}
producer = AvroProducer(producer_config, default_value_schema=value_schema)

messages = [
    {'name': 'Alice', 'age': 30},
    {'name': 'Bob', 'age': 'thirty'},
    {'name': 'Charlie', 'age': 25},
    {'name': 'Dana'}
]

success_count = 0
for msg in messages:
    try:
        producer.produce(topic='users', value=msg)
        success_count += 1
    except Exception:
        pass
producer.flush()
print(success_count)
Check which messages conform to the schema and which cause errors.
Only records with 'name' as a string and 'age' as an int pass validation. 'Bob' has a string age ('thirty') and 'Dana' is missing 'age' entirely, so produce() raises a serialization error for both and the except branch swallows it. Only 'Alice' and 'Charlie' succeed, so the script prints 2.