Challenge - 5 Problems
Kafka State Store Master
Get all challenges correct to earn this badge!
Test your skills under time pressure!
❓ Predict Output
intermediate2:00remaining
What is the output of this Kafka Streams state store code?
Consider the following Kafka Streams processor code snippet that uses a state store to count occurrences of keys. What will be the printed output after processing the input records?
Kafka
StreamsBuilder builder = new StreamsBuilder(); KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("counts-store"); KStream<String, String> input = builder.stream("input-topic"); input.groupByKey() .count(Materialized.<String, Long>as(storeSupplier)) .toStream() .foreach((key, count) -> System.out.println(key + ": " + count)); // Assume input records processed in order: ("apple", "1"), ("banana", "1"), ("apple", "1")
Attempts:
2 left
💡 Hint
Think about how the count state store updates counts for each key as records arrive.
✗ Incorrect
The count state store increments the count for each key every time a record with that key is processed. The first 'apple' record sets count to 1, then 'banana' is 1, then the second 'apple' increments to 2.
🧠 Conceptual
intermediate1:30remaining
Which statement best describes a Kafka Streams state store?
Choose the correct description of a Kafka Streams state store.
Attempts:
2 left
💡 Hint
Think about where Kafka Streams keeps data it needs to remember during processing.
✗ Incorrect
Kafka Streams state stores are local storage components that hold data such as counts or aggregates. They allow querying and fault-tolerance via changelog topics.
🔧 Debug
advanced2:30remaining
Why does this Kafka Streams code throw a NullPointerException?
Examine the following code snippet that uses a state store. Why does it throw a NullPointerException at runtime?
Kafka
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()));
KStream<String, String> stream = builder.stream("input-topic");
stream.process(() -> new Processor<String, String>() {
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
}
@Override
public void process(String key, String value) {
Long count = store.get(key);
store.put(key, (count == null ? 0L : count) + 1);
}
@Override
public void close() {}
}, "my-store");Attempts:
2 left
💡 Hint
Check how the state store is connected to the processor.
✗ Incorrect
The processor must specify the state store name in the process() method call to bind it. Without it, context.getStateStore returns null causing NullPointerException.
📝 Syntax
advanced1:30remaining
Which option correctly creates a persistent key-value state store in Kafka Streams?
Select the correct Java code snippet that creates a persistent RocksDB key-value state store named "persistent-store".
Attempts:
2 left
💡 Hint
Look for the method that creates a persistent key-value store, not in-memory or windowed.
✗ Incorrect
Stores.persistentKeyValueStore creates a RocksDB backed persistent key-value store. In-memory and window stores are different types.
🚀 Application
expert2:00remaining
How many entries will the state store contain after processing these records?
Given a Kafka Streams application with a key-value state store that counts occurrences of keys, and the following input records processed in order:
("cat", "1"), ("dog", "1"), ("cat", "1"), ("bird", "1"), ("dog", "1"), ("cat", "1")
How many unique keys will the state store contain after processing all records?
Attempts:
2 left
💡 Hint
Count how many different keys appear in the input records.
✗ Incorrect
The keys are "cat", "dog", and "bird". The store counts occurrences per key, so it will have 3 entries.