Complete the code to group the stream by the 'user' field.
KGroupedStream<String, String> groupedStream = stream.[1]();
The groupByKey method groups the stream by its existing key, which is 'user' here.
Complete the code to count the number of records per key.
KTable<String, Long> counts = groupedStream.[1]();
The count method counts the number of records for each key in the grouped stream.
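To make the result of grouping by key and counting concrete, here is a minimal plain-Java sketch of the same semantics. It is an analogy only and does not use the Kafka Streams API; the class name, method name, and sample records are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CountPerKeySketch {
    // Emulates groupByKey().count(): keeps one running count per record key.
    // Each Map.Entry stands in for a stream record (key = user, value = payload).
    static Map<String, Long> countPerKey(List<Map.Entry<String, String>> records) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            // The first record for a key stores 1; later records add 1 to the count.
            counts.merge(record.getKey(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample records keyed by user.
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("alice", "click"),
                Map.entry("bob", "view"),
                Map.entry("alice", "view"));
        System.out.println(countPerKey(records)); // prints {alice=2, bob=1}
    }
}
```

In a real topology the counts live in a KTable that is updated as records arrive, rather than in a map built from a finished list.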
Fix the error in the aggregation code by completing the missing method.
KTable<String, Integer> aggregated = groupedStream.[1](() -> 0, (key, value, aggregate) -> aggregate + Integer.parseInt(value));
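The aggregate method performs custom aggregation: the initializer (() -> 0) supplies the starting value for each key, and the aggregator folds each new record into that key's running total.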
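The initializer and aggregator roles can be sketched in plain Java as follows. This is an analogy, not Kafka Streams code; the class name, method name, and sample records are hypothetical. It also shows why aggregate fits this exercise: the values are Strings but the result is an Integer, and reduce requires the result type to match the value type.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AggregateSketch {
    // Emulates aggregate(() -> 0, (key, value, agg) -> agg + Integer.parseInt(value)).
    static Map<String, Integer> sumPerKey(List<Map.Entry<String, String>> records) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            // Initializer: a key seen for the first time starts at 0.
            int current = totals.getOrDefault(record.getKey(), 0);
            // Aggregator: parse the String value and add it to the running total.
            totals.put(record.getKey(), current + Integer.parseInt(record.getValue()));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical sample records: key = user, value = a number stored as text.
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("alice", "3"),
                Map.entry("bob", "10"),
                Map.entry("alice", "4"));
        System.out.println(sumPerKey(records)); // prints {alice=7, bob=10}
    }
}
```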
Fill both blanks to group the stream by 'category' and sum the values as integers.
KTable<String, Integer> summed = stream.mapValues(Integer::parseInt).[1]( (key, value) -> key.split(":")[0] ).[2](Integer::sum);
groupBy re-keys each record by the 'category' prefix of the original key (the text before the ':'), and reduce sums the integer values for each category.
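The three steps in this exercise (parse the value, derive a new key, sum per key) can be traced with a small plain-Java sketch. It is only an analogy for the semantics and does not call Kafka Streams; the class name, method name, and the "category:id" key format of the sample records are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupByReduceSketch {
    // Emulates mapValues(Integer::parseInt), then groupBy on the key prefix,
    // then reduce(Integer::sum).
    static Map<String, Integer> sumPerCategory(List<Map.Entry<String, String>> records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            int parsed = Integer.parseInt(record.getValue());   // mapValues step
            String category = record.getKey().split(":")[0];    // groupBy key selector
            sums.merge(category, parsed, Integer::sum);         // reduce step
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical sample records with keys of the form "category:id".
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("books:1", "5"),
                Map.entry("toys:7", "2"),
                Map.entry("books:2", "10"));
        System.out.println(sumPerCategory(records)); // prints {books=15, toys=2}
    }
}
```

Here reduce is enough because the values are already Integers after the mapValues step, so the result type matches the value type.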
Fill the blanks to create a grouped stream by 'type', aggregate values starting at 0, and add parsed integers.
KTable<String, Integer> result = stream.[1]( (key, value) -> value.split(",")[0] ).[2](() -> 0, (aggKey, newValue, aggValue) -> aggValue + Integer.parseInt(newValue));
groupBy groups by the first comma-separated part of the value (the 'type'). aggregate starts each key at 0 and adds each parsed integer to the running total. mapValues is not used here; it is included only as a distractor.