Trigger 触发器
触发器作用:控制窗口什么时候除法计算。即执行窗口函数;基于WindowStream调用trigger()方法,传入自定义触发器(trigger);
每一个窗口分配器(windowAssigner) 都会对应一个默认的触发器;
源码样例
@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(
WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.builder =
new WindowOperatorBuilder<>(
windowAssigner,
windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
input.getExecutionConfig(),
input.getType(),
input.getKeySelector(),
input.getKeyType());
}
==============默认触发器===
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
Triger类有4个方法
-
onElement:窗口中每来一个元素调用该方法。 onProcessingTime:当注册的处理时间定时器触发时,将调用这个方法。 onEventTime:当注时的事件时间定时器触发时,将调用这个方法。 clear:窗口关闭冰销毁时调用这个方法,一般用来清除自定义状态。 onElement() ,onProcessingTime(),onEventTime()方法的返回类型都是 TriggerResult;TriggerResult中包含四个枚举值: CONTINUE:表示对窗口不执行任何操作。 FIRE:触发计算并输出结果。注意计算完成后,窗口中的数据并不会被清除,将会被保留。 PURGE:表示将窗口中的数据和窗口清除。 FIRE_AND_PURGE:表示先将数据进行计算,输出结果,然后将窗口中的数据和窗口进行清除。
源码
/** No action is taken on the window. */
CONTINUE(false, false),
/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
FIRE_AND_PURGE(true, true),
/**
* On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
* though, all elements are retained.
*/
FIRE(true, false),
/**
* All elements in the window are cleared and the window is discarded, without evaluating the
* window function or emitting any elements.
*/
PURGE(false, true);
flink提供的触发器
flink提供触发器
ProcessingTimeoutTrigger
- EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
- ProcessingTimeoutTrigger:当内置触发器满足设置的超时时间时,触发窗口的计算。
- ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
- ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
- ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
- CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
- DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
- PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
- NeverTrigger:任何时候都不触发窗口计算,全局窗口触发器,
原文链接:https://blog.csdn.net/qq_37555071/article/details/122514061
水印触发一般是窗口关闭时间
flink提供的触发器是与窗口对应,当有水印时,如果水印时间大于等于窗口结束时间会触发计算;window.maxTimestamp()获取的是窗口end-1; EventTimeTrigger 的源码可以很明确可以看到注册时注册了触发时间为window.maxTimestamp(),这也是窗口关闭的触发机制。
如果在窗口关闭前触发计算设置多个触发计算时间,这样实现一些特定的需求。例如,每10s输出一次当天的累计数据;
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// 限定触发条件为窗口关闭时间,否则就继续窗口
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
.....
自定义触发器
继承Triger,重写抽象方法,案例
.window(TumblingEventTimeWindows.of(Time.hours(24)))
.trigger(new MyTrigger())
.process(new WindowResult())
.print();
窗口长24小时,每十秒触发一次计算
===================
public static class MyTrigger extends Trigger<Event, TimeWindow> {
@Override
public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
//定义状态,记录该状态 触发器第一个元素进来时注册全部的触发器
ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
);
//第一次注册,右面全部跳过
if (isFirstEvent.value() == null) {
for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 10000L) {
//注册触发器 间隔10s
triggerContext.registerEventTimeTimer(i);
}
isFirstEvent.update(true);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
//使用的事件时间,因此触发窗口的计算
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
);
isFirstEvent.clear();
}
}
移除器Evictor
作用:主要用来定义移除某些数据的逻辑。基于windowedStream调用evictor()方法,就可以传入一个自定义得移除器(Evictor)。不同窗口类型都有各自预测实现的移除器。
stream.keyby().window().evictor(new MyEvictor)
evictBefore():定义窗口执行函数之前移除的数据操作,移除后的数据不参与窗口计算;
evictAfter():定义执行窗口函数后移除数据的操作;
默认情况下预实现的移出弃都是在执行窗口函数之前移除数据
注意:如果在evict中使用了iterable.iterator(),后面再次使用时不能直接使用
.keyBy(r -> r.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)));
window.evictor(new Evictor<Event, TimeWindow>() {
@Override
public void evictBefore(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
Iterator<TimestampedValue<Event>> iterator = elements.iterator();
while (iterator.hasNext()){
TimestampedValue<Event> next = iterator.next();
if(next.getValue().url.equals("./prod?id=1")){
iterator.remove();
}
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
return;
}
})
.process(new ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
AtomicInteger i= new AtomicInteger();
elements.forEach(v-> i.getAndIncrement());
out.collect(
new UrlViewCount(
s+"====",
// 获取迭代器中的元素个数
不能再使用iterable.spliterator().getExactSizeIfKnown(),否侧获取数据一一直为-1
i.longValue(),
context.window().getStart(),
context.window().getEnd()
));
} })
.print();