Flink 窗口触发器(Trigger)(一)

news2024/11/22 11:51:20

Flink 窗口触发器(Trigger)(一)
Flink 窗口触发器(Trigger)(二)

Flink的窗口触发器(Trigger)是流处理中一个非常关键的概念,它定义了窗口何时被触发并决定触发后的行为(如进行窗口数据的计算或清理)。

一、基本概念

  • 定义:触发器决定了窗口何时被触发以及触发后的行为。在Flink中,窗口的触发是通过设置定时器来实现的。
  • 作用:控制窗口数据的聚合时机,确保数据在适当的时间点被处理和输出。
    在这里插入图片描述

二、类型

Flink提供了多种内置的触发器,以下几种为常用类型:

  1. EventTimeTrigger
  • 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。
  • 适用场景:适用于需要基于事件时间进行处理的场景,如金融交易、日志分析等。
  1. ProcessingTimeTrigger
  • 工作原理:基于处理时间(即机器的系统时间)来触发窗口计算。当处理时间达到窗口的结束时间时,触发窗口计算。
  • 适用场景:适用于对时间精度要求不高的场景,或者当事件时间无法准确获取时。
  1. CountTrigger
  • 工作原理:根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。
  • 适用场景:适用于需要基于数据量进行处理的场景,如批量数据处理、流量分析等。
  1. ContinuousEventTimeTriggerContinuousProcessingTimeTrigger
  • 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处理时间)时触发计算。
  • 适用场景:适用于需要周期性处理数据的场景,如实时监控、周期性报表等。
  1. DeltaTrigger
  • 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。
  • 适用场景:适用于需要基于数据变化量进行处理的场景,如异常检测、趋势分析等。
  1. PurgingTrigger
  • 工作原理:将其他触发器作为参数转换为Purge类型的触发器,在触发计算后清除窗口内的数据。
  • 适用场景:适用于需要在计算完成后立即清除窗口数据的场景,以节省存储空间。

三、关键方法

触发器通常包含以下几个关键方法:

  1. onElement(T element, long timestamp, W window, TriggerContext ctx)
    当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
  2. onEventTime(long time, W window, TriggerContext ctx)
    当事件时间计时器触发时调用,用于处理事件时间相关的触发逻辑。
  3. onProcessingTime(long time, W window, TriggerContext ctx)
    当处理时间计时器触发时调用,用于处理处理时间相关的触发逻辑。
  4. onMerge(W window, OnMergeContext ctx)(可选)
    当两个窗口合并时调用,用于合并窗口的状态和定时器。
  5. clear(W window, TriggerContext ctx)
    当窗口被删除时调用,用于清理窗口的状态和定时器。

四、行为

触发器在触发时会返回一个TriggerResult枚举值,以决定窗口的后续行为。常见的TriggerResult值包括:

  • CONTINUE:表示不进行任何操作,等待下一个触发条件。
  • FIRE:表示触发窗口计算并输出结果,但窗口状态保持不变。
  • PURGE:表示不触发窗口计算,只清除窗口内的数据和状态。
  • FIRE_AND_PURGE:表示触发窗口计算并输出结果,然后清除窗口内的数据和状态。

Flink的窗口触发器是流处理中非常灵活且强大的工具,它允许开发者根据实际需求定义窗口的触发条件和触发后的行为。通过选择合适的触发器和配置相应的参数,可以实现高效、准确的流数据处理。

五、Trigger

EventTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

ProcessingTimeoutTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final Trigger<T, W> nestedTrigger;
    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final boolean shouldClearOnTimeout;
    private final ValueStateDescriptor<Long> timeoutStateDesc;

    private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
        this.timeoutStateDesc = new ValueStateDescriptor("timeout", LongSerializer.INSTANCE);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        if (triggerResult.isFire()) {
            this.clear(window, ctx);
            return triggerResult;
        } else {
            ValueState<Long> timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
            long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
            Long timeoutTimestamp = (Long)timeoutState.value();
            if (timeoutTimestamp != null && this.resetTimerOnNewRecord) {
                ctx.deleteProcessingTimeTimer(timeoutTimestamp);
                timeoutState.clear();
                timeoutTimestamp = null;
            }

            if (timeoutTimestamp == null) {
                timeoutState.update(nextFireTimestamp);
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
            }

            return triggerResult;
        }
    }

    public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<Long> timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        Long timeoutTimestamp = (Long)timeoutTimestampState.value();
        if (timeoutTimestamp != null) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp);
            timeoutTimestampState.clear();
        }

        this.nestedTrigger.clear(window, ctx);
    }

    public String toString() {
        return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), false, true);
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }
}

CountTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;

    private CountTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor("count", new Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        count.add(1L);
        if ((Long)count.get() >= this.maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ReducingState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
    }

    public String toString() {
        return "CountTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousEventTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            if (fireTimestampState.get() == null) {
                this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
            }

            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        } else {
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            Long fireTimestamp = (Long)fireTimestampState.get();
            if (fireTimestamp != null && fireTimestamp == time) {
                fireTimestampState.clear();
                this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
                return TriggerResult.FIRE;
            } else {
                return TriggerResult.CONTINUE;
            }
        }
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }

    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }

    }

    public String toString() {
        return "ContinuousEventTimeTrigger(" + this.interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
        return new ContinuousEventTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerEventTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousProcessingTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestampState.get() == null) {
            this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
        }

        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        if (((Long)fireTimestampState.get()).equals(time)) {
            fireTimestampState.clear();
            this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }

    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }

    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + this.interval + ")";
    }

    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerProcessingTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

DeltaTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    private final ValueStateDescriptor<T> stateDesc;

    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public String toString() {
        return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
    }

    public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        return new DeltaTrigger(threshold, deltaFunction, stateSerializer);
    }
}

PurgingTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private Trigger<T, W> nestedTrigger;

    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        this.nestedTrigger.clear(window, ctx);
    }

    public boolean canMerge() {
        return this.nestedTrigger.canMerge();
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        this.nestedTrigger.onMerge(window, ctx);
    }

    public String toString() {
        return "PurgingTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return this.nestedTrigger;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1890121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

node与npm安装教程

node与npm的下载安装教程&#xff1a; 文章目录 node与npm的下载安装教程&#xff1a;---Node.js 介绍NPM 介绍 一&#xff1a;下载&#xff08;node与npm的安装包是在一起的&#xff09;二&#xff1a;安装1&#xff1a;双击运行安装文件1-node-v14.15.0-x64.msi,点击下一步。…

区块链加载解析方法

一.区块链加载解析 对于数据的下载主要包括三种方式&#xff1a; 1.实现比特币网络协议&#xff0c;通过该协议和其他比特币全节点建立联系&#xff0c;然后同步区块数据。 2.通过比特币节点提供的API服务下载区块链数据。 3.通过blickchain.com提供的rest服务下载区块数据…

windows 屏幕录制录屏;gif工具推荐;滑动截屏

1、gif工具推荐gif123 参考&#xff1a;https://gif123.aardio.com/ 很小&#xff0c;很简洁 2、滑动截屏 参考&#xff1a;https://shipinzan.com/ll-gd-jp.html 通过google、edge浏览器 3、windows 屏幕录制录屏 1&#xff09;WinG 2)笔记本 prtsc 按键 可以截图&…

ResNet50V2

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、ResNetV1和ResNetV2的区别 ResNetV2 和 ResNetV1 都是深度残差网络&#xff08;ResNet&#xff09;的变体&#xff0c;它们的主要区别在于残差块的设计和…

如何对低代码平台进行分类?

现在市面上的低代码平台就像雨后春笋一样冒出来&#xff0c;而且源源不绝&#xff0c;但总结下来&#xff0c;大致的也就以下三类。 一、 aPaaS多引擎类&#xff08;有很多成熟引擎、做好东西要一起用&#xff09; 这类产品包括&#xff1a;织信Informat&#xff08;国内&…

多模态图像生成的突破:Image Anything一种无需训练的智能框架

多模态图像生成是内容创作领域的热点技术&#xff0c;尤其在媒体、艺术和元宇宙等领域。该技术旨在模拟人类的想象力&#xff0c;将视觉、文本和音频等多种模态属性相关联&#xff0c;以生成图像。早期的方法主要侧重于单一模态输入的图像生成&#xff0c;例如基于图像、文本或…

C++部分复习笔记下

7. C11 范围for 使用格式 vector<int> v { 1,2,3,4,5 }; for (auto e : v) {cout << e << " "; } cout << endl;底层原理&#xff0c;使用迭代器 vector<int> v { 1,2,3,4,5 }; auto it v.begin(); while (it ! v.end()) {cout…

项目2:API Hunter 细节回顾 -1

一. 接口调用 对于开发者来说&#xff0c;接口的调用应当是方便快捷的&#xff0c;而且出于安全考虑&#xff0c;通常会选择在后端调用第三方 API&#xff0c;避免在前端暴露诸如密码的敏感信息。 若采用 HTTP 调用方式&#xff1a; HttpClientRestTemplate第三方库&#xf…

kaggle量化赛金牌方案(第七名解决方案)(下)

— 无特征工程的神经网络模型&#xff08;得分 5.34X&#xff09; 比赛进入最后阶段&#xff0c;现在是时候深入了解一些关于神经网络模型的见解了。由于 Kaggle 讨论区的需求&#xff0c;我在这里分享两个神经网络模型。第一个是 LSTM 模型&#xff0c;第二个是卷积网络&…

PyPDF2指定范围拆分PDF文件为单个页面

本文目录 前言一、指定范围拆分PDF1、过程讲解2、拆分效果图3、完整代码二、其他问题1、更改页码索引值前言 上一篇文章讲解了怎么讲一个PDF文档分割为多个单页面PDF,本文来讲解一下进阶,就是指定范围拆分PDF页面,有的时候,我们只想把PDF文档中的某几页拆分出来,而不是全…

【论文解读】iSDF: Real-Time Neural Signed Distance Fields for Robot Perception

《iSDF: Real-Time Neural Signed Distance Fields for Robot Perception》提出了一种用于实时签名距离场&#xff08;SDF&#xff09;重建的持续学习系统。 论文&#xff1a;https://arxiv.org/abs/2204.02296https://arxiv.org/abs/2204.02296 项目&#xff1a;iSDFhttps:/…

QT创建地理信息shp文件编辑器shp_editor

空闲之余创建一个简单的矢量shp文件编辑器&#xff0c;加深对shp文件的理解。 一、启动程序 二、打开shp文件 三、显示shp文件的几何图形 四、双击右边表格中的feature&#xff0c;主窗体显示选中feature的各个节点。 五、鼠标在主窗体中选中feature的节点&#xff0c;按鼠标左…

【坚果识别】果实识别+图像识别系统+Python+计算机课设+人工智能课设+卷积算法

一、介绍 坚果识别系统&#xff0c;使用Python语言进行开发&#xff0c;通过TensorFlow搭建卷积神经网络算法模型&#xff0c;对10种坚果果实&#xff08;‘杏仁’, ‘巴西坚果’, ‘腰果’, ‘椰子’, ‘榛子’, ‘夏威夷果’, ‘山核桃’, ‘松子’, ‘开心果’, ‘核桃’&a…

Python爬虫实战案例——王者荣耀皮肤抓取

大家好&#xff0c;我是你们的老朋友——南枫&#xff0c;今天我们一起来学习一下该如何抓取大家经常玩的游戏——王者荣耀里面的所有英雄的皮肤。 老规矩&#xff0c;直接上代码&#xff1a; 导入我们需要使用到的&#xff0c;也是唯一用到的库&#xff1a; 我们要抓取皮肤其…

使用ref定义响应式数据变量

Ref 使用 Ref 可以方便地创建和管理Vue组件中的响应式数据。例如&#xff0c;如果你有一个计数器组件&#xff0c;你可以使用 Ref 来创建一个响应式的计数器变量&#xff0c;然后在组件内部或外部修改这个变量的值&#xff0c;而不需要手动触发视图更新。 先声明一个变量&…

数据结构初阶 堆的问题详解(三)

题目一 4.一棵完全二叉树的节点数位为531个&#xff0c;那么这棵树的高度为&#xff08; &#xff09; A 11 B 10 C 8 D 12 我们有最大的节点如下 假设最大高度为10 那么它的最多节点应该是有1023 假设最大高度为9 那么它的最多节点应该是 511 所以说这一题选B 题目二 …

昇思25天学习打卡营第11天|基于MindSpore通过GPT实现情感分类

学AI还能赢奖品&#xff1f;每天30分钟&#xff0c;25天打通AI任督二脉 (qq.com) 基于MindSpore通过GPT实现情感分类 %%capture captured_output # 实验环境已经预装了mindspore2.2.14&#xff0c;如需更换mindspore版本&#xff0c;可更改下面mindspore的版本号 !pip uninsta…

【深海王国】小学生都能玩的语音模块?ASRPRO打造你的第一个智能语音助手(4)

Hi~ (o^^o)♪, 各位深海王国的同志们&#xff0c;早上下午晚上凌晨好呀~ 辛勤工作的你今天也辛苦啦(/≧ω) 今天大都督继续为大家带来系列——小学生都能玩的语音模块&#xff0c;帮你一周内快速学会语音模块的使用方式&#xff0c;打造一个可用于智能家居、物联网领域的语音助…

01 Docker 概述

目录 1.Docker简介 2.传统虚拟机 vs 容器 3.Docker运行速度快的原因 4.Docker基本组成三要素 5.Docker 平台架构 入门版 架构版 1.Docker简介 Docker是基于Go语言实现的云开源项目。 Docker的主要目标是&#xff1a;Build, Ship and Run Any App, Anywhere&#xff0c…

抖音常用的视频剪辑软件有哪些,变速视频如何制作?

抖音是一款当下流行的短视频软件。很多人都想在上面发表自己的作品&#xff0c;但是也还有人因为不会剪辑&#xff0c;找不到合适的视频制作软件&#xff0c;一直没能行动。今天就为大家解答抖音常用的制作视频软件有哪些&#xff0c;如何调整抖音制作视频的速度。 希望大家看完…