6.1 时间语义
6.1.1 Flink中的时间语义
对于一台机器而言,时间就是系统时间。但是Flink是一个分布式处理系统,多台机器“各自为政”,没有统一的时钟,各自有各自的系统时间。而对于并行的子任务来说,在不同的节点,系统时间就会有所差异。
我们知道一个集群有JobManager,作为管理者,是不是让它统一向所有 TaskManager 发送同步时钟信号就行了呢?这也是不行的。因为网络传输会有延迟,而且这延迟是不确定的,所以 JobManager 发出的同步信号无法同时到达所有节点;想要拥有一个全局统一的时钟,在分布式系统里是做不到的。
另一个麻烦的问题是,在流式处理的过程中,数据是在不同的节点间不停流动的,这同样也会有网络传输的延迟。例如,上游任务在 8 点 59 分 59 秒发出一条数据,到下游要做窗口计算时已经是 9 点01 秒了,那这条数据到底该不该被收到 8 点~9 点的窗口呢?
流式数据处理过程:事件发生->生成数据->进入分布式消息队列->源算子读取->转换算子(窗口算子)做处理->输出算子输出
两个重要的时间点:数据的产生时间——事件时间;转换算子(窗口算子)处理的事件——处理时间
我们在定义窗口操作时,到底以哪种时间作为衡量标准,就是所谓的时间语义。
1、处理时间
执行处理操作的机器的系统时间
2、事件时间
事件在对应设备上发生的时间,也就是数据生成的时间
举例:用户在手机上点击某个按钮生成点击事件,点击时手机上的时间是8:59:59,数据传送到某个节点进行计算处理时的时间为9:00:01。
8:59:59——事件时间
9:00:01——处理时间如果我们以事件时间为准,则这条数据属于8——9点,如果我们以处理时间为准,则这条数据属于9——10点
在实际应用中,由于分布式系统中网络传输延迟的不确定性,数据达到的顺序往往是乱序的。例如:我现在以事件时间为准,进行统计每一个小时的点击量。现在有三个点击事件,事件时间分别为 a——8:50:00、b——8:59:59、 c——9:00:01,但是到达时间分别为9:02:00、09:01:00、08:58:00。可以看到c事件最先到达。那么当窗口接收到c事件时,c事件的事件时间是9:00:01,这时窗口认为现在已经过了9点了。应该马上统计8——9点的点击量。但是a、b事件由于延迟,在c事件到达之后才到达。导致没有被统计到8——9点的点击量中。
所以还不能简单的使用事件时间来当作时钟,还需要用另外的标志来表示事件时间的进展。这个标志我们称之为“水位线”。
6.1.2 哪种语义更重要
两种语义都有各自适用的场景。通常来说处理时间是计算效率的衡量标准,而事件时间更符合业务的计算逻辑。
处理时间一般用在实时性极高,但结果准确性要求不高的的场景。事件时间语义是以一定延迟为代价,换来了处理结果的正确性。
除了事件时间和处理时间,Flink 还有一个“摄入时间”(Ingestion Time)的概念,它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添加到数据里。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事件时间非常像,可以当作特殊的事件时间来处理。
6.2 水位线
6.2.1 事件时间和窗口
前面已经讲过,一个数据产生的时刻,就是流处理中事件触发的时间点,这就是“事件时间”。有时候我们不是来一个数据就处理输出,而是要计算一段时间内的数。比如实时统计每个小时的点击量。例如:8——9点的点击量,需要等数据到齐了才能统计输出。那么这个8——9点就是一个窗口。而这里1个小时就是窗口的大小。
6.2.2 什么是水位线
只通过事件时间来判断是否一个窗口的数据已经到齐是不行的。我们可以基于事件时间去自定义一个时钟,用来表示当前时间的进展。例如:我们定义一个时钟,这个时钟的时间逻辑是比事件时间晚5分钟。当一个数据过来,它的事件时间是9:00:00,这时窗口会认为是8:55:00。这时,窗口认为还没有到9点,所以8——9点的窗口统计还不到时间。会再等等,等收到大于或等于9:05:00的数据时才会进行统计。这样如果有事件时间为8:58:00的数据在9:04:00才到来时,也能够被统计到8——9点的窗口中。因为9:04:00的窗口时间是8:59:00并没有到9点。
我们定义的这个时钟,是用来衡量事件时间进展的,是一个逻辑时钟。
但仅仅通过定义一个逻辑时钟,还不够。还存在以下问题:
- 当窗口聚合时,要攒一批数据才会输出结果,那么给下游的数据就会变少,时间进度的控制就不够精细了
- 数据向下游任务传递时,一般只能传输给一个子任务(除广播外),这样其他的并行子任务的时钟就无法推进了,不能进行窗口计算。
解决办法:
在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作**“水位线”(Watermark)**。
1、有序流中的水位线
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。
实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳。这里周期时间是指处理时间(系统时间),而不是事件时间
2、乱序流中的水位线
我们知道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性, 导致顺序发生改变,这就是所谓的“乱序数据”。这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。
最直观的想法自然是跟之前一样,我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以有可能新的时间戳比之前的还小,如果直接将这个时间的水位线再插入,我们的“时钟”就回退了——水位线就代表了时钟,时光不能倒流,所以水位线的时间戳也不能减小。解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线。这样做尽管可以定义出一个事件时钟,却也会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳
如果仔细观察,我们可以知道,这种“等 2 秒”的策略其实并不能处理所有的乱序数据。因为有时候不知道最大延迟是多少。当你设置了等10秒,但是这时有条数据晚了20秒,就会被遗漏丢弃。所以这个时候我们需要去单独处理“迟到”的数据。后面会讲解这种情况的处理。
需要注意的地方:
1.由于水位线是周期性生成的,所以插入的位置不一定是在时间戳最大的数据后面。
2.这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。例如上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来,但是10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。
3、水位线的特性
-
水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
-
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
-
水位线是基于数据的时间戳生成的
-
水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
-
水位线可以通过设置延迟,来保证正确处理乱序数据
-
一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据
6.2.3 如何生成水位线
生成水位线其实就是定义我们的时钟的逻辑。
1、水位线生成的总体原则
我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可及,我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,我们该怎么做呢?一个字,等。由于网络传输的延迟不确定,为了获取所有迟到数据,我们只能等待更长的时间。作为筹划全局的程序员,我们当然不会傻傻地一直等下去。那到底等多久呢?如果等太久,那很多迟到的数据基本不会遗漏,但是程序输出延迟会增加。如果等待时间短,那迟到的数据会被遗漏,结果的准确性难以保证。
所以水位线生成的总体原则:权衡低延迟和结果正确性
常用解决方案:
a.需要对相关领域有一定的了解了,根据业务来定夺。
b.可以单独创建一个 Flink 作业来监控事件流,建立概率分布或者机器学习模型,学习事件的迟到规律。得到分布规律之后,就可以选择置信区间来确定延迟,作为水位线的生成策略了。例如,如果得到数据的迟到时间服从μ=1,σ=1 的正态分布,那么设置水位线延迟为 3 秒,就可以保证至少 97.7%的数据可以正确处理。
c.对迟到数据单独处理
2、水位线生成策略
在Flink的DataStream API中,有一个单独用于生成水位线的方法。为流中的数据分配时间戳,并生成水位线。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
数据本身不是有时间戳吗,为什么还要为数据分配时间戳呢?
这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据, Flink 是无法知道数据真正产生的时间的。
参数:
watermarkStrategy: 水位线策略
WatermarkStrategy继承了TimestampAssignerSupplier——时间分配器和WatermarkGeneratorSupplier——水位线生成器
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>
TimestampAssigner
:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator
:主要负责按照既定的方式, 基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()
onEvent
:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳, 以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit
:onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为
200ms
3、Flink内置水位线生成器
atermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;
Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板
(1)有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了
WatermarkStrategy.forMonotonousTimestamps()
(2)乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间
WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness)
这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;
4、自定义水位线
两种:
- 周期性水位线生成器:周期性调用的方法onPeriodicEmit()中发出水位线
- 断点式水位线生成器:在事件触发的方法onEvent()中发出水位线
(1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。
public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
SerializableTimestampAssigner<Event> timestampAssigner = new SerializableTimestampAssigner<Event>(){
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.getTimestamp();
}
};
return timestampAssigner;
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
WatermarkGenerator<Event> watermarkGenerator = new WatermarkGenerator<Event>(){
// 延迟
private Long delayTime = 5000L;
// 观察到的最大时间戳,这里+ delayTime + 1L是为了防止溢出
private Long maxTs = Long.MIN_VALUE + delayTime + 1L;
@Override
public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
// 更新最大时间戳
maxTs = Math.max(event.getTimestamp(), maxTs);
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 发射水位线,默认 200ms 调用一次
watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
};
return watermarkGenerator;
}
}
(2)断点式水位生成器
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。
public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
SerializableTimestampAssigner<Event> timestampAssigner = new SerializableTimestampAssigner<Event>(){
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.getTimestamp();
}
};
return timestampAssigner;
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
WatermarkGenerator<Event> watermarkGenerator = new WatermarkGenerator<Event>(){
@Override
public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
// 只有在遇到特定的 itemId 时,才发出水位线
if (event.user.equals("Mary")) {
watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 不需要做任何事情
}
};
return watermarkGenerator;
}
}
5、在自定义数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks 方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用 assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。
public class ClickSource extends RichParallelSourceFunction<Event> {
// 声明一个布尔变量,作为控制数据生成的标识位
private Boolean running = true;
Random random = new Random();
@Override
public void run(SourceContext ctx) throws Exception {
// 在指定的数据集中随机选取数据
String[] users = {"Mary", "Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
while (Boolean.TRUE.equals(running)){
Event event = new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis());
ctx.collectWithTimestamp(event, event.getTimestamp());
ctx.emitWatermark(new Watermark(event.timestamp - 1L));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("open" + getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void close() throws Exception {
super.close();
System.out.println("close" + getRuntimeContext().getIndexOfThisSubtask());
}
}
6.2.4 水位线的传递
我们知道水位线是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务间传递。如果只是直通式(forward)的传输,那很简单,数据和水位线都是按照本身的顺序依次传递、依次处理的;一旦水位线到达了算子任务, 那么这个任务就会将它内部的时钟设为这个水位线的时间戳。
在这里,“任务的时钟”其实仍然是各自为政的,并没有统一的时钟。实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,我们要求上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。这样,后续任务就不需要依赖原始数据中的时间戳(经过转化处理后,数据可能已经改变了),也可以知道当前事件时间了
可是还有另外一个问题,那就是在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这时下游任务又该听谁的呢?
这就要回到水位线定义的本质了:它表示的是“当前时间之前的数据,都已经到齐了”。这是一种保证,告诉下游任务“只要你接到这个水位线,就代表之后我不会再给你发更早的数据了,你可以放心做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的
所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所
有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位。
我们可以用一个具体的例子,将水位线在任务间传递的过程完整梳理一遍。如图 6-12 所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:
(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
(2)当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
(3)再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
(4)同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题, 每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的。
备注:
水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒
在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE) 的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了
6.3 窗口
我们已经了解了 Flink 中事件时间和水位线的概念,那它们有什么具体应用呢?当然是做基于时间的处理计算了。其中最常见的场景,就是窗口聚合计算。
之前我们已经了解了 Flink 中基本的聚合操作。在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实只能针对当前已有的数据——之后再有数据到来,就需要继续叠加、再次输出结果。这样似乎很“实时”,但现实中大量数据一般会同时到来,需要并行处理,这样频繁地更新结果就会给系统带来很大负担了。
更加高效的做法是,把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。
6.3.1 窗口的概念
所以在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”,如图 6-15 所示。在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
我们可以梳理一下事件时间语义下,之前例子中窗口的处理过程:
(1)第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去;
(2)后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口;
(3)11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11秒的数据保存进去。由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也没有到关闭时间;
(4)之后又有 9 秒数据到来,同样进入[0, 10)窗口中;
(5)12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10 秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的 7 个数据,进行处理计算后输出结果,并将窗口关闭销毁;
(6)同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20,
30)并将数据保存进去;遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。
这里需要注意的是,Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开
其实就是水位线只是告知不会再收到某个时间点后面的数据了。比如来了个水位线W(10),说明不会再收到小于10的数据了,尽管已经收到了事件时间大于10的数据了,比如上图收到12、11。但是统计的时候是按照事件时间去统计的。w(10)来的时候会把事件时间落在[0,10)的数据进行统计。
6.3.2 窗口的分类
1、按照驱动类型分类
(1)时间窗口
就是按照时间段去截取数据
时间窗口类:TimeWindow
(2)计数窗口
计数窗口基于元素的个数来截取数据
为什么不把窗口区间定义成左开右闭、包含上结束时间呢?这样maxTimestamp 跟 end 一致,不就可以省去一个方法的定义吗?
答:这主要是为了方便判断窗口什么时候关闭。对于事件时间语义,窗口的关闭需要水位线推进到窗口的结束时间;而我们知道,水位线 Watermark(t)代表的含义是“时间戳小于等于 t 的数据都已到齐,不会再来了”。为了简化分析,我们先不考虑乱序流设置的延迟时间。那么当新到一个时间戳为 t 的数据时,当前水位线的时间推进到了 t – 1(还记得乱序流里生成水位线的减一操作吗?)。所以当时间戳为 end 的数据到来时,水位线推进到了 end - 1;如果我们把窗口定义为不包含 end,那么当前的水位线刚好就是 maxTimestamp,表示窗口能够包含的数据都已经到齐,我们就可以直接关闭窗口了。所以有了这样的定义,我们就不需要再去考虑那烦人的“减一”了,直接看到时间戳为 end 的数据,就关闭对应的窗口。如果为乱序流设置了水位线延迟时间 delay,也只需要等到时间戳为 end + delay 的数据,就可以关窗了。
2、按照窗口分配数据的规则分类
(1)滚动窗口
按照固定大小(可以是固定的时间间隔或者是固定的数据数量),对数据进行划分,窗口间没有重叠。
(2)滑动窗口
滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的, 而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。同样可以基于时间和计数。
(3)会话窗口
会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似 Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来, 那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。这就好像我们打电话一样,如果时不时总能说点什么,那说明还没聊完;如果陷入了尴尬的沉默,半天都没话说,那自然就可以挂电话了。
与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义,而没有“会话计数窗口”的概念。这很好理解,“会话”终止的标志就是“隔一段时间没有数据来”,如果不依赖时间而改成个数,就成了“隔几个数据没有数据来”,这完全是自相矛盾的说法。
而同样是基于这个判断标准,这“一段时间”到底是多少就很重要了,必须明确指定。对于会话窗口而言,最重要的参数就是这段时间的长度(size),它表示会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小
(size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔 gap 的值。
考虑到事件时间语义下的乱序流,这里又会有一些麻烦。相邻两个数据的时间间隔 gap 大于指定的 size,我们认为它们属于两个会话窗口,前一个窗口就关闭;可在数据乱序的情况下,可能会有迟到数据,它的时间戳刚好是在之前的两个数据之间的。这样一来,之前我们判断的间隔中就不是“一直没有数据”,而缩小后的间隔有可能会比 size 还要小——这代表三个数据本来应该属于同一个会话窗口。
所以在 Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge) 操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。
我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。如图,会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。
(4)全局窗口
这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理, 还需要自定义“触发器”(Trigger)。
6.3.3 窗口API概览
1、按键分区和非按键分区
在定义窗口操作之前,需要确定是基于按键分区还是非按键分区数据流上开窗。
(1)按键分区
stream.keyBy(...).window(...);
(2)非按键分区
stream.windowAll(...)
2、代码中窗口API的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)
.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate() 方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。
**窗口分配器:**指定用哪一种窗口,时间 or 计数?滑动、滚动、会话?…
**窗口函数:**对窗口的数据的计算逻辑
6.3.4 窗口分配器
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。
1、时间窗口
(1)滚动处理时间窗口
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。
另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。这里需要多做一些解释:对于我们之前的定义,滚动窗口其实只有一个 size 是不能唯一确定的。比如我们定义 1 天的滚动窗口,从每天的 0 点开始计时是可以的,统计的就是一个自然日的所有数据;而如果从每天的
凌晨 2 点开始计时其实也完全没问题,只不过统计的数据变成了每天 2 点到第二天 2 点。这个起始点的选取,其实对窗口本身的类型没有影响;而为了方便应用,默认的起始点时间戳是窗口大小的整倍数。也就是说,如果我们定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始。而如果我们非要不从这个默认值开始,那就可以通过设置偏移量offset 来调整。
这里读者可能会觉得奇怪:这个功能好像没什么用,非要弄个偏移量不是给自己找别扭吗?这其实是有实际用途的。我们知道,不同国家分布在不同的时区。标准时间戳其实就是
1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数,而这个时间是以 UTC 时间,也就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8 小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0
点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了:
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
(2)滑动处理时间窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
(3)处理时间会话窗口
静态会话超时时间会话窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
动态会话超时时间的会话窗口
.window(ProcessingTimeSessionWindows.withDynamicGap(
new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
@Override
public long extract(Tuple2<String, Long> element) {
// 提取 session gap 值返回, 单位毫秒
return element.f0.length() * 1000;
}
}
))
(4)滚动事件时间窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
(5)滑动事件时间窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
(6)事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
2、计数窗口
(1)滚动计数窗口
stream.keyBy(...).countWindow(10)
(2)滑动计数窗口
stream.keyBy(...).countWindow(10)
3、全局窗口
全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
stream.keyBy(...).window(GlobalWindows.create());
6.3.5 窗口函数
定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream
窗口函数根据处理的方式可以分为两类:增量聚合函数和全窗口函数
1、增量聚合函数
窗口将数据收集起来,最基本的处理操作当然就是进行聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结果的一瞬间再去进行聚合,显然就不够高效了——这相当于真的在用批处理的思路来做实时流处理。
为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
(1)归约函数
就是将中间结果和新来的数据两两归约
窗口函数中也提供了 ReduceFunction:只要基于 WindowedStream 调用.reduce()方法,然后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.getTimestamp();
}
}));
stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event event) throws Exception {
return Tuple2.of(event.url, 1L);
}
})
.keyBy(r->r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new MyReduceFunction()).print();
env.execute();
}
// 自定义归约函数
public class MyReduceFunction implements ReduceFunction<Tuple2<String, Long>> {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 定义累加规则
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
(2)聚合函数
在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦
例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商
(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。
于是自然可以想到,如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类型都可以不同,不就可以一步直接搞定了吗?
Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate() 方法, 就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个
AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型
(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:
-
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
-
add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法
-
getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态, 然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用
-
merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显, 与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
下面来看一个具体例子。我们知道,在电商网站中,PV(页面浏览量)和 UV(独立访客数)是非常重要的两个流量指标。一般来说,PV 统计的是所有的点击量;而对用户 id 进行去重之后,得到的就是 UV。所以有时我们会用 PV/UV 这个比值,来表示“人均重复访问量”,也就是平均每个用户会访问多少次页面,这在一定程度上代表了用户的粘度
代码略
代码中我们创建了事件时间滑动窗口,统计 10 秒钟的“人均 PV”,每 2 秒统计一次。由于聚合的状态还需要做处理计算,因此窗口聚合时使用了更加灵活的 AggregateFunction。为了统计 UV,我们用一个 HashSet 保存所有出现过的用户 id,实现自动去重;而 PV 的统计则类似一个计数器,每来一个数据加一就可以了。所以这里的状态,定义为包含一个 HashSet 和一个 count 值的二元组(Tuple2<HashSet, Long>),每来一条数据,就将 user 存入 HashSet,同时 count 加 1。这里的 count 就是 PV,而 HashSet 中元素的个数(size)就是 UV;所以最终窗口的输出结果,就是它们的比值。
这里没有涉及会话窗口,所以 merge()方法可以不做任何操作。
另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法, 可以直接基于
WindowedStream 调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与 KeyedStream 的简单聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的。
通过 ReduceFunction 和 AggregateFunction 我们可以发现,增量聚合函数其实就是在用流处理的思路来处理有界数据集,核心是保持一个聚合状态,当数据到来时不停地更新状态。这就是 Flink 所谓的“有状态的流处理”,通过这种方式可以极大地提高程序运行的效率,所以在实际应用中最为常见。
2、全窗口函数
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。这样做毫无疑问是低效的:因为窗口全部的计算任务都积压在了要输出结果的那一瞬间,而在之前收集数据的漫长过程中却无所事事。这就好比平时不用功,到考试之前通宵抱佛脚,肯定不如把工夫花在日常积累上。
那为什么还需要有全窗口函数呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
在 Flink 中,全窗口函数也有两种:WindowFunction 和 ProcessWindowFunction。
(1)窗口函数
我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
这个类中可以获取到包含窗口所有数据的可迭代集合( Iterable),还可以拿到窗口(Window)本身的信息。
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。这里 Collector 的用法,与 FlatMapFunction 中相同。
不过我们也看到了,WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。一般在实际应用,直接使用 ProcessWindowFunction 就可以了。
(2)处理窗口函数
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。
当然,这些好处是以牺牲性能和资源为代价的。作为一个全窗口函数,ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的 WindowFunction。
具体使用跟 WindowFunction 非常类似,我们可以基于 WindowedStream 调用.process()方法,传入一个 ProcessWindowFunction 的实现类。下面是一个电商网站统计每小时 UV 的例子
代码略
3、增量聚合和全窗口函数的结合使用
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出
// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
代码略
6.3.6 测试水位线和窗口的使用
代码略
6.3.7 其它API
1、触发器
用来控制窗口什么时候触发计算
2、移除器
主要用来定义移除某些数据的逻辑
3、允许延迟
可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。
4、将迟到的数据放到侧输出流
我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。
6.3.8 窗口的生命周期
1、窗口的创建
窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好, 而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。
2、窗口计算的触发
除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。
对于不同的窗口类型,触发计算的条件也会不同。例如,一个滚动事件时间窗口,应该在水位线到达窗口结束时间的时候触发计算,属于“定点发车”;而一个计数窗口,会在窗口中元素数量达到定义大小时触发计算,属于“人满就发车”。所以 Flink 预定义的窗口类型都有对应内置的触发器。
对于事件时间窗口而言,除去到达结束时间的“定点发车”,还有另一种情形。当我们设置了允许延迟,那么如果水位线超过了窗口结束时间、但还没有到达设定的最大延迟时间,这期间内到达的迟到数据也会触发窗口计算。这类似于没有准时赶上班车的人又追上了车,这时车要再次停靠、开门,将新的数据整合统计进来。
3、窗口的销毁
一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意, Flink 中只对时间窗口(TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw) 实现的,而全局窗口不会清除状态,所以就不会被销毁。
在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点, 是窗口的结束时间加上用户指定的允许延迟时间。
4、窗口API调用总结
Window API 首先按照时候按键分区分成两类。keyBy 之后的 KeyedStream,可以调用.window()方法声明按键分区窗口(Keyed Windows);而如果不做 keyBy,DataStream 也可以直接调用.windowAll()声明非按键分区窗口。之后的方法调用就完全一样了。
接下来首先是通过.window()/.windowAll()方法定义窗口分配器,得到 WindowedStream; 然 后 通 过 各 种 转 换 方 法 ( reduce/aggregate/apply/process ) 给 出 窗 口 函 数
(ReduceFunction/AggregateFunction/ProcessWindowFunction),定义窗口的具体计算处理逻辑, 转换之后重新得到 DataStream。这两者必不可少,是窗口算子(WindowOperator)最重要的组成部分。
此外,在这两者之间,还可以基于 WindowedStream 调用.trigger()自定义触发器、调用.evictor()定义移除器、调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData() 将迟到数据写入侧输出流,这些都是可选的 API,一般不需要实现。而如果定义了侧输出流, 可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流。
6.4 迟到的数据处理
对于乱序流,水位线本身就可以设置一个延迟时间;而做窗口计算时,我们又可以设置窗口的允许延迟时间;另外窗口还有将迟到数据输出到测输出流的用法。所有的这些方法,它们之间有什么关系,我们又该怎样合理利用呢?
6.4.1 设置水位线延迟时间
水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。
之前我们讲到触发器时曾提到过“定时器”,时间窗口的操作底层就是靠定时器来控制触发的。既然是底层机制,定时器自然就不可能是窗口的专利了;事实上它是 Flink 底层 API——处理函数(process function)的重要部分。
所以水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后,相当于是上帝拨动了琴弦,所有人的表都变慢了。
既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级。
当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”
6.4.2 允许窗口处理迟到数据
水位线延迟设置的比较小,那之后如果仍有数据迟到该怎么办?对于窗口计算而言,如果水位线已经到了窗口结束时间,默认窗口就会关闭,那么之后再来的数据就要被丢弃了。
自然想到,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。
这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果; 然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。
类比班车的例子,我们可以这样理解:大多数人是在发车时刻前后到达的,所以我们只要把表调慢,稍微等一会儿,绝大部分人就都上车了,这个把表调慢的时间就是水位线的延迟; 到点之后,班车就准时出发了,不过可能还有该来的人没赶上。于是我们就先慢慢往前开,这段时间内,如果迟到的人抓点紧还是可以追上的;如果有人追上来了,就停车开门让他上来, 然后车继续向前开。当然我们的车不能一直慢慢开,需要有一个时间限制,这就是窗口的允许延迟时间。一旦超过了这个时间,班车就不再停留,开上高速疾驰而去了。
所以我们将水位线的延迟和窗口的允许延迟数据结合起来,最后的效果就是先快速实时地输出一个近似的结果,而后再不断调整,最终得到正确的计算结果。回想流处理的发展过程, 这不就是著名的 Lambda 架构吗?原先需要两套独立的系统来同时保证实时性和结果的最终正确性,如今 Flink 一套系统就全部搞定了
6.4.3 将迟到数据放到窗口侧输出流
即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?
那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后
“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。
如果还用赶班车来类比,那就是车已经上高速开走了,这班车是肯定赶不上了。不过我们还留下了行进路线和联系方式,迟到的人如果想办法辗转到了目的地,还是可以和大部队会合的。最终,所有该到的人都会在目的地出现。
所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。我们可以回忆一下之前 6.3.5 小节统计每个 url 浏览次数的代码 UrlViewCountExample,稍作改进,增加处理迟到数据的功能。
代码略