文章目录
- 一 时间语义与Wartermark
- 1 Flink中的时间语义
- 2 EventTime的引入
- 3 Watermark(水位线)
- (1)基本概念
- (2)水位线测试
- a 代码编写
- b 计算水位线
- c 计算结果
- d 深入分析
- (3)水位线时间测试
- a 代码编写
- b 结果分析
- (4)水位线插入时间测试(调优)
一 时间语义与Wartermark
1 Flink中的时间语义
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
Event Time(事件时间):是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
Ingestion Time(摄入时间):是数据进入Flink的时间。
Processing Time(处理时间):是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
例如电影《星球大战》的例子:
时间时间为上面的1-5,处理时间为下面的年份。
再例如,一条日志进入Flink的时间为2022-11-12 10:00:00.123,到达Window的系统时间为2022-11-12 10:00:01.234,日志的内容如下:
2022-11-02 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计1min内的故障日志个数,eventTime 时间是最有意义,因此要根据日志的生成时间进行统计。Flink 1.12 默认使用事件时间,无需设置。
2 EventTime的引入
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
但是使用事件时间会带来一个问题,我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的,如下图:
理想情况:希望12345依次按序到达。
实际情况:145先到达,23然后到达,这时如果一个0-5S的窗口,在接收到数据5时就不能够关闭窗口,因为在其后面还有数据。
乱序事件的影响:
- 当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。
- 由于网络、分布式等原因,会导致乱序数据的产生。
- 乱序数据会让窗口计算不准确。
那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
3 Watermark(水位线)
使用水位线需要考虑以下问题:
- 怎样避免乱序数据带来计算不正确?
- 遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
- 要等多长时间?碰到含有 10000s 时间戳的事件,是否可以闭合 0s - 5s 滚动窗口吗?
(1)基本概念
- Watermark是一种衡量Event Time进展的机制。
- Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。
- 数据流中的Watermark用于表示timestamp小于等于Watermark的数据,Flink认为其都已经到达了,因此,window的执行也是由Watermark触发的(水位线 >= 窗口结束时间)。
- Watermark可以理解成一个延迟触发机制,用来让程序自己平衡延迟和结果正确性,可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t -1的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime - t - 1,那么这个窗口被触发执行。
- 水位线由程序员编程插入到数据流中,是一种逻辑时钟,对于分布式系统来讲,最重要的一个概念就是逻辑时钟。
- 水位线是一种特殊的事件。
- 在事件时间的世界里,水位线就是时间。
- 水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒【因为精度在1ms,所以不会出现0.999毫秒】。
下述文字抽象度较高,如果难以理解,请看关于水位线的测试,然后再回来理解这段文字。
有序流的Watermarker如下图所示(Watermark设置为0):
乱序流的Watermarker如下图所示(Watermark设置为2):
当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 最大延迟时长 -1ms,也就是说,Watermark是基于数据携带的时间戳生成的,一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于event time是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是7s - 2s - 1ms = 4999ms
,时间戳为12s的事件的Watermark是12s - 2s - 1ms = 9999ms
,如果窗口1是0s~5s
,窗口2是5s~10s
,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关窗那么以当前时刻为准,在窗口范围内的所有数据都会收入窗中。
只要没有达到水位线,那么不管现实中的时间推进了多久都不会触发关窗,从这里可以看到,水位线只是在一定程度上解决了数据延迟问题,并不能全部解决乱序问题,那么Flink针对迟到数据也会进行处理,如何处理请见下文。
(2)水位线测试
a 代码编写
为每个用户的PV定义水位线:
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
// 输入的内容格式类似:'a 1'
.socketTextStream("localhost",9999)
// 需要将原始数据变为元组(a,1000L)
// value为事件时间
.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
}
})
// 抽取时间戳,设置水位线
// 默认每隔200ms的机器时间插入一次水位线
.assignTimestampsAndWatermarks(
// 最大延迟时间设置为5S,并设置数据流中的泛型
WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
// 告诉Flink,事件时间是数据源中的哪一个字段
return element.f1;
}
})
)
.keyBy(r ->r.f0)
// 开启5S的事件时间滚动窗口
.window(
TumblingEventTimeWindows.of(Time.seconds(5))
)
// 使用全窗口函数进行聚合
.process(
new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
// 获取迭代器里面的元素个数
long count = elements.spliterator().getExactSizeIfKnown();
out.collect("用户【" + key +"】在窗口" + new Timestamp(windowStart) +
" -- " + new Timestamp(windowEnd) + "中的pv次数是" + count);
}
})
.print();
env.execute();
}
b 计算水位线
水位线计算方法:
- 首先在没有任何时间来临之前Flink会在程序中插入一个
负无穷大的水位线
; - 这时输入一个元素
a 1
,每隔200ms向数据流中插入一个水位线,这时的水位线等于1s - 5s - 1ms = -4001ms
,此时数据流中插入一个-4001ms
的水位线【逻辑时钟,可以为负】; - 再输入一个
a 2
,间隔200ms,再插入一个水位线,大小等于2s -5s -1ms = -3001ms
; - 再输入一个
a 5
,此时水位线等于5s - 5s - 1ms = -1ms
; - 再输入一个
a 3
,此时水位线等于5s - 5s - 1ms = -1ms
; - 窗口大小为左闭右开,此时0-5的窗口包含三条元素
a 1,a 2,a 3
,其中a 5
被分配到了5 - 10的窗口中,但因为此时逻辑时钟处于-1ms
,窗口不会关闭; - 再输入一个
a 10
,此时水位线等于10s - 5s - 1ms = 4999ms
,在事件时间的世界中,当前时钟达到定时器时间,关闭0-5的窗口
c 计算结果
运行结果如下:
d 深入分析
水位线是事件时间中唯一的时钟,当水位线到达4999ms,就认为4999ms之前的数据全部到达,但是4999ms之前的数据真的全部到来了吗?答案显然是否定的,如此时再输入a 4
,就进不去0-5的窗口中,那么下面会介绍如何处理迟到数据。
水位线插入的位置:在map操作之前,map操作之后会输出元组,插入的位置就在map的数据流中。这时,给予我们以下启示
- 在做任何分组(keyBy)之前,先插入水位线
- 在插入水位线前,数据流的并行度最好为1,否则随机将数据读取到任务槽,再插入水位线,时钟会乱掉
水位线会插入到数据流中,并跟随着数据进入算子内部的这样一个流动状态,这样不同的算子看到的水位线可能不一样,如在Q算子内部,数据进行了无穷大次数的循环,在其后面有一个ProcessWindowFunction函数,那么Q算子就会将这个数据流阻塞,如b中a 1
的-4001ms的水位线,其不能够跳过这个算子向前传送。这样Q算子看到的时间就是-4001ms,其后的函数看到的时间就是负无穷大,只有当Q算子计算完成,数据流时钟状态才会更新。
(3)水位线时间测试
a 代码编写
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("localhost",9999)
.map(r -> Tuple2.of(r.split(" ")[0],Long.parseLong(r.split(" ")[1]) * 1000L))
.returns(Types.TUPLE(Types.STRING,Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
})
)
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String,Tuple2<String,Long>,String>(){
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
ctx.timerService().registerEventTimeTimer(value.f1 + 5000L);
out.collect("注册了一个时间戳是:【" + new Timestamp(value.f1 + 5000L) + "】的定时器!");
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("定时器触发了!");
}
})
.print();
env.execute();
}
b 结果分析
-
输入
a 1
,立即执行processElement,由于a 1
前面插入了一个负无穷大的水位线,输出当前的水位线是:-9223372036854775808 注册了一个时间戳是:【1970-01-01 08:00:06.0】的定时器!
-
输入‘a 2’,输出
1 - 5 - 1ms
当前的水位线是:-4001 注册了一个时间戳是:【1970-01-01 08:00:07.0】的定时器!
-
输入
a 11
,输出2 - 5 - 1ms
当前的水位线是:-3001 注册了一个时间戳是:【1970-01-01 08:00:16.0】的定时器!
-
输入
a 12
,输出11 -5 - 1ms
当前的水位线是:5999 注册了一个时间戳是:【1970-01-01 08:00:17.0】的定时器! 定时器触发了!
-
输入
a 23
,输出12 - 5 - 1ms
当前的水位线是:6999 注册了一个时间戳是:【1970-01-01 08:00:28.0】的定时器! 定时器触发了!
(4)水位线插入时间测试(调优)
将默认插入水位线的时间改为1分钟,同时将最大延迟时间设置为0s。。
// 每隔6分钟,插入一条水位线
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
快速输入三个a 1
,一个a 5
,两个a 1
,确保输入时间不超过1分钟,最终输出结果a的pv为6,因为输入a 5
将要触发关闭窗口操作时,水位线还没有插入,所以不会关闭,直到1分钟之后,才会关闭。
所以水位线的插入时间和最大延迟时间需要根据一些经验和指标来设置。