watermark
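- Purpose of the watermark
A watermark delays the firing of a window so that elements arriving out of order can still land in the correct window. This works because the watermark is continuously recomputed from the largest event timestamp seen so far (with the bounded-out-of-orderness extractor used in the code: largest timestamp seen minus the configured out-of-orderness). An out-of-order element carries a smaller timestamp, so it does not advance the watermark, which in effect postpones the window computation.
- Trigger time
The window fires once watermark >= window.maxTimestamp(), where window.maxTimestamp() is the window's end time minus 1 ms.
- allowedLateness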
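allowedLateness is how long a window is kept around after it has fired. When exactly does an element count as late? If the window the element belongs to satisfies the condition below, that window has already been cleaned up, and the element is treated as late (it is dropped, or sent to the side output when sideOutputLateData is set).
window.maxTimestamp() is fixed for a given window.
allowedLateness is a fixed, configured value.
So the outcome depends only on the watermark. If new elements keep arriving and advancing the watermark, the earlier window is cleaned up shortly afterwards. Put another way, while the watermark has passed the window's maxTimestamp() but has not yet advanced far enough to satisfy the condition below, every further element that arrives for the earlier window makes that window fire again and emit an updated result.
window.maxTimestamp() + allowedLateness <= watermark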
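The rules above can be sketched in plain Java without any Flink dependency. This is only a simplified model, not Flink's implementation: it assumes the watermark is updated immediately after every element (Flink emits watermarks periodically), and the class name, the constants, and the sample timestamps are made up for illustration.

```java
// Plain-Java sketch of watermark progress and lateness checks (not Flink code).
class WatermarkSketch {
    enum Status { ON_TIME, LATE_REFIRES_WINDOW, TOO_LATE }

    static final long WINDOW_SIZE = 60_000L;       // assumed: 1-minute tumbling windows
    static final long MAX_OUT_OF_ORDERNESS = 0L;   // assumed: no extra delay
    static final long ALLOWED_LATENESS = 60_000L;  // assumed: 1 minute

    private long watermark = Long.MIN_VALUE;

    long currentWatermark() {
        return watermark;
    }

    /** Classifies an element against the current watermark, then lets it advance the watermark. */
    Status process(long eventTimestamp) {
        long windowStart = eventTimestamp - Math.floorMod(eventTimestamp, WINDOW_SIZE);
        long windowMax = windowStart + WINDOW_SIZE - 1; // plays the role of window.maxTimestamp()

        Status status;
        if (windowMax + ALLOWED_LATENESS <= watermark) {
            status = Status.TOO_LATE;            // window already cleaned up
        } else if (watermark >= windowMax) {
            status = Status.LATE_REFIRES_WINDOW; // window fired earlier and fires again
        } else {
            status = Status.ON_TIME;             // window has not fired yet
        }

        // Only a larger timestamp moves the watermark forward.
        watermark = Math.max(watermark, eventTimestamp - MAX_OUT_OF_ORDERNESS);
        return status;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch();
        long[] timestamps = {10_000L, 61_000L, 20_000L, 125_000L, 30_000L};
        for (long ts : timestamps) {
            Status status = sketch.process(ts);
            System.out.println(ts + " -> " + status + ", watermark=" + sketch.currentWatermark());
        }
    }
}
```

In this run, 20000 arrives after the watermark has reached 61000, so its window [0, 60000) has already fired but is still kept, and it fires again. By the time 30000 arrives the watermark is 125000, which is past 59999 + 60000, so that element is too late.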
Code
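import java.time.Duration;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.util.OutputTag;

public class WatermarkDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // a single subtask keeps the output easy to follow
        // Each input line is "<event timestamp in ms>,<key>"
        DataStreamSource<String> localhost = env.socketTextStream("localhost", 9093);
        // Side output for elements whose window has already been cleaned up
        final OutputTag<Tuple3<String, Long, Integer>> lateTag =
                new OutputTag<Tuple3<String, Long, Integer>>("late-data") {};
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> reduce = localhost
                // Out-of-orderness of 0: the watermark follows the largest timestamp seen so far
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<String>(Duration.ofSeconds(0)) {
                            @Override
                            public long extractTimestamp(String element) {
                                String[] split = element.split(",");
                                return Long.parseLong(split[0]);
                            }
                        }))
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        // (key, arrival time in processing time, count of 1)
                        return new Tuple3<>(split[1], System.currentTimeMillis(), 1);
                    }
                })
                .keyBy(new KeySelector<Tuple3<String, Long, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<String, Long, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
                .allowedLateness(Duration.ofMinutes(1)) // keep each window for 1 more minute of event time
                .sideOutputLateData(lateTag)
                .reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> reduce(
                            Tuple3<String, Long, Integer> value1,
                            Tuple3<String, Long, Integer> value2) throws Exception {
                        // Sum the counts per key and window
                        value1.f2 = value1.f2 + value2.f2;
                        return value1;
                    }
                });
        reduce.print(); // window results, including updated results from re-fired windows
        reduce.getSideOutput(lateTag).print(); // elements that arrived after their window was cleaned up
        try {
            env.execute("aa");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}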
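To pick test input for the socket, it helps to know which window a line falls into and at which watermark that window fires and is cleaned up. The sketch below works this out in plain Java for 1-minute tumbling windows with 1 minute of allowed lateness, as configured in the job. The class name, helper names, and sample lines are made up for illustration, and the arithmetic assumes windows aligned to the epoch with no offset.

```java
// Plain-Java sketch: window bounds for "<timestamp>,<key>" input lines (not Flink code).
class TumblingWindowSketch {
    static final long WINDOW_SIZE = 60_000L;      // 1-minute tumbling window, as in the job
    static final long ALLOWED_LATENESS = 60_000L; // 1 minute, as in the job

    /** Event timestamp taken from the first comma-separated field. */
    static long parseTimestamp(String line) {
        return Long.parseLong(line.split(",")[0]);
    }

    /** Inclusive start of the window containing the timestamp (assumes no window offset). */
    static long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
    }

    /** The window fires once the watermark reaches this value (window end minus 1 ms). */
    static long fireAt(long timestamp) {
        return windowStart(timestamp) + WINDOW_SIZE - 1;
    }

    /** The window is cleaned up once the watermark reaches this value. */
    static long cleanupAt(long timestamp) {
        return fireAt(timestamp) + ALLOWED_LATENESS;
    }

    public static void main(String[] args) {
        String[] lines = {"1000,a", "59999,a", "60000,b"}; // hypothetical sample input
        for (String line : lines) {
            long ts = parseTimestamp(line);
            long start = windowStart(ts);
            System.out.println(line + " -> window [" + start + ", " + (start + WINDOW_SIZE)
                    + "), fires at watermark >= " + fireAt(ts)
                    + ", cleaned up at watermark >= " + cleanupAt(ts));
        }
    }
}
```

Under these assumptions, the lines 1000,a and 59999,a share the window [0, 60000), which fires at watermark 59999 and is cleaned up at 119999, while 60000,b opens the next window.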