Flink -- Time and Window
- Flink 时间语义
- 水位线 Watermark
- 水位线的概念
- 有序流中的水位线
- 乱序流中的水位线
- 水位线的特性
- 水位线的基本使用
- 水位线生成策略
- 内置水位线生成器
- 自定义水位线策略
- 在自定义数据源中发送水位线
- 窗口 Window
- 窗口的基本概述
- 窗口的基本概念
- 窗口的分类
- 窗口的 API
- 窗口的基本使用
- 窗口分配器
- 时间窗口
- 计数窗口
- 窗口函数
- 增量聚合函数
- 全窗口函数
- 增量聚合和全窗口函数的结合使用
- 测试水位线与窗口的使用
- 窗口的其他 API
- 触发器 Trigger
- 移除器 Evictor
- 允许延迟 Allowed Lateness
- 侧入流 Side Output
- 迟到数据的处理
Flink 时间语义
在一台机器当中,所谓的时间当然就是指系统时间;但是在一个分布式的系统当中,各个节点彼此独立、互不影响,因此并不存在一个统一的时钟。因此,当我们需要进行数据的聚合计算时,”并不同时被处理“的事件可能会导致计算结果出错。
此外,当流中的数据进行传输时,也会存在传输的延迟,延迟将会导致时间语义的模糊,比如:上游任务在8点59分59秒
发出一条数据,到下游要做窗口计算时已经是9点零1秒
了,那这条数据到底该不该被收到 8~9 点的窗口呢?
因此,我们需要指定一个统一的时间标准,按照这个标准进行数据的收集和计算。Flink 中为我们提供了两种时间语义,分别为事件时间和处理时间。
-
处理时间(Processing Time)
处理时间就是在执行处理操作时机器的系统时间。这种时间语义简单粗暴,不需要各个节点之间的同步协调,因此处理时间是最简单的语义。
-
事件时间(Event Time)
事件时间指的是数据生成的时间,这个时间伴随着数据的生成而生成,并且是不会变化的,因此我们可以将这个时间作为一个属性嵌入数据当中,并以该时间作为指标进行数据的收集和计算。
在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12 版本开始,Flink 已经将事件时间作为了默认的时间语义。
水位线 Watermark
水位线的概念
当使用事件时间作为时间语义时,就相当于每条数据都携带了一个时钟。因此,在数据处理过程中的时钟则根据数据的到来而不停的驱动。即何时产生的数据到达处理系统时,系统的时间就会推进到当前数据产生的时间,从而实现数据的时间标准化。
但是在分布式系统中,上述的时间驱动方式会存在以下问题:
-
若数据在处理转换的过程当中存在窗口聚合操作,那么会导致下游的数据量减少,因此对于时间控制的精细度便会下降;
-
在一般情况下,数据向下游的传递只会传递给一个子任务,此时其他并行子任务的时钟便无法推进;
为解决上述问题,我们在进行数据传递时,需要把时钟也以数据的形式进行传递,这个时间的标志不会因窗口聚合运算而停滞,同时可以直接广播到下游。
在 Flink 中,这种用来衡量事件时间 Event Time 进展的标记,就被称作”水位线“ Watermark 。水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后,这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
如上图所示,当某个数据到来之后,就可以依据这条数据的事件时间,在其后插入一个水位线,并伴随着数据一起向下游流动,以推进时间进度。
有序流中的水位线
在理想状态下,数据会按照生成的顺序,排队进入流中进行处理。这就可以保证时间的推进都是从小到大的,即插入的水位线是不断增长的。
在实际应用中,若当前的数据量非常大,就可能会存在很多具有相同时间戳的数据,此时若针对每条数据都插入水位线,就会产生很多相同的无意义的水位线。因此对于有序流,一般采取周期性的水位线。如下图所示。
注意,这里的周期也是一个时间的概念,周期是以系统时间为基准的。
乱序流中的水位线
这里所说的乱序是指数据到达的顺序是混乱的,由于网络延迟的存在,先产生的数据可能之后才会到达。如果还采取之前的策略,那么有可能会产生“时间倒流”的问题。比如第9秒
的数据到达之后,流中的时间水位推移到第 9 秒,随后第7秒
数据到达,此时再插入水位线将导致时间水位变回第 7 秒。
针对这个问题,Flink 采取判断 + 周期生成的办法。假设我们的周期设置为 5s,那么 Flink 会记录五秒内数据携带的时间戳的最大值,并每隔五秒将水位线添加进流中传递。采用这种方法可以有效避免“时间倒流”的问题。如下图所示。
此外,乱序流还存在另外一个头疼的问题,即当我们需要对数据按照时间窗口进行聚合计算时,“迟到”的数据将会导致汇总数据结果不准确。比如我们需要每 10 秒计算一下某电站的发电量,若第8秒
的发电量在第10秒
之后达到,那么在第10秒
的数据到达之后,该时间窗口就已经关闭进行计算并输出结果,因此第8秒
的数据就不会被统计,这将导致错误的汇总结果。
为解决“迟到”数据的问题,Flink 允许我们采取“等待策略”。在 Flink 当中,水位线就是时钟,若我们在确定水位线时故意延迟几秒,这样就可以达到等待“迟到”数据的效果。如下图所示。我们对“迟到”数据的容忍度设置为 2 秒。因此,在第一次插入水位线时,即使该周期内的最大时间为第9秒
的数据,但我们插入的水位线时间为 7,这意味着第7秒
前的数据已经全部到达,即使第8秒
的数据随后才抵达,那也不会影响聚合计算的结果,因为水位线就是时钟。
该方法通过牺牲数据的实时性而保证了数据计算的准确定。值得注意的一点是,如果数据"迟到"的太久,那么我们只能通过加长等待时间来处理。
水位线的特性
-
水位线是插入数据流中的一个标记,可以看作一个特殊的数据;
-
水位线主要内容就是一个时间戳,用于推进时钟;
-
水位线是基于数据的时间戳而生成的;
-
水位线的时间戳必须单调递增,以保证任务时钟一直向前推进;
-
水位线可以通过设置延迟的方式来保证乱序数据的正确处理;
-
若水位线到达某个时间 t,则代表第 t 秒之前的数据已经全部到达(包含 t),在之后的数据中不会有时间戳小于等于 t 的数据出现;
水位线的基本使用
水位线生成策略
Flink 的 DataStream API 中提供了一个用于生成水位线的方法.assignTimestampsAndWatermarks()
,该方法可以为流中的数据分配时间戳,并生成水位线来指示时钟。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks
(WatermarkStrategy<T> watermarkStrategy)
该方法需要传入一个WatermarkStrategy
类对象作为参数,该类中包含一个时间戳分配器方法createTimestampAssigner()
以及生成水位线生成器方法createWatermarkGenerator()
。
下述代码中使用的实体类与源算子的代码分别如下:
实体类 Event
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {
public String user;
public String url;
public Long timestamp;
}
源算子 EventSource
public class EventSource implements SourceFunction<Event> {
private Boolean flag = true;
String[] users = {"曹操", "刘备", "孙权", "诸葛亮"};
String[] urls = {"/home", "/test?id=1", "/test?id=2", "/play/football", "/play/basketball"};
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
Random random = new Random();
while (flag) {
sourceContext.collect(new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
内置水位线生成器
Flink 提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,
而且也为我们自定义水位线策略提供了模板。这两个生成器可以通过调用WatermarkStrategy
的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。
示例代码如下,该代码段中提取实体类 Event 的 Timestamp 字段作为时间戳,并设置 5s 的延迟时间用于处理乱序流的迟到数据。
public class WaterMarkDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
environment.addSource(new EventSource())
// 3. 设置水位线逻辑
.assignTimestampsAndWatermarks(WatermarkStrategy
// 3.1 针对乱序流插入水位线,延迟时间设置为 5s
// 针对有序流周期性生成水位线即可:.<Event>forMonotonousTimestamps();
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 3.2 抽取时间戳的逻辑
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}))
// 4. 打印输出
.print();
// 5. 执行程序
environment.execute();
}
}
这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳 – 延 迟时间 – 1
,位是毫秒。
为什么要减 1 毫秒呢?回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳小于等于 t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。这一点可以在BoundedOutOfOrdernessWatermarks
的源码中明显地看到:
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));
}
自定义水位线策略
自定义水位线策略时,需要我们创建一个类并实现WatermarkStrategy
接口,重写接口中的时间戳分配器方法createTimestampAssigner
与水位线生成方法createWatermarkGenerator
即可。
-
时间戳分配器:接收数据的实体类,提取某个字段作为时间戳即可;
-
水位线生成器:需要返回一个
WatermarkGenerator
接口类,实现该接口需要实现两个方法-
onEvent()
:每来一条数据便调用一次,用于观察判断输入的事件; -
onPeriodicEmit()
:由框架周期性的进行调用,生成水位线;
-
示例代码如下:
public class CustomWaterMarkDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源
environment.addSource(new EventSource())
// 3. 设置水位线逻辑
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
// 4. 打印输出
.print();
// 5. 执行程序
environment.execute();
}
}
自定义水位线策略 CustomWatermarkStrategy:
public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
};
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomPeriodicGenerator();
// return new CustomPunctuatedGenerator();
}
/**
* 周期性水位线生成器(Periodic Generator)
*/
public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L;
private Long maxTs = Long.MIN_VALUE + delayTime + 1L;
/**
* 每来一条数据就调用一次
*/
@Override
public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
// 更新最大时间戳
maxTs = Math.max(event.timestamp, maxTs);
}
/**
* 发射水位线,默认 200ms 调用一次
* 可以调用环境配置的.setAutoWatermarkInterval()方法来设置
*/
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
/**
* 断点式水位线生成器(Punctuated Generator)
*/
public static class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
/**
* 遇到特定的事件时发出水位线
*/
@Override
public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
if ("曹操".equals(event.user)) {
watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));
}
}
/**
* 发射水位线
*/
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 不需要做任何处理,在 onEvent() 中发射水位线
}
}
}
在自定义数据源中发送水位线
也可以在自定义的源算子中定义发送水位线的逻辑,采用该源算子就不需要在代码中使用assignTimestampsAndWatermarks
重复设置水位线了。
示例代码如下:
public class ClickSourceWithWatermark implements SourceFunction<Event> {
private Boolean flag = true;
String[] users = {"曹操", "刘备", "孙权", "诸葛亮"};
String[] urls = {"/home", "/test?id=1", "/test?id=2", "/play/football", "/play/basketball"};
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
Random random = new Random();
while (flag) {
Event event = new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(users.length)],
Calendar.getInstance().getTimeInMillis()
);
// 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
sourceContext.collectWithTimestamp(event, event.timestamp);
// 发送水位线
sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
窗口 Window
窗口的基本概述
窗口的基本概念
Flink 是一种流式计算的引擎,主要用于处理无界数据流。但是对于无界的数据,有时也需要进行一些“批处理”的操作,即将无限的数据按照一定的规则切割成有限的数据块进行处理,这就是所谓的窗口。
在 Flink 中,窗口实际上就是一个数据的存储桶,如下图所示。窗口可以把流切割成有限大小的多个“存储桶”,每个数据都会分发到对应的桶中。当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
窗口的分类
Flink 中的窗口可以按照不同的角度进行分类:
-
按照驱动类型分类
-
时间窗口
Time Window
:可以定义窗口的开始和结束时间,到达结束时间时,出发计算并输出结果,同时窗口关闭销毁。时间窗口是一个左闭右开的区间; -
计数窗口
Count Window
:基于元素的个数来截取数据,到达固定的个数时出发计算并关闭窗口;
-
-
按照窗口分配数据的规则分类
-
滚动窗口
Tumbling Windows
:滚动窗口拥有固定的大小,是一种对数据均匀划分的切割方式,窗口之间没有间隔、没有重叠,是一种“首尾相接”的状态; -
滑动窗口
Sliding Windows
:滑动窗口同样拥有固定的大小,与滚动窗口不同的是,滑动窗口是存在重叠部分的。其存在一个滑动步长,每间隔该步长,则向前滑动一个步长的距离; -
会话窗口
Session Windows
:类似 web 应用中 session 的概念,需要制定一个超时时间参数,超过该时间没有数据到来则关闭窗口,因此绘画窗口只能基于时间定义; -
全局窗口
Global Windows
:该窗口没有切割数据流,而是将所有相同 key 的数据分配到同一个窗口,数据依旧是无界流。若希望对数据进行计算处理,还需要自定义“触发器”实现;
-
窗口的 API
在定义窗口操作之前,首先需要确定是基于按键分区的数据流KeyedStream
来开窗还是在没有按键分区的DataStream
上开窗。
-
按键分区窗口:相同 key 的数据会被发送至同一个并行子任务,故窗口计算会被分配到多个并行子任务上执行,即每一组 key 都定义了一组窗口,各自独立统计;
stream.keyBy(...) .window(...)
-
非按键分区:并行度为 1,一般不使用
stream.windowAll(...)
窗口的基本使用
窗口分配器
定义窗口分配器是构建窗口算子的第一步,定义数据应该被分到哪个窗口,即窗口分配器就是在指定窗口的类型。
时间窗口
-
滚动处理时间窗口
-
窗口分配器类:
TumblingProcessingTimeWindows
; -
方法:
of
; -
参数
-
参数一:传入
Time
类型的参数 size,表示滚动窗口的大小; -
参数二:可以传入
Time
类型的参数 offset,表示窗口开始的时间偏移量;
-
-
-
滑动处理时间窗口
-
窗口分配器类:
SlidingProcessingTimeWindows
; -
方法:
of
; -
参数:
-
参数一:传入
Time
类型的参数 size,表示滚动窗口的大小; -
参数二:传入
Time
类型的参数 slide,表示滑动窗口的滑动步长;
-
-
-
处理时间会话窗口
-
窗口分配器类:
ProcessingTimeSessionWindows
; -
方法:
withGap()
或withDynamicGap()
-
参数:
-
withGap()
传入一个Time
类型的参数 size,表示会话的超时时间; -
withDynamicGap()
传入一个SessionWindowTimeGapExtractor
,定义动态提取超时时间的逻辑;
-
-
-
滚动事件时间窗口
-
窗口分配器类:
TumblingEventTimeWindows
; -
使用方法同“滚动处理时间窗口”;
-
-
滑动事件时间窗口
-
窗口分配器类:
SlidingEventTimeWindows
; -
使用方法同“滑动处理时间窗口”;
-
-
事件时间会话窗口
-
窗口分配器类:
EventTimeSessionWindows
; -
使用方法同“处理时间会话窗口”;
-
计数窗口
-
滚动计数窗口
- 直接调用
countWindow()
方法,传入长整型的参数 size,表示窗口的大小;
- 直接调用
-
滑动计数窗口
- 直接调用
countWindow()
方法,传入长整型的参数 size 和 slide,表示窗口的大小以及窗口的滑动步长;
- 直接调用
-
全局窗口
- 直接调用
window()
,分配器由GlobalWindows
类提供;
- 直接调用
窗口函数
窗口分配器定义了数据的收集逻辑,窗口函数则定义了收集起来的数据的计算逻辑。
经窗口分配器处理之后,数据被分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream
。这个类型并不是DataStream
,因此并不能直接进行其他转换,必须
进一步调用窗口函数,对收集的数据进行处理计算之后,才能最终再次得到DataStream
。
增量聚合函数
规约函数 ReduceFunction
窗口的归约就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。
示例代码如下,该代码段统计了在 5s 的时间窗口内各个 url 的访问量。
public class WindowReduceDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源并设置水位线
SingleOutputStreamOperator<Event> stream = environment
// 2.1 加载数据源
.addSource(new EventSource())
// 2.2 获取时间戳、设置水位线
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
// 3. 数据处理及输出
stream
// 3.1 将数据转换为 tuple 二元组,方便计算
.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event event) throws Exception {
return Tuple2.of(event.getUrl(), 1L);
}
})
// 3.2 按键进行分区
.keyBy(tuple -> tuple.f0)
// 3.3 设置滚动事件时间窗口,窗口大小为 5s
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 3.4 定义规约规则,窗口闭合时向下游发送结果
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
// 3.5 输出结果
.print();
// 4. 执行程序
environment.execute();
}
}
聚合函数 AggregateFunction
增量聚合函数可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就要求我们必须在聚合前,先将数据转换成预期结果类型。
Flink 的 Window API 中的aggregate
则提供了更加灵活的操作,该方法需要传入一个AggregateFunction
的实现类作为参数。该接口存在四个方法,在下列代码示例中有功能解释。该代码段计算了在 10s 的时间窗口内,所有 url 的人均重复访问量(总访问量 / 访问用户数),每 2s 统计一次。
public class WindowAggregateDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源并设置水位线
SingleOutputStreamOperator<Event> stream = environment
// 2.1 加载数据源
.addSource(new EventSource())
// 2.2 获取时间戳、设置水位线
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
// 3. 数据处理及输出
stream
// 3.1 分区,将所有数据发送到一个分区进行统计
.keyBy(item -> true)
// 3.2 设置滑动事件事件窗口,窗口大小为 10s,滑动步长为 2s
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
// 3.3 定义聚合规则
.aggregate(new CustomAggregate())
// 3.4 输出结果
.print();
// 4. 执行程序
environment.execute();
}
public static class CustomAggregate implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
/**
* 创建累加器
*/
@Override
public Tuple2<HashSet<String>, Long> createAccumulator() {
return Tuple2.of(new HashSet<String>(), 0L);
}
/**
* 累加逻辑设计
*/
@Override
public Tuple2<HashSet<String>, Long> add(Event event, Tuple2<HashSet<String>, Long> accumulator) {
// 每当一条属于该窗口的数据到来,就进行累加并返回累加器
accumulator.f0.add(event.user);
return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
}
/**
* 窗口闭合时执行计算
*/
@Override
public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
// 计算结果并发送到下游
return (double) accumulator.f1 / accumulator.f0.size();
}
/**
* 合并两个累加器
*/
@Override
public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> hashSetLongTuple2, Tuple2<HashSet<String>, Long> acc1) {
return null;
}
}
}
全窗口函数
ProcessWindowFunction
是 Window API 中最底层的通用窗口函数接口。之所以说它“最底
层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction
还可以获取到一个“上下文对象”Context
。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。
ProcessWindowFunction
的优势是以牺牲性能和资源为代价的,作为一个全窗口函数其需要将所有的数据缓存下来,等到窗口触发计算时再使用。
示例代码如下,该代码段计算了在 5s 的时间窗口内所有 url 的独立访客数量。
public class ProcessWindowDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源并设置水位线
SingleOutputStreamOperator<Event> stream = environment
// 2.1 加载数据源
.addSource(new EventSource())
// 2.2 获取时间戳、设置水位线
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
// 3. 数据处理及输出
stream
// 3.1 分区,将所有数据发送到一个分区进行统计
.keyBy(item -> true)
// 3.2 设置滚动事件时间窗口,窗口大小为 10s
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 3.3 定义窗口函数处理规则
.process(new CustomProcessWindow())
// 3.4 输出结果
.print();
// 4. 执行程序
environment.execute();
}
public static class CustomProcessWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {
/**
* 窗口函数处理规则,窗口关闭时执行处理
*/
@Override
public void process(Boolean aBoolean, ProcessWindowFunction<Event, String, Boolean, TimeWindow>.Context context,
Iterable<Event> iterable, Collector<String> collector) {
// 创建用户统计Set
HashSet<String> userSet = new HashSet<>();
for (Event event: iterable) {
userSet.add(event.user);
}
long start = context.window().getStart();
long end = context.window().getEnd();
// 定制输出内容
collector.collect("窗口【" + new TimeStamp(start) + "~" + new TimeStamp(end)
+ "】的独立访客数量为>>>" + userSet.size());
}
}
}
增量聚合和全窗口函数的结合使用
通过上面对增量聚合以及全窗口函数的了解,不难发现增量聚合的计算更加高效,其将计算过程分摊到窗口收集数据的过程当中,而全窗口函数则是将数据全部收集起来,等到窗口要闭合时再统一处理,因此增量聚合函数效率更高。
但是全窗口函数的优势在于其可以提供更多的信息,其只负责收集数据并提供信息,这让我们可以对数据执行更加灵活的操作。因此,在实际使用中,常把增量聚合以及全窗口函数结合使用。
通过结合使用,只需要在增量聚合中维护一个状态。当窗口触发需要执行计算时,全窗口函数直接获取增量函数的结果即可。
示例代码如下,该代码段计算了在 10s 的时间窗口内各个 url 的人均重复访问量(url 访问量 / url 访问用户数),并以 EventUrlCount 对象的形式输出结果,每 5s 计算一次。
public class CombinedUseDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源并设置水位线
SingleOutputStreamOperator<Event> stream = environment
// 2.1 加载数据源
.addSource(new EventSource())
// 2.2 获取时间戳、设置水位线
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
// 3. 数据处理及输出
stream
// 3.1 分区,将所有数据发送到一个分区进行统计
.keyBy(item -> item.url)
// 3.2 设置滑动事件事件窗口,窗口大小为 10s,滑动步长为 5s
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// 3.3 结合使用增量聚合函数和全窗口函数
.aggregate(new UrlCountAgg(), new UrlCountRes())
// 3.4 输出结果
.print();
// 4. 执行程序
environment.execute();
}
/**
* 增量聚合函数定义
*/
public static class UrlCountAgg implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event event, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}
/**
* 全窗口函数定义
*/
public static class UrlCountRes extends ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow> {
@Override
public void process(String url, ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow>.Context context,
Iterable<Long> iterable, Collector<EventUrlCount> collector) {
collector.collect(new EventUrlCount(
url,
iterable.iterator().next(),
context.window().getStart(),
context.window().getEnd()
));
}
}
}
实体类 EventUrlCount
@Data
@NoArgsConstructor
@AllArgsConstructor
public class EventUrlCount {
public String url;
public Long count;
public Long windowStart;
public Long windowEnd;
}
测试水位线与窗口的使用
在之前的介绍中,水位线就是程序时钟的指示,而时间窗口也是根据时钟对数据进行分割的,在了解完水位线和窗口的知识后,可以将两者结合起来,通过下列测试程序简单的感受一下水位线对窗口的控制。
测试代码如下:此处采用 socket 数据源算子,详情参考Flink – DataStream API
public class WaterMarkWithWindowDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源进行数据处理
environment
// 2.1 加载 socket 数据源
.socketTextStream("XXX.XX.XX.XXX", 8080)
// 2.2 对数据源进行简单处理,封装成对象
.map(new MapFunction<String, Event>() {
@Override
public Event map(String s) throws Exception {
String[] split = s.split(",");
return new Event(
split[0].trim(),
split[1].trim(),
Long.valueOf(split[2].trim())
);
}
})
// 2.3 设置水位线,时延5s,timestamp 字段作为时间戳
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.timestamp;
}
}))
// 2.4 按照 user 字段进行分区
.keyBy(event -> event.user)
// 2.5 配置滚动时间窗口,窗口大小为 10s
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 2.6 配置全窗口处理函数
.process(new WaterMarkResult())
// 2.7 输出结果
.print();
// 3. 执行处理
environment.execute();
}
public static class WaterMarkResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
/**
* 全局窗口处理函数逻辑
*/
@Override
public void process(String s, ProcessWindowFunction<Event, String, String, TimeWindow>.Context context,
Iterable<Event> iterable, Collector<String> collector) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
long currentWatermark = context.currentWatermark();
long count = iterable.spliterator().getExactSizeIfKnown();
collector.collect("窗口[" + start + "~" + end + "]中共有" + count + "个元素|窗口闭合计算时,水位线处于>>>" + currentWatermark);
}
}
}
执行测试,观察结果:
注意:窗口左闭右开,如 10000ms 的数据,应属于【10000-20000】窗口
窗口的其他 API
对于一个窗口算子,窗口分配器与窗口函数是必不可少的。除此之外,Flink 还提供了一些可选的 API,让我们可以更加灵活的控制窗口行为。
触发器 Trigger
触发器主要用于控制窗口何时进行触发计算,其是窗口算子的内部属性。对于 Flink 内置的窗口类型,其内部触发器都已经做了相应的实现,因此一般情况下不需要我们自定义触发器执行。
若业务逻辑确实需求自定义触发器,则直接调用 WindowedStream 的trigger()
方法并传入自定义的触发器即可。如下所示。
stream.keyBy(...)
.window(...)
.trigger(new CustomTrigger())
自定义触发器需要继承Trigger
抽象类,并实现下列四个方法:
-
onElement()
:窗口中每到来一个元素,都会调用该方法; -
onEventTime()
:当注册的事件时间定时器触发时,将调用该方法; -
onProcessingTime
:当注册的处理时间定时器触发时,将调用该方法; -
clear()
:当窗口关闭销毁时,将调用该方法,一般用于清除某些自定义的状态;
上述的方法中,处最后一个方法,另外三个方法的返回值均为枚举类型TriggerResult
,其中定义了对窗口进行操作的四种类型,用于响应事件:
-
CONTINUE
:继续,什么都不做; -
FIRE
:触发,触发计算,输出结果; -
PURGE
:清除,清空窗口中的所有数据,销毁窗口; -
FIRE_AND_PURGE
:触发并清除,触发计算输出结果,并清除窗口;
移除器 Evictor
移除器用于定义移除某些数据的逻辑,不同的窗口类型都有各自预实现的移除器。
若业务逻辑确实需求自定义移除器,则直接调用 WindowedStream 的evictor()
方法并传入自定义的移除器即可。如下所示。
stream.keyBy(...)
.window(...)
.evictor(new CustomEvictor())
自定义移除器需要实现 Evictor 接口并实现下列两个方法:
-
evictBefore()
:定义执行窗口函数之前的移除数据操作; -
evictAfter()
:定义执行窗口函数之后的一处数据操作;
允许延迟 Allowed Lateness
在事件时间语义当中,窗口中会出现迟到数据。若这部分数据在窗口触发计算并输出结果之后才到达,此时窗口一般已经销毁,因此迟到数据一般就会直接丢弃,这将导致统计结果不准确。
为解决迟到数据问题,除了之前提到过的设置水位线延迟的方法,Flink 还为开发者提供了一个特殊的接口。该接口可以为窗口算子提供一个”允许最大延迟“,在这段时间内,窗口不会被销毁,迟到的数据仍然可以被收集到相应的窗口并触发计算,直到时钟被推进到”水位线 + 允许最大延迟“时才会真正的关闭窗口。
基于 WindowedStream 调用allowedLateness()
方法并传入一个Time
类型的延迟时间即可完成允许延迟的设置,如下所示。
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(5))
侧入流 Side Output
在某些情况下,即便设置了”水位线延迟“+”允许延迟时间“,很多数据还是会被丢弃。Flink 提供了侧输出流 side output 的方式来处理迟到的数据。该输出流相当于是数据流的一个分支,专门用于存放迟到过久的、本应该丢弃的数据。
基于 WindowedStream 调用sideOutputLateData()
方法并传入一个OutputTag
类型的侧入流输出标签即可实现该功能,OutputTag
中的类型应与数据流中的类型相同,如下所示。
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
基于窗口处理完成之后的 DataStream,调用getSideOutput()
方法并传入对应的输出标签,就可以获取到迟到数据所在的流,如下所示。
这里需要注意,getSideOutput()
是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和OutputTag
指定的类型一致,与窗口聚合之后流中的数据类型可以不同。
SingleOutputStreamOperator<AggResult> winAggStream = stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
迟到数据的处理
通过对水位线以及窗口相关知识的了解,在此总结一下对于迟到数据的处理方式。在 Flink 中,对迟到数据的处理主要有下述三种方式:
-
设置水位线延迟时间;
-
设置允许延时,允许窗口处理迟到数据;
-
将迟到数据放入侧输出流;
一般来说,对于可控的较小延迟,通过 1、2 所述的方法就可以解决。但某些情况下,即便有前面的双重保证,但窗口也无法一直等待下去,总归是要关闭的。此时只能用 3 所属方法来兜底,保证数据不会丢失。后续开发者可以从侧输出流中重新获取数据并判断其所属的窗口,对数据进行手动的汇总更新。
下述代码在之前测试增量聚合函数与全窗口函数结合使用的代码上加以改进,增加了对迟到数据的处理,可以作为简单示例感受一下整个处理过程,代码如下:
public class LateDataDemo {
public static void main(String[] args) throws Exception {
// 1. 环境准备
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
environment.setParallelism(1);
// 2. 加载数据源并设置水位线
SingleOutputStreamOperator<Event> stream = environment
// 2.1 加载数据源
.socketTextStream("47.92.146.85", 8080)
// 2.2 对数据源进行简单处理,封装成对象
.map(new MapFunction<String, Event>() {
@Override
public Event map(String s) throws Exception {
String[] split = s.split(",");
return new Event(
split[0].trim(),
split[1].trim(),
Long.valueOf(split[2].trim())
);
}
})
// 2.3 获取时间戳、设置水位线
.assignTimestampsAndWatermarks(WatermarkStrategy
// 迟到数据处理方式一:设置水位线延时
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
// 定义侧输出流标签
OutputTag<Event> outputTag = new OutputTag<Event>("late"){};
// 3. 数据处理及输出
SingleOutputStreamOperator<EventUrlCount> result = stream
// 3.1 分区,将所有数据发送到一个分区进行统计
.keyBy(event -> event.url)
// 3.2 设置滚动窗口,窗口大小为 10s
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 3.3 设置窗口允许延迟时间为 1min
// 迟到数据处理方式二:设置窗口允许延时时间
.allowedLateness(Time.minutes(1L))
// 3.4 将最终的迟到数据输出到侧输出流
// 迟到数据处理方式三:迟到数据入侧输出流
.sideOutputLateData(outputTag)
// 3.5 结合使用增量聚合函数和全窗口函数
.aggregate(new UrlCountAgg(), new UrlCountRes());
// 4. 输出结果
// 4.1 正常数据输出
result.print("result");
// 4.2 迟到数据输出
result.getSideOutput(outputTag).print("late");
// 5. 执行程序
environment.execute();
}
/**
* 增量聚合函数定义
*/
public static class UrlCountAgg implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event event, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}
/**
* 全窗口函数定义
*/
public static class UrlCountRes extends ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow> {
@Override
public void process(String url, ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow>.Context context,
Iterable<Long> iterable, Collector<EventUrlCount> collector) {
collector.collect(new EventUrlCount(
url,
iterable.iterator().next(),
context.window().getStart(),
context.window().getEnd()
));
}
}
}
测试结果如下,数据的关键时间点用红色方框标出,箭头指示的是聚合结果中包含的数据