一、时间语义
1.1 为什么会出现时间语义?
flink是一个大数据处理引擎,它的最大特点就是分布式。每一个机器都有自己的时间,那么集群当中的时间应该以什么为准呢?
比如:我们希望统计8-9点的数据时,对并行任务来说并不是“同时”的,收集到的的数据会有误差。
1.1.1 时间类型
事件时间(Event Time):每个事件在对应的设备上发生的时间,也就是数据生成的时间。
处理时间(Pricessing Time):执行处理操作的机器的系统时间
摄取时间(Ingestion Time):事件进入到flink的时间
从1.13版本开始,系统默认使用EventTime事件时间。需要程序员指定时间字段。
1.1.2 哪种时间语义更重要
事件时间语义更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳,它就可以作为事件时间的判断基础。处理时间是我们计算效率的衡量标准,而事件事件更符合我们的业务计算逻辑。而处理时间是我们计算效率的衡量标准,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让流处理延迟降到最低,效率达到最高。
flink1.12版本开始,将事件时间作为默认的时间语义。
二、水位线watermark
既然我们采用事件事件作为时间语义,数据本身在处理转换的过程中会变化, 如果遇到窗口聚合这样的操作,其实是要攒一批数据才会输出一个结果, 那么下游的数据就会变少,时间进度的控制就不够精细了。当我们希望统计8-9点的数据时,如何通知9点往后的窗口来存入数据呢?于是引入了水位线的概念,用来衡量时间时间进展的标记。
2.1 事件时间的窗口
一个数据产生的时刻,就是流处理中事件触发的时间点,即“数据时间”,一般以时间戳的形式作为一个字段记录在数据里。当我们想要统计一段时间内的数据,需要划分时间窗口,这时只要判断一下时间戳就可以知道数据属于哪个时间窗口了。
明确了一个数据的所属窗口,还不能直接进行计算。因为窗口处理的是有界数据,我们要等窗口的数据都到齐了,才能计算最终的统计结果。对于时间窗口来说,窗口的结束时间应该是收集到了所有数据,就可以触发计算输出结果了。
2.2 什么是水位线
水位线可以看作是一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件事件。而它插入流中的位置,就应该是在某个数据到来之后。表示该时间点之前所有的时间都已经到达系统。
1. 有序流的水位线 - 按照时间排序,先发生的事件先到达
数据到来的顺序就是数据生成时的先后顺序。那么水位线也是从小到大不断增长的。针对周期性生成的时间,周期时间以处理时间(系统时间)为标准,而不是事件时间。
2. 乱序流的水位线 - 有可能先发生的事件后到达,后发生的事件先到达
我们插入新的水位线时,先判断一下时间戳是否比之前的打,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这是才插入水位线。
考虑到大量数据同时到来的处理效率,我们同样可以周期性的生成水位线。这是只需要保存之前所有数据中的最大时间戳。需要插入水位线时,就直接以它作为时间戳生成新的水位线。
这样会带来一个问题:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们可以等上2秒。这就是延迟等待机制(Delay Wait Mechanism)或延迟处理机制(Delay Processing Mechanism)。
相当于是当前时间6:30,而我把我的时钟调成6:25,那么6:30前的人都能赶上这班车。
但这种“等2秒”的策略其实并不能处理所有的乱序数据。我们可以多等几秒,也就是把时钟调的慢一点。最终的目的就是让窗口能够把所有迟到数据都收进来,也就是保证时间进展到了这个时间戳,之后不可能再有迟到数据来了。
3. 水位线的特性
水位线代表当前事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢失数据。
特性:
- 水位线时插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线时基于数据的时间戳生成的
- 水位线的时间戳必须单调递增,以保证正确处理乱序数据
- 一个水位线表示在当前流中事件事件已经达到了时间戳t,t之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据
总结起来,水位线(watermark)在Flink中的作用是用于处理乱序事件流,确保事件按照正确的顺序进行处理,以便进行准确的窗口计算和延迟处理。也就是, 牺牲掉一定的实时性,为了保证数据的完整性。
2.3 如何生成水位线
1. 生成水位线的总体原则
Flink中的水位线,其实是流处理中对 低延迟 和 结果正确性 的一个权衡机制。而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
2. 水位线的生成策略
DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
水位线使用方法:
val stream = env.addSource(new ClickSource)
val withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(<watermark strategy>)
assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,就是所谓的“水位线生成策略”。WatermarkStrategy中包含一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
@Override
TimestampAssigner<T>
createTimestampAssigner(TimestampAssignerSupplier.Context context);
@Override
WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
可以通过调用环境配置的 setAutoWatermarkInterval() 方法来设置周期时间(处理时间),默认为200ms
env.getConfig.setAutoWatermarkInterval(60 * 1000L)
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在 WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
3. flink内置水位线生成器
针对乱序流插入水位线,延迟时间设置为5s
// 此方法用来是设置流 水位线策略 和 指定 事件时间字段
val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(
new SerializableTimestampAssigner[SensorReading] {
override def extractTimestamp(t: SensorReading, l: Long): Long = {
t.timestamp
}
}
)
)
有序流水位线生成器:
WatermarkStrategy.forMonotonousTimestamps()
乱序流水位线生成器:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
2.4 水位线的传递
watermark是一条携带时间戳的特殊数据,从代码指定生成的位置,插入到流里面。
在 Flink 的数据流处理中,水位线是以特定的事件元素形式插入到数据流中的。这个特殊的事件元素被称为水位线事件(Watermark Event),它包含了水位线的时间戳信息。当数据流中的水位线事件到达算子(Operator)时,Flink 会根据其时间戳更新当前的水位线。
在源算子(Source Operator)中,可以通过调用特定的方法(如assignTimestampsAndWatermarks)来插入水位线事件。这样,在源算子产生的数据流中就会包含水位线事件,以及普通的数据事件。
然后,水位线事件会随着数据流在不同的算子之间进行传递。当算子处理数据时,它会检查接收到的事件的时间戳,并与当前水位线进行比较。如果事件的时间戳大于当前水位线,算子会更新水位线,并触发相应的操作。
在“重分区”的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,这时我们应该以最慢的那个时钟,也就是最小的水位线为准。
水位线在上下游任务之间的传递,非常巧妙的避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。
2.5 水位线的总结
我们之前学习过批处理,是指数据积累到一定的程度再进行处理。而Flink是一种流式处理框架。所谓流处理,就是数据来一条数据处理一条。
那么,如果我们的数据是按顺序发送(有序流),那么按照顺序进行处理没有问题。但是消息不在是按照顺序发送,产生了乱序,这时候该怎么处理?于是我们引入了水位线的概念。
1. 水位线有什么用?
水位线就是用来处理乱序时间的。而正确的处理乱序事件,通常用watermark机制结合window来实现。
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
2. 如何使用Watermarks处理乱序的数据流?
什么是乱序?就是数据到达的顺序和时间发生的时间不一致。比如延迟、背压、重试等。我们可以根据
三、窗口window
3.1 窗口的概念
Flink主要是来处理无界数据流的。想要高效处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”。
在flink中,窗口时处理无界流的核心。我们可以想象成一个固定位置的“框”,数据源源不断的六过来,到某个时间点窗口关闭了,就停止收集数据、触发计算并输出结果。如下图所示。
在窗口计算中,一般使用半开半闭的时间范围,即左闭右开。也就是说,窗口的开始时间是包含的,而结束时间是不包含的。因此,当一个事件的事件时间正好等于水位线时间时,它会被包含在该窗口的计算范围内。
但是由于有乱序数据的存在,我们需要设置一个延迟时间来等所有数据到齐。比如设置延迟时间为2秒,这样0~12秒的窗口会在12的数据到来之后,才真正关闭计算输出结果。这样就可以包含迟到的数据了。
3.2 窗口的分类
1. 按照驱动类型分类
窗口可以按照驱动类型分为时间窗口、计数窗口
计数窗口按照某个固定的个数,来截取一段数据集,这种窗口叫计数窗口
时间窗口
flink中有一个时间窗口的类,叫TimeWindow,有两个私有属性:start 和 end。表示窗口的开始和结束的时间戳,单位为毫秒
private final long start;
private final long end;
我们可以调用公有的 getStart() 和 getEnd() 方法直接获取这两个时间戳。另外TimeWindow还提供了maxTimestamp()方法,用来获取窗口中能够包含数据的最大时间戳,窗口中运行的最大时间戳为end - 1,这代表了我们定义的窗口时间范围都是左闭右开的区间[start,end)
public long maxTimestamp(){
return end - 1;
}
计数窗口
基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。
计数窗口理解简单,只需指定窗口大小,就可以把数据分配到对应的窗口中,Flink内部对应的类来表示计数窗口,底层通过全局窗口(Global Window)实现。
2. 按照窗口分配数据的规则分类
时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以由不同的功能应用。
滚动窗口Tumbling Windows
滚动窗口对数据进行均匀分片。窗口之间没有重叠,也不会有间隔,是首尾相接的状态,如果把多个窗口的创建看作一个窗口的移动,那么他就像在滚动一样。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。
滑动窗口Sliding Windows
由窗口大小和滑动距离确定,每个窗口之间有一定重叠部分。滑动窗口是滚动窗口的一种广义方式,当滑动步长等于滑动窗口大小时,就是滚动窗口。
滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。滑动窗口可以基于时间定义、数据个数。
定义滑动窗口的参数与两个:窗口大小,滑动步长。滑动步长是固定的,且代表了两个个窗口开始/结束的时间间隔。数据分配到多个窗口的个数 = 窗口大小/滑动步长
会话窗口Session Windows
会话窗口只能基于时间来定义,“会话”终止的标志就是隔一段时间没有数据来。
size:两个会话窗口之间的最小距离。我们可以设置静态固定的size,也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔gap的值。
在Flink底层,对会话窗口有比较特殊的处理:每来一个新的数据,都会创建一个新的会话窗口,然后判断已有窗口之间的距离,如果小于给定的size,就对它们进行合并操作。在Winodw算子中,对会话窗口有单独的处理逻辑。
会话窗口的长度不固定、起始和结束时间不确定,各个分区窗口之间没有任何关联。会话窗口之间一定是不会重叠的,且会留有至少为size的间隔
全局窗口Global Windows
还有一类比较通用的窗口,就是全局窗口。这种窗口全局有效,会把相同key的所有数据分配到同一个窗口中。说直白点,就是没分窗口一样,这种窗口没有结束的时候,默认是不会做触发计算的,必须编写触发器(Trigger)。
3.3 窗口API概览
按键分区
stream.keyBy(...).window(...)
非按键分区
stream.windowAll(...)
窗口api的使用
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)
3.4 窗口分配器
1、时间窗口
时间窗口又可以细分为:滚动、滑动、会话三种
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第二个参数代表窗口起始点的偏移量,比如,标志时间戳是1970年1月1日0时0分0秒0毫秒开始计算的一个毫秒数,这个时间时UTC时间,以0时区为标准,而我们所在的时区为东八区(UTC+8)。我们定义一天滚动窗口时,伦敦时间0但对应北京时间早上8点。那么设定如下就可以得到北京时间每天0点开开启滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
(2)滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
(3)处理时间会话窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap,静态的
(4)滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
(5)滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
(6)事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
2、计数窗口
底层是全局窗口,Flink为我们提供了非常方便地接口:直接调用countWindow()方法,根据分配规则的不同,又可以分为滚动计数、滑动计数窗口。
stream.keyBy(...)
.countWindow(10)
(2)滑动计数窗口
stream.keyBy(...)
.countWindow(10,3)
3. 全局窗口
stream.keyBy(...)
.window(GlobalWindows.create());
使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
3.5 窗口函数
val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks( // 此方法用来是设置流 水位线策略 和 指定 事件时间字段
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(
new SerializableTimestampAssigner[SensorReading] {
override def extractTimestamp(t: SensorReading, l: Long): Long = {
t.timestamp
}
}
)
)
val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2.keyBy(x => x.id)
.window(TumblingEventTimeWindows.of(Time.seconds(15))) // 滚动窗口