
How to Join Streams in Kafka: Syntax and Example

To join streams in Kafka, use the Kafka Streams API with methods like join(), leftJoin(), or outerJoin() on KStream objects. These methods combine records from two streams based on matching keys within a specified time window.

Syntax

The basic syntax to join two Kafka streams involves calling a join method on one KStream and passing the other KStream, a ValueJoiner function, and a JoinWindows time window.

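  • stream1.join(stream2, ValueJoiner, JoinWindows): Inner join. Emits a result only when both streams have a record with the same key inside the window.
  • stream1.leftJoin(stream2, ValueJoiner, JoinWindows): Left join. Keeps every record from stream1; if no stream2 record matches, the ValueJoiner receives null for the stream2 value.
  • stream1.outerJoin(stream2, ValueJoiner, JoinWindows): Outer join. Keeps unmatched records from both streams, passing null to the ValueJoiner for the missing side.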

The ValueJoiner defines how to combine the two matching values into the output value. JoinWindows defines how far apart the two records' timestamps can be for them to still match.

java
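import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

// builder is a StreamsBuilder; both topics must use the same key type K
KStream<K, V1> stream1 = builder.stream("topic1");
KStream<K, V2> stream2 = builder.stream("topic2");

KStream<K, VR> joinedStream = stream1.join(
    stream2,
    (value1, value2) -> combine(value1, value2), // ValueJoiner<V1, V2, VR>; combine() is a placeholder for your own logic
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)) // match records whose timestamps are at most 5 minutes apart
);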
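The three join methods differ only in which unmatched records still produce output. The plain-Java sketch below models that difference on small in-memory lists and does not use Kafka at all.

Some names are assumptions: `JoinTypeModel`, `Rec` and `Type` are hypothetical, and a `BiFunction` stands in for `ValueJoiner`. The model treats the input as a finished batch. It therefore ignores when Kafka Streams actually emits results, and it does not model grace periods or late records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical batch model of windowed stream-stream joins; not the Kafka Streams implementation.
class JoinTypeModel {

    // One record: key, value, and event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long ts;
        Rec(String key, String value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    enum Type { INNER, LEFT, OUTER }

    // Two records match when their keys are equal and their timestamps are at most windowMs apart.
    static boolean matches(Rec l, Rec r, long windowMs) {
        return l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs;
    }

    static List<String> join(List<Rec> left, List<Rec> right, long windowMs, Type type,
                             BiFunction<String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (Rec l : left) {
            boolean leftMatched = false;
            for (int i = 0; i < right.size(); i++) {
                Rec r = right.get(i);
                if (matches(l, r, windowMs)) {
                    out.add(joiner.apply(l.value, r.value)); // every matching pair yields one result
                    leftMatched = true;
                    rightMatched[i] = true;
                }
            }
            // LEFT and OUTER: an unmatched left value is joined with null
            if (!leftMatched && type != Type.INNER) {
                out.add(joiner.apply(l.value, null));
            }
        }
        // OUTER only: each unmatched right value is joined with a null left value
        if (type == Type.OUTER) {
            for (int i = 0; i < right.size(); i++) {
                if (!rightMatched[i]) out.add(joiner.apply(null, right.get(i).value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("a", "L1", 0), new Rec("b", "L2", 0));
        List<Rec> right = List.of(new Rec("a", "R1", 60_000), new Rec("c", "R2", 0));
        BiFunction<String, String, String> joiner = (l, r) -> l + "+" + r;
        for (Type t : Type.values()) {
            System.out.println(t + " -> " + join(left, right, 300_000, t, joiner));
        }
    }
}
```

Running `main` prints three lines:

- `INNER -> [L1+R1]`
- `LEFT -> [L1+R1, L2+null]`
- `OUTER -> [L1+R1, L2+null, null+R2]`

The unmatched sides show up as `null`, which is also what a real ValueJoiner receives in leftJoin() and outerJoin(). A real ValueJoiner should therefore handle null inputs.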

Example

This example joins a stream of user clicks with a stream of user profile records. Both are keyed by user ID (String keys, String values), and click and profile records are combined when their timestamps are at most 5 minutes apart.

java
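import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();

// Both topics are keyed by user ID (String) and carry String values
KStream<String, String> clicks = builder.stream("user-clicks", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> profiles = builder.stream("user-profiles", Consumed.with(Serdes.String(), Serdes.String()));

// Inner join: one output record per click/profile pair with the same key, at most 5 minutes apart
KStream<String, String> joined = clicks.join(
    profiles,
    (clickValue, profileValue) -> "Click: " + clickValue + ", Profile: " + profileValue,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) // serdes for the join's internal window stores
);

joined.to("joined-output", Produced.with(Serdes.String(), Serdes.String()));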
Output
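Records in the 'joined-output' topic are keyed by user ID. One record is produced for each click/profile pair that shares a user ID and whose timestamps are at most 5 minutes apart, in either direction. Each value has the form "Click: <click value>, Profile: <profile value>".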
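To see which records reach 'joined-output', here is a small plain-Java trace of the same inner join. It is a model, not Kafka Streams itself.

The class name and the sample keys, values and timestamps are made up for illustration. The trace reuses the example's 5-minute time difference and ValueJoiner format. Like `ofTimeDifferenceWithNoGrace`, it treats the window as symmetric, so a profile may arrive before or after the click.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java trace of the click/profile inner join; not Kafka Streams itself.
class ClickProfileTrace {
    static final class Rec {
        final String key;
        final String value;
        final long ts;
        Rec(String key, String value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    static final long WINDOW_MS = Duration.ofMinutes(5).toMillis();

    // Emits "key -> joined value" for every click/profile pair with equal keys whose
    // timestamps are at most 5 minutes apart, in either direction.
    static List<String> join(List<Rec> clicks, List<Rec> profiles) {
        List<String> out = new ArrayList<>();
        for (Rec c : clicks) {
            for (Rec p : profiles) {
                if (c.key.equals(p.key) && Math.abs(c.ts - p.ts) <= WINDOW_MS) {
                    out.add(c.key + " -> Click: " + c.value + ", Profile: " + p.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Rec> clicks = List.of(new Rec("alice", "/home", 10 * min));
        List<Rec> profiles = List.of(
            new Rec("alice", "v1", 4 * min),   // 6 minutes before the click: outside the window
            new Rec("alice", "v2", 14 * min),  // 4 minutes after the click: inside the window
            new Rec("bob", "v1", 10 * min));   // same time, different key: never joins
        join(clicks, profiles).forEach(System.out::println);
    }
}
```

Only the profile at minute 14 joins with the click at minute 10, so the trace prints a single line: `alice -> Click: /home, Profile: v2`. If several profile records fell inside the window, each pair would produce its own output record.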

Common Pitfalls

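  • Missing or badly sized window: a KStream-KStream join requires a JoinWindows argument. Records whose timestamps are further apart than the window never match. With ofTimeDifferenceWithNoGrace the grace period is zero, so out-of-order records that arrive after the window has closed are dropped; JoinWindows.ofTimeDifferenceAndGrace(...) lets you tolerate late data.
  • Key mismatch: records join only when their keys are equal, so both streams must use the same key type and key serialization. The input topics must also be co-partitioned (same number of partitions, same partitioning). If a stream is keyed by something else, re-key it with selectKey() first; Kafka Streams then repartitions it before the join.
  • Using the wrong join type: choose join(), leftJoin(), or outerJoin() based on whether you want inner, left, or full outer join behavior.
  • Serialization issues: the key and value serdes must match the data actually in the topics. Set them as defaults in the configuration, or pass them explicitly via Consumed, Produced, and StreamJoined.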
java
/* Wrong: no JoinWindows. KStream has no join(KStream, ValueJoiner) overload, so this does not compile */
// stream1.join(stream2, (v1, v2) -> v1 + v2);

/* Right: pass a JoinWindows (records at most 5 minutes apart match); assumes String values */
KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
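The key-mismatch pitfall is easiest to see when the two streams use keys that mean different things. In this plain-Java sketch, clicks arrive keyed by a session ID with the user ID in the value. Profiles are keyed by user ID, so no key lines up.

`RekeyModel`, `selectKey` and `commonKeys` are hypothetical helpers, not the Kafka Streams API. `selectKey` only mimics what `KStream.selectKey()` does to keys. In a real topology, calling `selectKey()` before the join also makes Kafka Streams repartition the stream through an internal topic, so records with equal keys end up co-partitioned.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical model of re-keying before a join; not the Kafka Streams API.
class RekeyModel {
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Mimics KStream#selectKey: derive a new key from (key, value), keep the value unchanged.
    static List<Rec> selectKey(List<Rec> in, BiFunction<String, String, String> newKey) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) out.add(new Rec(newKey.apply(r.key, r.value), r.value));
        return out;
    }

    // Keys present on both sides: only these can ever produce inner-join results.
    static Set<String> commonKeys(List<Rec> a, List<Rec> b) {
        Set<String> keysA = new LinkedHashSet<>();
        for (Rec r : a) keysA.add(r.key);
        Set<String> common = new LinkedHashSet<>();
        for (Rec r : b) if (keysA.contains(r.key)) common.add(r.key);
        return common;
    }

    public static void main(String[] args) {
        // Clicks keyed by session ID, with the user ID as the value
        List<Rec> clicks = List.of(new Rec("session-1", "alice"), new Rec("session-2", "bob"));
        // Profiles keyed by user ID
        List<Rec> profiles = List.of(new Rec("alice", "Alice A."), new Rec("carol", "Carol C."));

        System.out.println("before re-keying: " + commonKeys(clicks, profiles));
        List<Rec> rekeyed = selectKey(clicks, (sessionId, userId) -> userId);
        System.out.println("after re-keying:  " + commonKeys(rekeyed, profiles));
    }
}
```

The program prints an empty set before re-keying and `[alice]` afterwards: only after re-keying can the click from alice meet her profile.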

Quick Reference

| Join method | Description | Use case |
| --- | --- | --- |
| join() | Inner join of two streams on matching keys within a time window | When you want records present in both streams |
| leftJoin() | Left join; keeps all records from the left stream | When you want all left-stream records, even with no match on the right |
| outerJoin() | Full outer join; keeps all records from both streams | When you want all records from both streams, regardless of match |
| JoinWindows.ofTimeDifferenceWithNoGrace(Duration) | Defines the time window for matching records | Set a window size that fits how far apart related events occur |

Key Takeaways

  • Use the Kafka Streams join methods with a time window to combine streams by key.
  • KStream-KStream joins require a JoinWindows; choose its duration (and grace period) to control the matching time range.
  • Choose the join type (inner, left, outer) based on your data needs.
  • Make sure both streams are keyed the same way, are co-partitioned, and use compatible serdes.
  • Test joins with sample data to verify the window size and join logic.
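One rough way to check a window size against sample data works per key. For each click, measure how far away the nearest profile event with the same user ID is. The largest of those gaps is the smallest symmetric window under which every sample click finds a match.

The plain-Java sketch below does this for a single key. `WindowSizeCheck`, its method and the sample timestamps are hypothetical. The sketch ignores late arrivals and grace periods.

```java
import java.time.Duration;

// Hypothetical helper for sanity-checking a join window against sample timestamps (one key).
class WindowSizeCheck {
    // Smallest symmetric window (ms) under which every click has at least one
    // profile event whose timestamp is within that distance (inclusive).
    static long smallestWindowCoveringAll(long[] clickTs, long[] profileTs) {
        if (profileTs.length == 0) throw new IllegalArgumentException("need at least one profile event");
        long worst = 0;
        for (long c : clickTs) {
            long nearest = Long.MAX_VALUE;
            for (long p : profileTs) nearest = Math.min(nearest, Math.abs(c - p));
            worst = Math.max(worst, nearest);
        }
        return worst;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long[] clicks = {0, 2 * min, 9 * min};
        long[] profiles = {1 * min, 12 * min};
        long needed = smallestWindowCoveringAll(clicks, profiles);
        long configured = Duration.ofMinutes(5).toMillis();
        System.out.println("needed=" + needed + "ms, configured=" + configured + "ms, ok=" + (configured >= needed));
    }
}
```

For these samples the largest gap is 3 minutes: the click at minute 9 is closest to the profile at minute 12. The 5-minute window from the example would therefore match every click. A larger window also means more state to hold and possibly more matches per record.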