1.Window Join
a)概述
Window join 作用在两个流中有相同 key 且处于相同窗口的元素上,窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。
两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction
或 FlatJoinFunction
,可以用它们输出符合 join 要求的结果。
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
注意:
- 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
- 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为
[5, 10)
窗口中的元素在 join 之后的 timestamp 为 9。
b)滚动 Window Join
使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。
行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!
如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ...
的窗口,将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction
。
注意:滚动窗口 [6,7]
将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
c)滑动 Window Join
当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
,当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!
注意:在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。
本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …
。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction
的元素;图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3]
中 join,但没有与窗口 [1,2]
中任何元素 join。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
d)会话 Window Join
使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
,这个操作同样是 inner join,如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!
定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction
,而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});