Consider this Kafka producer code snippet that sends a message with String serialization for both key and value:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "key1", "value1"));
producer.close();
System.out.println("Message sent");
What will this code print when run successfully?
Think about what the System.out.println line outputs after sending the message.
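The send method is asynchronous: it returns a Future<RecordMetadata> immediately and prints nothing itself, and it does not return the message content. producer.close() then waits for outstanding sends to finish, so the only console output is the print statement, "Message sent".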
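The asynchronous behavior described above can be illustrated without a broker. The sketch below is not Kafka code: `fakeSend` is a hypothetical stand-in that returns a `CompletableFuture`, in the same way `producer.send()` hands back a future instead of a result. With the real client, calling `get()` on the returned `Future<RecordMetadata>` blocks in a similar way until the broker acknowledges the record.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSendSketch {
    // Hypothetical stand-in for producer.send(): the work runs on another
    // thread and the caller receives a future instead of a result.
    static CompletableFuture<String> fakeSend(String topic, String key, String value) {
        return CompletableFuture.supplyAsync(() -> topic + ":" + key + "=" + value);
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = fakeSend("test-topic", "key1", "value1");
        // join() blocks until the simulated send has completed.
        String acknowledged = pending.join();
        System.out.println("Acknowledged: " + acknowledged);
        System.out.println("Message sent");
    }
}
```

If the future is never inspected, as in the quiz snippet, the program still prints "Message sent" even though it has not checked whether delivery succeeded.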
Given this Kafka consumer code snippet that uses JSON deserialization for values:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");
KafkaConsumer<String, Map<String, Object>> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("json-topic"));
ConsumerRecords<String, Map<String, Object>> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Map<String, Object>> record : records) {
    System.out.println(record.value().get("name"));
}
consumer.close();
If the topic has one message with value {"name":"Alice","age":30}, what will be printed?
Look at what key is accessed from the deserialized JSON map.
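The question assumes the value deserializer converts the JSON string into a Map<String, Object>, so get("name") returns the string "Alice". Note that org.apache.kafka.connect.json.JsonDeserializer actually produces a Jackson JsonNode, so in practice this configuration needs a deserializer that yields a Map, or the consumer must be typed to JsonNode.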
Consider this Kafka producer code snippet that tries to send an Avro serialized message without configuring schema registry:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
GenericRecord record = new GenericData.Record(schema);
record.put("field", "value");
producer.send(new ProducerRecord<>("avro-topic", "key1", record));
producer.close();
What error will this code most likely raise?
Think about what happens if the schema registry URL is not set but Avro serializer is used.
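The Avro serializer needs the schema registry to register or fetch the schema. Because schema.registry.url is a required setting with no default, the serializer rejects the configuration while the producer is being constructed, typically with a ConfigException reporting the missing "schema.registry.url", rather than with an HTTP error such as 404 from the REST client.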
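For comparison, here is a minimal sketch of the producer settings with the registry location supplied. The helper name `avroProducerProps` is hypothetical, and the URL `http://localhost:8081` is an assumed local address, not something given in the question.

```java
import java.util.Properties;

public class AvroProducerConfigSketch {
    // Builds producer settings; registryUrl is a placeholder supplied by the caller.
    static Properties avroProducerProps(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // The setting that the quiz snippet leaves out.
        props.put("schema.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Assumed local addresses, for illustration only.
        Properties props = avroProducerProps("localhost:9092", "http://localhost:8081");
        System.out.println(props.getProperty("schema.registry.url"));
    }
}
```

The resulting Properties object would be passed to new KafkaProducer<>(props) exactly as in the snippet.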
In Kafka, you want to choose a serialization format that supports schema evolution, allowing you to add or remove fields without breaking consumers. Which format is best suited for this?
Consider which format uses a schema registry to manage schemas and compatibility.
Avro serialization with schema registry supports schema evolution by managing schemas centrally and ensuring compatibility between producers and consumers.
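To see why adding or removing fields need not break consumers, the sketch below models the reader-side rule in plain Java. It is a simplified simulation, not the Avro API: `resolve` and the `readerDefaults` map are hypothetical names, and real Avro applies this logic to schemas rather than maps. A field the reader expects but the writer did not send takes the reader's default, and a field only the writer knows about is ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaEvolutionSketch {
    // Simplified model of reading with a newer reader schema:
    // readerDefaults maps each field the reader expects to its default value.
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Object> readerDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            // Use the written value when present, otherwise the reader's default.
            result.put(field.getKey(), written.getOrDefault(field.getKey(), field.getValue()));
        }
        // Fields that only the writer knew about are not copied over.
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("name", "Alice");
        written.put("legacyId", 7);          // removed in the reader's schema

        Map<String, Object> readerDefaults = new LinkedHashMap<>();
        readerDefaults.put("name", "");
        readerDefaults.put("email", "unknown"); // added in the reader's schema

        System.out.println(resolve(written, readerDefaults)); // {name=Alice, email=unknown}
    }
}
```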
Given this Kafka consumer code snippet using JSON deserialization:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");
KafkaConsumer<String, Map<String, Object>> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("json-topic"));
ConsumerRecords<String, Map<String, Object>> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Map<String, Object>> record : records) {
    Map<String, Object> valueMap = record.value();
    System.out.println(valueMap.size());
}
consumer.close();
If the topic has a message with JSON value {"name":"Bob","age":25,"city":"NY"}, what will be printed?
Count the number of keys in the JSON object after deserialization.
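The JSON object has three keys: "name", "age", and "city". Assuming the deserializer delivers the value as a Map (the Kafka Connect JsonDeserializer named in the snippet yields a Jackson JsonNode instead), the Map has three entries, so size() returns 3.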
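The counting step can be checked with an ordinary map. This sketch builds the value by hand instead of through a deserializer, so `bobRecord` is a hypothetical helper and no Kafka classes are involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonValueSketch {
    // Hand-built equivalent of {"name":"Bob","age":25,"city":"NY"}.
    static Map<String, Object> bobRecord() {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "Bob");
        value.put("age", 25);
        value.put("city", "NY");
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> valueMap = bobRecord();
        System.out.println(valueMap.size());      // 3
        System.out.println(valueMap.get("name")); // Bob
    }
}
```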