
Schema Validation in Kafka Producers - Practice Problems & Coding Challenges

Challenge - 5 Problems
Predict Output
intermediate
What is the output of this Kafka producer schema validation code?

Consider a Kafka producer configured with Avro schema validation. The producer tries to send a message that does not conform to the registered schema. What will happen?

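from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Avro schema for the message value: a User record with two fields.
schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}'''

# Parse the schema string into a schema object.
value_schema = avro.loads(schema_str)

# Broker and Schema Registry addresses.
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

# Values are serialized with value_schema unless produce() is given another schema.
producer = AvroProducer(producer_config, default_value_schema=value_schema)

try:
    # Serialize the value against the schema and queue it for sending.
    producer.produce(topic='users', value={'name': 'Alice', 'age': 'twenty'})
    # Block until queued messages have been delivered.
    producer.flush()
    print('Message sent successfully')
except Exception as e:
    print(f'Error: {e}')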
A. Error: KeyError: 'age'
B. Message sent successfully
C. RuntimeError: Producer not connected
D. Error: <class 'confluent_kafka.avro.serializer.MessageSerializerError'>: Value for field age is not an int
💡 Hint

Check the data types in the message against the schema fields.
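
One way to act on this hint is to compare the Python type of each value with the type its schema field declares. The sketch below is a simplified, standard-library-only illustration: `PRIMITIVE_TYPES` and `find_type_mismatches` are hypothetical names, it covers only flat records with a few primitive types, and it is not how the Avro serializer itself performs the check.

```python
import json

# Assumed mapping from a few Avro primitive types to Python types (illustrative only).
PRIMITIVE_TYPES = {
    "string": str,
    "int": int,
    "long": int,
    "boolean": bool,
}


def find_type_mismatches(schema_str, record):
    """Return (field, declared_avro_type, actual_python_type) for each mismatch."""
    schema = json.loads(schema_str)
    mismatches = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        # Unions and nested types are not strings; this sketch skips them.
        expected = PRIMITIVE_TYPES.get(avro_type) if isinstance(avro_type, str) else None
        if expected is None or name not in record:
            continue  # missing fields are a separate check
        value = record[name]
        # bool is a subclass of int in Python, so reject it for numeric fields.
        is_bool_as_number = isinstance(value, bool) and expected is not bool
        if is_bool_as_number or not isinstance(value, expected):
            mismatches.append((name, avro_type, type(value).__name__))
    return mismatches


# Hypothetical schema and record, unrelated to the question above.
product_schema = '''{
  "type": "record",
  "name": "Product",
  "fields": [
    {"name": "sku", "type": "string"},
    {"name": "quantity", "type": "int"}
  ]
}'''

print(find_type_mismatches(product_schema, {"sku": "A-1", "quantity": "3"}))
# [('quantity', 'int', 'str')]
```

A check like this only locates the suspicious field; the exception type and message a real producer raises depend on the serializer in use.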

🧠 Conceptual
intermediate
Which option correctly describes schema validation in Kafka producers?

What is the main purpose of schema validation in Kafka producers?

A. To encrypt messages before sending to Kafka topics
B. To ensure messages conform to a predefined schema before sending to Kafka topics
C. To compress messages to reduce network usage
D. To authenticate the producer with Kafka brokers
💡 Hint

Think about why schemas are used in data pipelines.

🔧 Debug
advanced
What error does this Kafka producer code raise?

Identify the error raised by this Kafka producer code snippet when schema validation is enabled.

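from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

# Broker and Schema Registry addresses.
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

# Avro schema for the message value: a User record with two fields.
value_schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}'''

# Parse the schema string into a schema object.
value_schema = avro.loads(value_schema_str)

# Values are serialized with value_schema unless produce() is given another schema.
producer = AvroProducer(producer_config, default_value_schema=value_schema)

# Serialize the value against the schema and queue it, then wait for delivery.
producer.produce(topic='users', value={'name': 'Bob'})
producer.flush()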
A. MessageSerializerError: Missing required field 'age'
B. TypeError: value must be a dict
C. No error, message sent successfully
D. KeyError: 'age'
💡 Hint

Check if all required fields in the schema are present in the message.
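
The comparison this hint describes can be sketched as a plain set difference between the schema's field names and the record's keys. `missing_fields` is a hypothetical helper written for illustration, not part of any Kafka or Avro library, and it deliberately ignores declared defaults, which serializers may treat differently.

```python
import json


def missing_fields(schema_str, record):
    """Return the schema field names that have no key in the record."""
    schema = json.loads(schema_str)
    return [f["name"] for f in schema["fields"] if f["name"] not in record]


# Hypothetical schema and record, unrelated to the question above.
order_schema = '''{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "total", "type": "int"}
  ]
}'''

print(missing_fields(order_schema, {"id": "o-1"}))
# ['total']
```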

📝 Syntax
advanced
Which option correctly configures schema validation for a Kafka Avro producer?

Choose the correct Python code snippet to configure an AvroProducer with schema validation.

A. producer = AvroProducer({'bootstrap.servers': 'localhost:9092'}, default_value_schema=value_schema)
B. producer = AvroProducer({'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081'}, value_schema=value_schema)
C. producer = AvroProducer({'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081'}, default_value_schema=value_schema)
D. producer = AvroProducer({'schema.registry.url': 'http://localhost:8081'}, default_value_schema=value_schema)
💡 Hint

Check the required keys in the configuration dictionary and parameter names.
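
A quick pre-flight check on a configuration dictionary might look like the sketch below. `REQUIRED_KEYS` is an assumption taken from the two keys used in the producer examples on this page, and `missing_config_keys` is a hypothetical helper; neither reflects the library's full configuration validation.

```python
# Assumption: the two keys that the producer examples on this page always set.
REQUIRED_KEYS = ("bootstrap.servers", "schema.registry.url")


def missing_config_keys(config):
    """Return the assumed-required keys that are absent from the config dict."""
    return [key for key in REQUIRED_KEYS if key not in config]


print(missing_config_keys({"bootstrap.servers": "localhost:9092"}))
# ['schema.registry.url']
```

This checks the dictionary only; keyword argument names passed to the constructor need to be checked separately.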

🚀 Application
expert
How many messages will be successfully sent with this Kafka producer code?

Given the following code, how many messages will be sent successfully to the 'users' topic?

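from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

# Avro schema for the message value: a User record with two fields.
schema_str = '''{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}'''

# Parse the schema string into a schema object.
value_schema = avro.loads(schema_str)

# Broker and Schema Registry addresses.
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

# Values are serialized with value_schema unless produce() is given another schema.
producer = AvroProducer(producer_config, default_value_schema=value_schema)

# Candidate messages to send.
messages = [
    {'name': 'Alice', 'age': 30},
    {'name': 'Bob', 'age': 'thirty'},
    {'name': 'Charlie', 'age': 25},
    {'name': 'Dana'}
]

success_count = 0

for msg in messages:
    try:
        # Count the message only if produce() returns without raising.
        producer.produce(topic='users', value=msg)
        success_count += 1
    except Exception:
        # Skip messages that raise and continue with the next one.
        pass

# Block until queued messages have been delivered.
producer.flush()
print(success_count)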
A. 2
B. 4
C. 3
D. 1
💡 Hint

Check which messages conform to the schema and which cause errors.
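
The counting pattern in this question can be tried without a broker by swapping the producer for a stub. In the sketch below, `count_accepted` and `fake_produce` are hypothetical names, and `fake_produce` is a stand-in that rejects records by raising; it does not reproduce the real serializer's rules. The count reflects calls that returned without raising, which is not the same as broker-confirmed delivery, because producing is asynchronous.

```python
def count_accepted(records, produce):
    """Call produce(record) for each record; return (accepted_count, rejected)."""
    accepted = 0
    rejected = []
    for record in records:
        try:
            produce(record)
            accepted += 1
        except Exception as exc:
            # Keep the failure reason instead of silently discarding it.
            rejected.append((record, type(exc).__name__))
    return accepted, rejected


def fake_produce(record):
    """Hypothetical stand-in for a serializing producer: requires an int 'qty'."""
    if not isinstance(record.get("qty"), int):
        raise ValueError(f"invalid record: {record!r}")


# Hypothetical records, unrelated to the question above.
records = [{"qty": 1}, {"qty": "two"}, {}]
accepted, rejected = count_accepted(records, fake_produce)
print(accepted)
# 1
print([name for _, name in rejected])
# ['ValueError', 'ValueError']
```

Recording the rejected records, rather than using a bare `pass`, makes it easier to see why a count is lower than expected.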