How to Join Streams in Kafka: Syntax and Example
To join streams in Kafka, use the Kafka Streams API with methods like join(), leftJoin(), or outerJoin() on KStream objects. These methods combine records from two streams based on matching keys within a specified time window.
Syntax
The basic syntax to join two Kafka streams involves calling a join method on one KStream and passing the other KStream, a ValueJoiner function, and a JoinWindows time window.
- stream1.join(stream2, ValueJoiner, JoinWindows): Inner join of two streams.
- stream1.leftJoin(stream2, ValueJoiner, JoinWindows): Left join, keeps all records from stream1.
- stream1.outerJoin(stream2, ValueJoiner, JoinWindows): Outer join, keeps all records from both streams.
The ValueJoiner defines how to combine values from both streams, and JoinWindows defines the time range for matching records.
```java
// K, V1, V2, and VR are placeholders for your own key and value types.
KStream<K, V1> stream1 = builder.stream("topic1");
KStream<K, V2> stream2 = builder.stream("topic2");

KStream<K, VR> joinedStream = stream1.join(
    stream2,
    (value1, value2) -> /* combine value1 and value2 */,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
```
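To make the role of the time window concrete, here is a minimal plain-Java sketch of the matching condition. It is a model of the rule, not Kafka Streams code: it assumes a symmetric window (which is what JoinWindows.ofTimeDifferenceWithNoGrace creates) and treats the window boundaries as inclusive. The names WindowRule and withinWindow are hypothetical and exist only for this illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative model only: Kafka Streams performs this check internally.
class WindowRule {

    // True when the two record timestamps differ by at most the window size.
    // Inclusive boundaries are an assumption of this sketch.
    static boolean withinWindow(Instant left, Instant right, Duration window) {
        Duration gap = Duration.between(left, right).abs();
        return gap.compareTo(window) <= 0;
    }

    public static void main(String[] args) {
        Instant click = Instant.parse("2024-01-01T10:00:00Z");
        Duration window = Duration.ofMinutes(5);

        // A profile record 3 minutes after the click falls inside the window.
        System.out.println(withinWindow(click, click.plus(Duration.ofMinutes(3)), window));
        // A profile record 7 minutes before the click falls outside the window.
        System.out.println(withinWindow(click, click.minus(Duration.ofMinutes(7)), window));
    }
}
```

The check is symmetric: it does not matter which of the two records arrives first, only how far apart their timestamps are.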
Example
This example shows how to join two streams of user clicks and user profiles by user ID, combining their values within a 5-minute window.
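```java
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Both topics are assumed to be keyed by user ID, with String default serdes configured.
KStream<String, String> clicks = builder.stream("user-clicks");
KStream<String, String> profiles = builder.stream("user-profiles");

// Inner join: emit a combined value when a click and a profile record
// with the same key occur within 5 minutes of each other.
KStream<String, String> joined = clicks.join(
    profiles,
    (clickValue, profileValue) -> "Click: " + clickValue + ", Profile: " + profileValue,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

joined.to("joined-output");
```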
Output
Records in the 'joined-output' topic are keyed by user ID. Each value combines one click with one profile record for the same key, and a record is emitted only when the two occurred within 5 minutes of each other.
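The following plain-Java sketch models that output on a small set of sample records. It is a simplified simulation, not Kafka Streams code: the Event class, the innerJoin helper, the sample data, and the "key -> value" output format are all hypothetical, and the window boundaries are assumed to be inclusive.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a windowed inner join; not Kafka Streams code.
class InnerJoinSketch {

    // Hypothetical record shape: key, value, and event timestamp.
    static final class Event {
        final String key;
        final String value;
        final Instant timestamp;

        Event(String key, String value, Instant timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Emits one joined value for every click/profile pair that shares a key
    // and whose timestamps differ by at most the window size.
    static List<String> innerJoin(List<Event> clicks, List<Event> profiles, Duration window) {
        List<String> joined = new ArrayList<>();
        for (Event click : clicks) {
            for (Event profile : profiles) {
                boolean sameKey = click.key.equals(profile.key);
                Duration gap = Duration.between(click.timestamp, profile.timestamp).abs();
                if (sameKey && gap.compareTo(window) <= 0) {
                    joined.add(click.key + " -> Click: " + click.value
                            + ", Profile: " + profile.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T10:00:00Z");
        List<Event> clicks = List.of(
                new Event("user1", "home", t0),
                new Event("user2", "cart", t0));
        List<Event> profiles = List.of(
                new Event("user1", "Alice", t0.plus(Duration.ofMinutes(2))),  // inside the window
                new Event("user2", "Bob", t0.plus(Duration.ofMinutes(10))));  // outside the window

        innerJoin(clicks, profiles, Duration.ofMinutes(5)).forEach(System.out::println);
    }
}
```

With this sample data, only user1 produces a joined record (Click: home, Profile: Alice). The user2 records share a key but are 10 minutes apart, so the inner join drops them.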
Common Pitfalls
- Missing or incorrect window size: If the JoinWindows duration is too short for your data, records that belong together will not match.
- Key mismatch: Both streams must use the same key type, and records join only when their key values are equal. The input topics must also be co-partitioned (same number of partitions).
- Using wrong join type: Choose join(), leftJoin(), or outerJoin() based on whether you want inner, left, or full outer join behavior.
- Serialization issues: Ensure serializers and deserializers match the data types used in streams.
```java
// Wrong: KStream-to-KStream joins have no overload without JoinWindows, so this does not compile.
// stream1.join(stream2, (v1, v2) -> v1 + v2);

// Right: specify the window for the join.
stream1.join(
    stream2,
    (v1, v2) -> v1 + v2,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
```
Quick Reference
| Join Method | Description | Use Case |
|---|---|---|
| join() | Inner join two streams on matching keys within a time window | When you want records present in both streams |
| leftJoin() | Left join keeps all records from the left stream | When you want all left stream records even if no match in right |
| outerJoin() | Full outer join keeps all records from both streams | When you want all records from both streams regardless of match |
| JoinWindows.ofTimeDifferenceWithNoGrace(Duration) | Defines time window for matching records | Set appropriate window size for your data timing |
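One practical consequence of the join types in the table: with leftJoin() the joiner can receive null for the right-hand value, and with outerJoin() either side can be null, so the joiner should tolerate missing values. The sketch below models such a joiner in plain Java, using a BiFunction as a stand-in for a ValueJoiner; the class name NullSafeJoiner and the "<none>" placeholder text are assumptions made for illustration.

```java
import java.util.function.BiFunction;

// Plain-Java model of a null-tolerant joiner; not Kafka Streams code.
class NullSafeJoiner {

    // Stand-in for a ValueJoiner<String, String, String>. With leftJoin() the
    // right value may be null; with outerJoin() either side may be null.
    static final BiFunction<String, String, String> JOINER = (click, profile) ->
            "Click: " + (click == null ? "<none>" : click)
                    + ", Profile: " + (profile == null ? "<none>" : profile);

    public static void main(String[] args) {
        System.out.println(JOINER.apply("home", "Alice")); // both sides present
        System.out.println(JOINER.apply("home", null));    // left or outer join, no profile
        System.out.println(JOINER.apply(null, "Alice"));   // outer join only, no click
    }
}
```

The same lambda body could be passed to leftJoin() or outerJoin() in place of the joiner used in the earlier example, since an inner-join joiner that assumes non-null values may otherwise produce misleading output such as "Profile: null".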
Key Takeaways
- Use Kafka Streams API join methods with a time window to combine streams by key.
- Always specify a JoinWindows duration to control the matching time range.
- Choose the join type (inner, left, outer) based on your data needs.
- Ensure both streams are keyed correctly and use compatible serializers.
- Test joins with sample data to verify window size and join logic.