Consider the following Kafka Streams code snippet that groups records by key and counts the number of records per key.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
KTable<String, Long> counts = stream
    .groupByKey()
    .count();
counts.toStream().foreach((key, value) -> System.out.println(key + ":" + value));
If the input topic contains the records with keys: a, b, a, a, b, what will be printed?
Count counts how many records exist per key.
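The count() aggregation maintains a running count per key. Since the input keys are 'a', 'b', 'a', 'a', 'b', the final counts are 'a' = 3 and 'b' = 2. Because toStream() emits the table's updates, intermediate counts such as 'a' = 1 and 'a' = 2 may also be printed, depending on record caching and the commit interval.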
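To make the update semantics concrete, here is a minimal plain-Java sketch that models the counting step with an in-memory map. It is not Kafka Streams code: the class CountSimulation and the method countUpdates are hypothetical names, and the model assumes that every input record produces exactly one downstream update, roughly what a table without record caching would emit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupByKey().count(); an illustration only, not the Kafka Streams API.
class CountSimulation {

    // Returns one "key:count" line per input record (assumption: no caching, one update per record).
    static List<String> countUpdates(List<String> keys) {
        Map<String, Long> table = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String key : keys) {
            // Insert 1 for a new key, otherwise add 1 to the stored count.
            long updated = table.merge(key, 1L, Long::sum);
            updates.add(key + ":" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        // prints [a:1, b:1, a:2, a:3, b:2]
        System.out.println(countUpdates(List.of("a", "b", "a", "a", "b")));
    }
}
```

Under that assumption, the last update for each key (a:3 and b:2) matches the final table contents given in the answer.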
Given the following Kafka Streams code that groups records by key and reduces values by concatenation:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
KTable<String, String> reduced = stream
    .groupByKey()
    .reduce((v1, v2) -> v1 + v2);
reduced.toStream().foreach((key, value) -> System.out.println(key + ":" + value));
If the input topic contains records with keys and values: (a, x), (b, y), (a, z), (a, w), (b, q), what will be printed?
Reduce concatenates values in the order they arrive per key.
The reduce function concatenates values for each key in the order they arrive. For key 'a', values are 'x', 'z', 'w' concatenated as 'xzw'. For key 'b', values are 'y', 'q' concatenated as 'yq'.
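The per-key concatenation can be sketched in plain Java as follows. This is a simplified model rather than Kafka Streams itself: ReduceSimulation and reduceByKey are hypothetical names, and records are represented as two-element String arrays purely for brevity. The sketch relies on the fact that the first value seen for a key is stored as-is and the reducer is applied only from the second value onward.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of groupByKey().reduce(...); an illustration only.
class ReduceSimulation {

    // Each record is {key, value}. Returns the final value per key.
    static Map<String, String> reduceByKey(List<String[]> records, BinaryOperator<String> reducer) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            // New key: store the value unchanged. Existing key: reducer(oldValue, newValue).
            table.merge(key, value, reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[] {"a", "x"}, new String[] {"b", "y"}, new String[] {"a", "z"},
                new String[] {"a", "w"}, new String[] {"b", "q"});
        // prints {a=xzw, b=yq}
        System.out.println(reduceByKey(records, (v1, v2) -> v1 + v2));
    }
}
```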
Consider this Kafka Streams aggregation that sums integer values per key:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> aggregated = stream
    .groupByKey()
    .aggregate(
        () -> 0,
        (key, value, aggregate) -> aggregate + value
    );
aggregated.toStream().foreach((key, value) -> System.out.println(key + ":" + value));
If the input topic contains records: (a, 2), (b, 3), (a, 5), (a, -1), (b, 4), what will be printed?
Aggregate sums values starting from 0 per key.
The aggregate starts at 0 for each key. For 'a': 0 + 2 + 5 + (-1) = 6. For 'b': 0 + 3 + 4 = 7.
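The arithmetic above can be checked with a small plain-Java model of the initializer/aggregator pair. This is only a sketch: AggregateSimulation, aggregateByKey, and the nested Aggregator interface are hypothetical stand-ins that mirror the (key, value, aggregate) lambda shape, not the Kafka Streams types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of groupByKey().aggregate(initializer, aggregator); an illustration only.
class AggregateSimulation {

    // Hypothetical stand-in for a three-argument aggregator lambda.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    static <K, V, A> Map<K, A> aggregateByKey(List<Map.Entry<K, V>> records,
                                              Supplier<A> initializer,
                                              Aggregator<K, V, A> aggregator) {
        Map<K, A> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initializer supplies the starting aggregate the first time a key is seen.
            A current = table.containsKey(key) ? table.get(key) : initializer.get();
            table.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 2), Map.entry("b", 3), Map.entry("a", 5),
                Map.entry("a", -1), Map.entry("b", 4));
        Map<String, Integer> sums = aggregateByKey(records, () -> 0, (key, value, agg) -> agg + value);
        // prints {a=6, b=7}
        System.out.println(sums);
    }
}
```

The type parameter A is kept separate from V on purpose: unlike reduce, aggregate allows the result type to differ from the value type, which is why it needs an explicit initializer.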
Examine this Kafka Streams code snippet:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
KTable<String, Long> counts = stream
    .groupBy((key, value) -> value)
    .count();
What error, if any, will this code raise when compiled or run?
Check the expected input type for groupBy.
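The groupBy method expects a KeyValueMapper<K, V, KR> whose return value becomes the new key. The lambda (key, value) -> value returns the String value, so it is a valid KeyValueMapper<String, String, String> and the snippet compiles without error; the same selector shape is used in word-count style topologies. The result counts records per distinct value. Because the key changes, Kafka Streams repartitions the data through an internal topic before counting, and a serialization error can arise only at runtime, if the serdes in effect (here, the configured defaults) do not match the String keys and values.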
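For reference, a selector of the form (key, value) -> value takes two Strings and returns a String, which is the shape of a KeyValueMapper<String, String, String>. The plain-Java sketch below models what re-keying by value followed by a count produces. Everything in it is illustrative: GroupByValueSimulation and countByNewKey are hypothetical names, BiFunction stands in for the key selector, and the sample records are invented because the question does not specify any input.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Plain-Java model of groupBy(keySelector).count(); an illustration only.
class GroupByValueSimulation {

    static Map<String, Long> countByNewKey(List<Map.Entry<String, String>> records,
                                           BiFunction<String, String, String> keySelector) {
        return records.stream()
                .collect(Collectors.groupingBy(
                        // The selector's return value becomes the grouping key.
                        r -> keySelector.apply(r.getKey(), r.getValue()),
                        TreeMap::new,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical input records: (k1, apple), (k2, pear), (k3, apple).
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("k1", "apple"), Map.entry("k2", "pear"), Map.entry("k3", "apple"));
        // prints {apple=2, pear=1}
        System.out.println(countByNewKey(records, (key, value) -> value));
    }
}
```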
Given a Kafka Streams KStream with records having keys: a, b, c, a, b, a and values: 1, 2, 3, 4, 5, 6, the following aggregation is performed:
KTable<String, Integer> result = stream
    .groupByKey()
    .aggregate(
        () -> 0,
        (key, value, aggregate) -> aggregate + value
    );
After processing all records, how many entries will the result KTable contain?
Count distinct keys after grouping.
The aggregation groups by key. The keys are 'a', 'b', and 'c'. So the resulting KTable will have 3 entries, one per distinct key.
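As a quick check of the entry count, the same input can be folded into a map in plain Java. This is a sketch under the assumption that the keys and values pair up positionally as (a, 1), (b, 2), (c, 3), (a, 4), (b, 5), (a, 6); DistinctKeySimulation and sumByKey are hypothetical names, and Collectors.toMap with a merge function stands in for the aggregation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java model of the per-key sum; the map size equals the number of distinct keys.
class DistinctKeySimulation {

    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        return records.stream().collect(Collectors.toMap(
                Map.Entry::getKey,    // grouping key
                Map.Entry::getValue,  // first value seen for a key
                Integer::sum,         // combine later values for the same key
                TreeMap::new));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("c", 3),
                Map.entry("a", 4), Map.entry("b", 5), Map.entry("a", 6));
        Map<String, Integer> result = sumByKey(records);
        // prints 3 entries: {a=11, b=7, c=3}
        System.out.println(result.size() + " entries: " + result);
    }
}
```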