背景
本文我们实现一个周期性触发的自定义触发器,顺便看下实现自定义触发器的一些要点
周期性触发器实现
实现一个每分钟触发一次的自定义事件时间触发器,实现代码和注意事项如下所示
package wikiedits.trigger;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class OneMinuteIntervalTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
private final long interval;
// 触发时间的状态对象
private final ValueStateDescriptor<Long> stateDesc =
new ValueStateDescriptor<>("fire-time", TypeInformation.of(Long.class));
private OneMinuteIntervalTrigger(long interval) {
this.interval = interval;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// 这里其实不是必要的,取决于窗口结束时间到之后是否要触发一次计算
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {// 多次注册也没事,反正是同一个计时器,这表明窗口结束时想要触发一次计算,此外注意getEnd和maxTimestamp方法的区别
ctx.registerEventTimeTimer(window.maxTimestamp());
}
// 仅仅在第一次未注册时注册一次,后续由ontimer触发
ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.value() == null) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
fireTimestamp.update(nextFireTimestamp);
}
return TriggerResult.CONTINUE;
}
// 计时器触发的函数
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
// 这里窗口结束时触发不是必要的,取决于是否想要在窗口结束是触发一次计算,并且这里如果不处理延迟的消息,可以返回FIRE_AND_PURGE清理窗口状态(但是注意即使返回PURGE,也不会清理触发器的状态)
if (time == window.maxTimestamp()) {
return TriggerResult.FIRE;
}
ValueState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
Long fireTimestamp = fireTimestampState.value();
// 继续注册计时器
if (fireTimestamp != null && fireTimestamp == time) {
fireTimestampState.update(time + interval);
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
// 清理触发器状态
ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
Long timestamp = fireTimestamp.value();
if (timestamp != null) {
ctx.deleteEventTimeTimer(timestamp);
fireTimestamp.clear();
}
}
}
代码里面注解已经比较详细的说明了注意事项,此外对于状态的清理,我们需要看的是WindowOperator,如下