Contents
Window Join
Tumbling Window Join
Sliding Window Join
Session Window Join
Interval Join
Window CoGroup
Window Join
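A window join joins the elements of two streams that share a common key and lie in the same window. These windows are defined with a window assigner and are evaluated on elements from both streams.
The elements from both sides are then passed to a user-defined JoinFunction or FlatJoinFunction, where the user can emit results that meet the join criteria.
The general usage can be summarized as follows: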
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);
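The two streams stream and otherStream are combined with join. The KeySelector passed to where specifies the key of stream, the KeySelector passed to equalTo specifies the key of otherStream, window assigns a common window to both streams, and finally apply takes a JoinFunction that defines how each joined pair is turned into a result. This is very similar to the following SQL: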
select
a.id,a.age,b.name
from t_age as a join t_name as b on a.id = b.id
where a.time between 0 and 1 and b.time between 0 and 1
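(1) stream.join(otherStream) corresponds to the t_age as a join t_name as b part of the SQL;
(2) .where(<KeySelector>).equalTo(<KeySelector>) corresponds to a.id = b.id;
(3) apply(<JoinFunction>) corresponds to select a.id,a.age,b.name, i.e. it produces the final join result;
(4) window roughly corresponds to a.time between 0 and 1 and b.time between 0 and 1: both elements must fall into the same window (note that Flink windows are half-open, [start, end)).
Some notes on semantics:
(1) Creating pairwise combinations of the elements of the two streams behaves like an inner join: an element from one stream is not emitted if there is no corresponding element from the other stream to join it with.
(2) The joined elements get as their timestamp the largest timestamp that still lies in the respective window. For example, a window with [5,10) as its boundaries results in joined elements with timestamp 9.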
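To make the two rules concrete, here is a minimal plain-Java sketch with no Flink dependency. The class WindowJoinSemanticsSketch and its methods are hypothetical helpers, not Flink API. It forms the pairwise combinations for the elements of one window (an empty side yields no output) and computes the result timestamp as end - 1 for a half-open window [start, end).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Flink API) illustrating two window-join rules.
class WindowJoinSemanticsSketch {

    // Largest timestamp still inside the half-open window [start, end).
    static long maxTimestamp(long start, long end) {
        return end - 1;
    }

    // Pairwise combinations of the elements both streams put into one window.
    // If either side is empty, nothing is produced: inner-join behaviour.
    static List<String> joinInWindow(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        for (int l : left) {
            for (int r : right) {
                out.add(l + "," + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(maxTimestamp(5, 10));                        // 9
        System.out.println(joinInWindow(List.of(0, 1), List.of(0, 1))); // [0,0, 0,1, 1,0, 1,1]
        System.out.println(joinInWindow(List.of(6, 7), List.of()));     // []
    }
}
```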
In the following sections we outline how the different kinds of window joins behave, using some example scenarios.
Tumbling Window Join
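When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that have no elements from the other stream in their tumbling window are not emitted.
As shown in the figure, we define a tumbling window of size 2 milliseconds, which results in windows of the form [0,2), [2,4), .... The figure shows the pairwise combinations of all elements in each window that are passed on to the JoinFunction. Note that nothing is emitted in the tumbling window [6,8), because the green stream has no elements to be joined with the orange elements ⑥ and ⑦. In the code below, time is scaled by 1000: the element numbered n has an event time of n seconds (n * 1000 ms), so the window size is 2 seconds. The key selector returns the constant 1, so all elements share the same key.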
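The window assignment can also be reproduced with a small stdlib-only simulation. This is a sketch under stated assumptions, not Flink's implementation: TumblingJoinSketch is a hypothetical class, timestamps are in the figure's units (element n has timestamp n), windows are aligned to 0 with no offset, and all elements share one key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stdlib-only simulation of a tumbling-window inner join.
class TumblingJoinSketch {

    // Start of the tumbling window [start, start + size) that contains ts.
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Pairs every left element with every right element of the same window.
    static List<String> join(long[] left, long[] right, long size) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (windowStart(l, size) == windowStart(r, size)) {
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 1, 2, 3, 4, 5, 6, 7};
        long[] green = {0, 1, 3, 4};
        // Nothing is produced for window [6,8): green has no element there.
        System.out.println(join(orange, green, 2));
        // [0,0, 0,1, 1,0, 1,1, 2,3, 3,3, 4,4, 5,4]
    }
}
```

The eight pairs are the same ones the Flink program below prints.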
package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Arrays;

/**
 * Tumbling window join demo.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class TumblingWindowJoinDemo {
    public static void main(String[] args) throws Exception {
        // Initialize the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Join the two streams (orange is the first input, green the second).
        JoinedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> joinedStreams =
                orangeWatermarks.join(greenWatermarks);
        // Specify the common key and the common window of the two streams.
        DataStream<String> apply = joinedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply(new MyJoinFunction());
        apply.print();
        env.execute();
    }

    // Every element gets the constant key 1, so all elements of both streams can be paired.
    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    // Emits "orangeId,greenId" for every joined pair.
    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    // Uses field f1 as the event timestamp.
    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}
Output (the "6>" prefix is the index of the parallel subtask that printed the line):
6> 0,0
6> 0,1
6> 1,0
6> 1,1
6> 2,3
6> 3,3
6> 4,4
6> 5,4
The result matches the figure above.
Sliding Window Join
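When performing a sliding window join, all elements with a common key and a common sliding window are joined as pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Elements of one stream that have no elements from the other stream in the current sliding window are not emitted! Note that some elements might be joined in one sliding window but not in another!
In this example we use sliding windows of size 2 milliseconds that slide by 1 millisecond, resulting in the sliding windows [-1,1), [0,2), [1,3), [2,4), .... The joined elements below the x-axis are the ones passed to the JoinFunction for each sliding window. Here you can also see, for example, how orange ② is joined with green ③ in the window [2,4), but is not joined with anything in the window [1,3). In the code below, time is again scaled to seconds (size 2 s, slide 1 s), and the green element (1, 1000L) is commented out, so the green stream contains only 0, 3 and 4.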
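The set of sliding windows an element belongs to can be enumerated directly. In this hypothetical stdlib-only sketch (figure units, one shared key, not Flink code), an element with timestamp ts belongs to every window [s, s + size) whose start s is a multiple of slide and satisfies s <= ts < s + size, and a pair is emitted once for every window the two elements share.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stdlib-only simulation of a sliding-window inner join.
class SlidingJoinSketch {

    // Starts of all windows [s, s + size) containing ts, where s is a multiple
    // of slide; the latest start comes first.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long last = ts - Math.floorMod(ts, slide);
        for (long s = last; s > ts - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    // Emits one "l,r" pair for every window that contains both l and r.
    static List<String> join(long[] left, long[] right, long size, long slide) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                for (long s : windowStarts(l, size, slide)) {
                    if (s <= r && r < s + size) {
                        out.add(l + "," + r);
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Orange 2 lies in [2,4) and [1,3).
        System.out.println(windowStarts(2, 2, 1)); // [2, 1]
        long[] orange = {0, 1, 2, 3, 4, 5, 6, 7};
        long[] green = {0, 3, 4};
        System.out.println(join(orange, green, 2, 1).size()); // 11
    }
}
```

With this data the sketch yields the same eleven pairs as the Flink program below.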
package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Arrays;

/**
 * Sliding window join demo.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class SlidingWindowJoinDemo {
    public static void main(String[] args) throws Exception {
        // Initialize the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                // (1, 1000L) is deliberately left out, so the green stream contains only 0, 3 and 4.
                // new Tuple2<>(1, 1000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Join the two streams (orange is the first input, green the second).
        JoinedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> joinedStreams =
                orangeWatermarks.join(greenWatermarks);
        // Specify the common key and the common window: size 2 s, slide 1 s.
        DataStream<String> apply = joinedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
                .apply(new MyJoinFunction());
        apply.print();
        env.execute();
    }

    // Every element gets the constant key 1, so all elements of both streams can be paired.
    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    // Emits "orangeId,greenId" for every joined pair.
    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    // Uses field f1 as the event timestamp.
    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}
The program prints:
6> 0,0
6> 0,0
6> 1,0
6> 2,3
6> 3,3
6> 3,3
6> 3,4
6> 4,3
6> 4,4
6> 4,4
6> 5,4
The output matches the result shown in the figure.
Session Window Join
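When performing a session window join, all elements with the same key that, when "combined", fulfill the session criteria are joined in pairwise combinations and passed on to the JoinFunction or FlatJoinFunction. Again, this performs an inner join, so if a session window contains elements from only one stream, no output is emitted!
Here we define a session window join in which sessions are separated by a gap of at least 1 ms (1 second in the code below). There are three sessions; in the first two, joined elements from both streams are passed to the JoinFunction. In the third session there are no elements from the green stream, so ⑧ and ⑨ are not joined!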
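Session windows are formed from the elements of both streams together. The hypothetical stdlib-only sketch below (figure units, one shared key, not Flink code) gives each element the window [ts, ts + gap) and merges windows that overlap or merely touch. Merging touching windows is an assumption here, but it is what the program output below requires, since ⓪, ① and ② (exactly one gap apart) end up in one session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stdlib-only simulation of a session-window inner join.
class SessionJoinSketch {

    // Builds sessions from the timestamps of both streams: each element opens
    // [ts, ts + gap), and windows that overlap or touch are merged.
    static List<long[]> sessions(long[] left, long[] right, long gap) {
        TreeSet<Long> all = new TreeSet<>();
        for (long t : left) all.add(t);
        for (long t : right) all.add(t);
        List<long[]> out = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long t : all) {
            if (open && t <= end) {
                end = Math.max(end, t + gap);   // extend the current session
            } else {
                if (open) out.add(new long[] {start, end});
                start = t;                      // start a new session
                end = t + gap;
                open = true;
            }
        }
        if (open) out.add(new long[] {start, end});
        return out;
    }

    // Pairs every left element with every right element of the same session.
    static List<String> join(long[] left, long[] right, long gap) {
        List<String> out = new ArrayList<>();
        for (long[] s : sessions(left, right, gap)) {
            for (long l : left) {
                if (l < s[0] || l >= s[1]) continue;
                for (long r : right) {
                    if (r >= s[0] && r < s[1]) out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {1, 2, 5, 6, 8, 9};
        long[] green = {0, 4, 5};
        // Prints [0,3), [4,7) and [8,10) on separate lines.
        for (long[] s : sessions(orange, green, 1)) {
            System.out.println("[" + s[0] + "," + s[1] + ")");
        }
        System.out.println(join(orange, green, 1)); // [1,0, 2,0, 5,4, 5,5, 6,4, 6,5]
    }
}
```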
package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Arrays;

/**
 * Session window join demo.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class SessionWindowJoinDemo {
    public static void main(String[] args) throws Exception {
        // Initialize the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(8, 8000L),
                new Tuple2<>(9, 9000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Join the two streams (orange is the first input, green the second).
        JoinedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> joinedStreams =
                orangeWatermarks.join(greenWatermarks);
        // Specify the common key and a session window with a gap of 1 s.
        DataStream<String> apply = joinedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
                .apply(new MyJoinFunction());
        apply.print();
        env.execute();
    }

    // Every element gets the constant key 1, so all elements of both streams can be paired.
    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    // Emits "orangeId,greenId" for every joined pair.
    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    // Uses field f1 as the event timestamp.
    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}
The program prints:
6> 1,0
6> 2,0
6> 5,4
6> 5,5
6> 6,4
6> 6,5
This matches the figure.
Interval Join
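The interval join joins elements of two streams (we call them A and B here) that share a common key, where the timestamps of the elements of stream B lie in an interval relative to the timestamps of the elements of stream A. In the figure below, the orange stream is A and the green stream is B.
This can also be expressed more formally as:
b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]
or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
where a and b are elements of A and B that share a common key. Both the lower bound lowerBound and the upper bound upperBound can be negative or positive, as long as the lower bound is always smaller than or equal to the upper bound. The interval join currently only performs inner joins.
When a pair of elements is passed to the ProcessJoinFunction, it is assigned the larger timestamp of the two elements (accessible via ProcessJoinFunction.Context).
The interval join currently only supports event time.
In the example above, we join the two streams "orange" and "green" with a lower bound of -2 milliseconds and an upper bound of +1 millisecond (in the code below, where time is again scaled to seconds, -2 seconds and +1 second). By default these bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to change this behavior.
Using the more formal notation again, this translates to
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
as indicated by the triangles.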
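The join condition itself can be checked by brute force. The sketch below is a hypothetical stdlib-only helper, not how Flink evaluates the join (Flink buffers elements in keyed state); the two boolean flags only imitate the effect of lowerBoundExclusive() and upperBoundExclusive(), and timestamps are in the figure's units (element n has timestamp n).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical brute-force check of the interval-join condition.
class IntervalJoinSketch {

    // True if a + lower <= b <= a + upper; each bound can be made exclusive.
    static boolean matches(long a, long b, long lower, long upper,
                           boolean lowerExclusive, boolean upperExclusive) {
        long lo = a + lower;
        long hi = a + upper;
        boolean aboveLower = lowerExclusive ? b > lo : b >= lo;
        boolean belowUpper = upperExclusive ? b < hi : b <= hi;
        return aboveLower && belowUpper;
    }

    // Timestamp of a joined pair: the larger of the two input timestamps.
    static long resultTimestamp(long a, long b) {
        return Math.max(a, b);
    }

    // Emits "a,b" for every pair that satisfies the condition.
    static List<String> join(long[] as, long[] bs, long lower, long upper,
                             boolean lowerExclusive, boolean upperExclusive) {
        List<String> out = new ArrayList<>();
        for (long a : as) {
            for (long b : bs) {
                if (matches(a, b, lower, upper, lowerExclusive, upperExclusive)) {
                    out.add(a + "," + b);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 2, 3, 4, 5, 7};
        long[] green = {0, 1, 6, 7};
        System.out.println(join(orange, green, -2, 1, false, false));
        // [0,0, 0,1, 2,0, 2,1, 3,1, 5,6, 7,6, 7,7]
        System.out.println(join(orange, green, -2, 1, false, true));
        // [0,0, 2,0, 2,1, 3,1, 7,6, 7,7]
        System.out.println(resultTimestamp(5, 6)); // 6
    }
}
```

With inclusive bounds the sketch reproduces the eight pairs printed by the Flink program below; making the upper bound exclusive drops 0,1 and 5,6, whose green element lies exactly at orangeElem.ts + 1.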
package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.Arrays;

/**
 * Interval join demo.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        // Initialize the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // An interval join operates on two KeyedStreams.
        KeyedStream<Tuple2<Integer, Long>, Integer> greenKeyedStream = greenWatermarks.keyBy(new MyKeySelector());
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(7, 7000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        KeyedStream<Tuple2<Integer, Long>, Integer> orangeKeyedStream = orangeWatermarks.keyBy(new MyKeySelector());
        // Interval-join the two streams: green.ts must lie in [orange.ts - 2 s, orange.ts + 1 s].
        SingleOutputStreamOperator<String> intervalJoinStream = orangeKeyedStream.intervalJoin(greenKeyedStream)
                .between(Time.seconds(-2), Time.seconds(1))
                .process(new ProcessJoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<Integer, Long> left, Tuple2<Integer, Long> right, Context ctx, Collector<String> out) throws Exception {
                        // left is the orange element, right is the green element.
                        out.collect(left.f0 + "," + right.f0);
                    }
                });
        intervalJoinStream.print();
        env.execute();
    }

    // Every element gets the constant key 1, so all elements of both streams can be paired.
    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    // Uses field f1 as the event timestamp.
    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}
The program prints:
6> 0,0
6> 0,1
6> 2,0
6> 2,1
6> 3,1
6> 5,6
6> 7,6
6> 7,7
The result matches the figure.
Window CoGroup
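Window CoGroup can be used like a full outer join in SQL: for every key and window, the CoGroupFunction receives all elements of both streams as two Iterables and is called even if one of them is empty, so unmatched elements can be emitted as well. Like the window join, Window CoGroup comes in several variants depending on the window type: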
(1)Tumbling Window CoGroup
(2)Sliding Window CoGroup
(3)Session Window CoGroup
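The grouping behind a tumbling Window CoGroup can be sketched without Flink. In this hypothetical stdlib-only simulation (figure units, not Flink's implementation), the key is the element value itself, similar to the key selector value.f0 used in the example below, and elements are grouped by (key, window). Every group produces one line, even when one of its two lists is empty, which is what distinguishes coGroup from the inner window join.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stdlib-only simulation of a tumbling-window coGroup.
class CoGroupSketch {

    // Adds values of one side (0 = left, 1 = right) to their (key, window)
    // group; each group holds [leftList, rightList]. The key is the value.
    private static void addSide(Map<String, List<List<Long>>> groups,
                                long[] values, int side, long size) {
        for (long v : values) {
            long start = v - Math.floorMod(v, size);
            String groupKey = v + "@" + start;
            groups.computeIfAbsent(groupKey, k -> {
                List<List<Long>> sides = new ArrayList<>();
                sides.add(new ArrayList<>());
                sides.add(new ArrayList<>());
                return sides;
            }).get(side).add(v);
        }
    }

    // One output line per (key, window), even when one side is empty.
    static List<String> coGroup(long[] left, long[] right, long size) {
        Map<String, List<List<Long>>> groups = new LinkedHashMap<>();
        addSide(groups, left, 0, size);
        addSide(groups, right, 1, size);
        List<String> out = new ArrayList<>();
        for (List<List<Long>> sides : groups.values()) {
            out.add(sides.get(0) + "," + sides.get(1));
        }
        return out;
    }

    public static void main(String[] args) {
        // Unmatched elements from either side still produce a line.
        System.out.println(coGroup(new long[] {2}, new long[] {3}, 2)); // [[2],[], [],[3]]
    }
}
```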
An example of Window CoGroup:
package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.Arrays;

/**
 * Window coGroup demo.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class WindowCogroupDemo {
    public static void main(String[] args) throws Exception {
        // Initialize the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Read data from a collection. Field f1 of each Tuple2 is its event time in milliseconds.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));
        // Assign watermarks to the stream and specify how timestamps are extracted.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));
        // Co-group the two streams (orange is the first input, green the second).
        CoGroupedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> coGroupedStreams =
                orangeWatermarks.coGroup(greenWatermarks);
        // Specify the common key and the common window of the two streams.
        DataStream<String> apply = coGroupedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply(new MyCoGroupFunction());
        apply.print();
        env.execute();
    }

    // Unlike the join demos, the key is f0, so each element only meets the element with the same number.
    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return value.f0;
        }
    }

    // first and second hold all orange and green elements of one key and window; either may be empty.
    static class MyCoGroupFunction implements CoGroupFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public void coGroup(Iterable<Tuple2<Integer, Long>> first, Iterable<Tuple2<Integer, Long>> second, Collector<String> out) throws Exception {
            out.collect(first + "," + second);
        }
    }

    // Uses field f1 as the event timestamp.
    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}
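The program prints the following. A line whose second list is empty comes from an orange element that had no green element with the same key in its window; an inner window join would not have emitted it: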
1> [(4,4000)],[(4,4000)]
6> [(0,0)],[(0,0)]
6> [(1,1000)],[(1,1000)]
8> [(3,3000)],[(3,3000)]
8> [(2,2000)],[]
8> [(5,5000)],[]
8> [(7,7000)],[]
2> [(6,6000)],[]