1. 窗口的其他API简介
对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,可以更加灵活地控制窗口行为。
1.1 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的"触发计算"本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowsStream调用.trigger()方法,参数传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
Trigger
是窗口算子的内部属性,每个窗口分配器(WindowAssigner)
都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger
;类似还有 ProcessingTimeTrigger
和 CountTrigger
。所以一般情况下是不需要自定义触发器的。
Trigger
是一个抽象类,自定义时必须实现以下四个抽象方法:
onElement()
:窗口中每到来一个元素,都会调用这个方法。onEventTime()
:当注册的事件时间定时触发时,将调用这个方法。onProcessingTime ()
:当注册的处理时间定时器触发时,将调用这个方法。clear()
:当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
除了 clear()
比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()
是对流中数据元素到来的响应;而另两个则是对时间的响应。这些方法参数中都有一个“触发器上下文”(TriggerContext)
对象,可以用来注册定时器回调(callback)
。对于时间窗口(TimeWindow)
而言,就是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。
另外这三个方法的返回类型都是 TriggerResult
,这是一个枚举类型(enum)
,其中定义了对窗口进行操作的四种类型
。
CONTINUE
(继续):什么都不做FIRE
(触发):触发计算,输出结果PURGE
(清除):清空窗口中的所有数据,销毁窗口FIRE_AND_PURGE
(触发并清除):触发计算输出结果,并清除窗口
Trigger
除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。并且TriggerResult
的返回结果可以让计算输出结果和关闭窗口分开执行。
示例:
在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建自定义数据源的实时流
DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
stringDataStreamSource.keyBy(Event::getUrl)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.trigger(new MyTrigger())
.process(new WindowResult())
.print();
stringDataStreamSource.print("data");
env.execute();
}
public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
@Override
public void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
out.collect(new UrlViewCount(s, elements.spliterator().getExactSizeIfKnown(), context.window().getStart(), context.window().getEnd()));
}
}
private static class UrlViewCount {
private String s;
private long size;
private long start;
private long end;
public UrlViewCount(String s, long size, long start, long end) {
this.s = s;
this.size = size;
this.start = start;
this.end = end;
}
@Override
public String toString() {
return "UrlViewCount{" +
"s='" + s + '\'' +
", size=" + size +
", start='" + start + '\'' +
", end='" + end + '\'' +
'}';
}
}
public static class MyTrigger extends Trigger<Event, TimeWindow> {
@Override
public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
if (isFirstEvent.value() == null) {
for (long i = window.getStart(); i < window.getEnd() ; i = i + 1000L) {
ctx.registerEventTimeTimer(i);
}
isFirstEvent.update(true);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
isFirstEvent.clear();
}
}
输出结果:
1.2 移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream
调用.evictor()
方法,就可以传入一个自定义的移除器(Evictor)
。Evictor
是一个接口,不同的窗口类型都有各自预实现的移除器。
Evictor
接口定义了两个方法:
evictBefore()
:定义执行窗口函数之前的移除数据操作evictAfter()
:定义执行窗口函数之后的以处数据操作
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)
之前移除数据的。
1.3 允许延迟(Allowed Lateness)
在事件时间语义下,窗口会出现迟到的数据。之所以出现迟到数据是因为在乱序流中,水位线并一定能保证时间戳更早的所有数据不会再出现,当水位线
已经到达窗口的结束时间时,窗口触发计算并输出结果
,这时一般就要销毁窗口
了;如果还有本该属于这个窗口的数据到达,默认情况下会被丢弃。
大多数情况下直接丢弃数据会导致统计结果不准,为了解决迟到数据的问题,Flink
提供了一个特殊接口,可以为窗口算子设置一个"允许最大延迟
"(Allowed Lateness
)。也就是说,我们可以设定允许延迟一段时间
,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间
,才真正将窗口的内容清空,正式关闭窗口。
基于 WindowedStream
调用.allowedLateness()
方法,传入一个Time
类型的延迟时间,就可以表示允许这段时间内的延迟数据。
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))
比如上面的代码中,我们定义了 1 小时的滚动窗口
,并设置了允许 1 分钟
的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点
的窗口,本来应该是水位线到达 9 点整
就触发计算并关闭
窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果
,并不会关窗
。后续到达的数据,只要属于 8 点~9 点
窗口,依然可以在之前统计的基础上继续叠加
,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分
,这时就真正清空状态、关闭窗口
,之后再来的迟到数据就会被丢弃了。
从这里可以看到,窗口的触发计算(Fire)
和清除(Purge)
操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,
再来的数据就会被丢弃。
1.4 将迟到的数据放入侧输出流
对于处理迟到数据,仅仅提供延迟时间还是会出现迟到的数据,所以Flink提供了另外一种方式处理迟到数据。可以将迟到数据放到侧输出流(side output)
进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”
,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。
基于 WindowedStream
调用.sideOutputLateData()
方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag)
,用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以OutputTag
的类型与流中数据类型相同。
DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
将迟到数据放入侧输出流
之后,还可以将它提取出来。基于窗口处理完成之后的DataStream
,调用.getSideOutput()
方法,传入对应的输出标签
,就可以获取到迟到数据
所在的流了。
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
这里注意,getSideOutput()
是SingleOutputStreamOperator
的方法,获取到的侧输出流数据
类型应该和 OutputTag
指定的类型一致
,与窗口聚合
之后流中的数据类型
可以不同。