聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

news2024/12/26 18:47:57

一、触发器(Trigger)

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。

1.1 Flink中预置的Trigger

窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

1.2 Trigger的抽象类

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • canMerge() 方法判断是否可以合并。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • clear() 方法处理在对应窗口被移除时所需的逻辑。

触发器接口的源码如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine whether
     * the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which the element is being added.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Returns true if this trigger supports merging of trigger state and can therefore be used with
     * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
     *
     * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
     * OnMergeContext)}
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * Called when several windows have been merged into one window by the {@link
     * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
     *
     * @param window The new window that results from the merge.
     * @param ctx A context object that can be used to register timer callbacks and access state.
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * Clears any state that the trigger might still hold for the given window. This is called when
     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {
		// ...
    }

    /**
     * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
     * OnMergeContext)}.
     */
    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(
                StateDescriptor<S, ?> stateDescriptor);
    }
}

关于上述方法,需要注意三件事:

(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:

  • CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
  • FIRE:触发窗口计算,但是保留窗口元素。
  • PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
  • FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

源码如下:

public enum TriggerResult {

    // 不触发,也不删除元素
    CONTINUE(false, false),

    // 触发窗口,窗口出发后删除窗口中的元素
    FIRE_AND_PURGE(true, true),

    // 触发窗口,但是保留窗口元素
    FIRE(true, false),

    // 不触发窗口,丢弃窗口,并且删除窗口的元素
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}

(2) 每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。

1.3 ProcessingTimeTrigger源码分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    /** Creates a new trigger that fires once system time passes the end of the window. */
    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。

EventTimeTriggerr在onElement设置的定时器:

在这里插入图片描述
EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。

二、移除器(Evictor)

2.1 Evictor扮演的角色
在这里插入图片描述
当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:

  • Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
  • Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
  • 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
  • WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。

Evictor接口定义如下:

在这里插入图片描述
evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。

窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。

注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。

2.2 Flink内置的Evitor

  • CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
  • DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
  • TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // 小于最大数量,不做处理
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // 移除前size - maxCount个元素,只剩下最后maxCount个元素
                iterator.remove();
            }
        }
    }
}

2.2.2 DeltaEvictor

DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {

    // 获取最后一个元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
        TimestampedValue<T> element = iterator.next();
        // 依次计算每个元素和最后一个元素的delta值,同时和threshold的值进行比较
        // 若计算结果大于threshold值或者是相等,则该元素会被移除
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}

2.2.3 TimeEvictor

TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:

在这里插入图片描述
TimeEvictor的代码实现如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {

    // 如果element没有timestamp,直接返回
    if (!hasTimestamp(elements)) {
        return;
    }

    // 获取elements中最大的时间戳(到来最晚的元素的时间)
    long currentTime = getMaxTimestamp(elements);
    // 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素)
    long evictCutoff = currentTime - windowSize;

    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        
        // 清除所有时间戳小于截止时间的元素
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2252087.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Redis#12] 常用类型接口学习 | string | list

目录 0.准备 1.string get | set set_with_timeout_test.cpp set_nx_xx_test.cpp mset_test.cpp mget_test.cpp getrange_setrange_test.cpp incr_decr_test.cpp 2.list lpush_lrange_test.cpp rpush_test.cpp lpop_rpop_test.cpp blpop_test.cpp llen_test.cpp…

网络安全之IP伪造

眼下非常多站点的涉及存在一些安全漏洞&#xff0c;黑客easy使用ip伪造、session劫持、xss攻击、session注入等手段危害站点安全。在纪录片《互联网之子》&#xff08;建议搞IT的都要看下&#xff09;中。亚伦斯沃茨&#xff08;真实人物&#xff0c;神一般的存在&#xff09;涉…

Spring Web开发(请求)获取JOSN对象| 获取数据(Header)

大家好&#xff0c;我叫小帅今天我们来继续Spring Boot的内容。 文章目录 1. 获取JSON对象2. 获取URL中参数PathVariable3.上传⽂件RequestPart3. 获取Cookie/Session3.1 获取和设置Cookie3.1.1传统获取Cookie3.1.2简洁获取Cookie 3. 2 获取和存储Session3.2.1获取Session&…

(SAST检测规则-1)Android - 权限管理漏洞

所属分类&#xff1a;Android - 权限管理漏洞缺陷详解&#xff1a;应用未正确实施最小权限原则或滥用已声明的权限可能导致敏感信息泄露。例如&#xff0c;恶意代码利用已授予的权限绕过用户授权&#xff0c;访问通讯录、位置、短信等敏感资源。部分开发者还可能滥用权限以执行…

EC2还原快照

EC2还原快照 AWS EC2 磁盘快照 是您 Amazon Elastic Block Store (EBS) 卷在特定时间点的增量备份。您可以使用快照创建 EBS 卷的副本&#xff0c;以便在出现故障时恢复数据或将数据迁移到其他区域。 创建磁盘快照 找到ec2实例挂载的磁盘&#xff0c;直接选择创建快照 等待创建…

oracle RAC各版本集群总结和常用命令汇总

oracle RAC学习 RAC介绍 RAC&#xff1a;高可用集群&#xff0c;负载均衡集群&#xff0c;高性能计算集群 RAC是⼀种⾼可⽤&#xff0c;⾼性能&#xff0c;负载均衡的share-everything的集群 8i:内存融合雏形 内存融合雏形&#xff08;Oracle Parallel Server&#xff09;…

【聚类】K-Means 聚类(无监督)及K-Means ++

1. 原理 2. 算法步骤 3. 目标函数 4. 优缺点 import torch import numpy as np import matplotlib.pyplot as plt from sklearn.cluster import KMeans from sklearn.decomposition import PCA import torch.nn as nn# 数据准备 # 生成数据&#xff1a;100 个张量&#xff0c…

智慧银行反欺诈大数据管控平台方案(一)

智慧银行反欺诈大数据管控平台建设方案的核心在于通过整合先进的大数据技术和深度学习算法&#xff0c;打造一个全面、智能且实时的反欺诈系统&#xff0c;以有效识别、预防和应对各类金融欺诈行为。该方案涵盖数据采集、存储、处理和分析的全流程&#xff0c;利用多元化的数据…

搭建业务的性能优化指南

这是一篇搭建业务优化的心路历程&#xff0c;也是写给搭建业务的性能优化指南。 前言 直到今天&#xff0c;淘内的页面大多都迁移到了 SSR&#xff0c;从我们终端平台 - 搭建研发团队的视角看&#xff0c;业务大致可以分为两类 —— 搭建派 和 源码派。 这两者互不冲突&#xf…

【Db First】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…

搭建私有云存储

1、安装LNMP环境 yum install nginx -y yum install -y nginx mariadb-server php php-fpm php-mysqlnd systemctl restart nginx.service --- 启动Nginx systemctl start mariadb.service ---启动数据库 mysql -e create database lxdb character set utf8 ---创建数据库 my…

YOLO 标注工具 AutoLabel 支持 win mac linux

常见的标注工具&#xff0c;功能基础操作繁琐&#xff0c;无复制粘贴&#xff0c;标签无法排序修改&#xff0c;UI不美观&#xff0c;bug修正不及时&#xff0c;没有集成识别、训练、模型导出… 怎么办呢&#xff1f;AutoLabel它来了 Quick Start 一图胜千言 图像标注 支持YOL…

qt QGraphicsPolygonItem详解

1、概述 QGraphicsPolygonItem是Qt框架中QGraphicsItem的一个子类&#xff0c;它提供了一个可以添加到QGraphicsScene中的多边形项。通过QGraphicsPolygonItem&#xff0c;你可以定义和显示一个多边形&#xff0c;包括其填充颜色、边框样式等属性。QGraphicsPolygonItem支持各…

Linux网络_网络协议_网络传输_网络字节序

一.协议 1.概念 协议&#xff08;Protocol&#xff09; 是一组规则和约定&#xff0c;用于定义计算机网络中不同设备之间如何进行通信和数据交换。协议规定了数据的格式、传输方式、传输顺序等详细规则&#xff0c;确保不同设备和系统能够有效地互联互通。 在网络通信中&#…

MySQL查看日志

目录 1. 日志 1.1 错误日志 1.2 二进制日志 1.2.1 介绍 1.2.2 格式 1.2.3 查看 1.2.4 删除 1.3 查询日志 1.4 慢查询日志 1. 日志 1.1 错误日志 错误日志是 MySQL 中最重要的日志之一&#xff0c;它记录了当 mysqld 启动和停止时&#xff0c;以及服务器在运行过…

【深度学习】四大图像分类网络之AlexNet

AlexNet是由Alex Krizhevsky、Ilya Sutskever&#xff08;均为Hinton的学生&#xff09;和Geoffrey Hinton&#xff08;被誉为”人工智能教父“&#xff0c;首先将反向传播用于多层神经网络&#xff09;在2012年ImageNet图像分类竞赛中提出的一种经典的卷积神经网络。AlexNet在…

MySQL数据库做题笔记

题目链接https://leetcode.cn/problems/invalid-tweets-ii/description/https://leetcode.cn/problems/invalid-tweets-ii/description/ # Write your MySQL query statement below SELECT tweet_id FROM Tweets where LENGTH(content)>140 OR (length(content)-length(rep…

ansible使用说明

将安装包拷贝到主控端主机 在主控端主机安装ansible&#xff0c;sh setup.sh 确认安装成功后&#xff0c;编辑hosts文件&#xff08;按步骤逐个添加主机组&#xff0c;不要一开始全部配置好&#xff09; [site-init]下的主机列表为被控制的主机&#xff08;按照当前ai建模方案…

EDA软件研发的DevOps平台

1&#xff1a;什么是DevOps DevOps是十几年前&#xff0c;在互联网比较火的词&#xff0c;实际上就是ci/cd平台的另外一种说法&#xff0c;核心是说打破研发&#xff0c;测试&#xff0c;运维的边界&#xff0c;能够将整个产品开发的流程快速循环起来&#xff0c;随时可发版&a…

Linux命令进阶·如何切换root以及回退、sudo命令、用户/用户组管理,以及解决创建用户不显示问题和Ubuntu不显示用户名只显示“$“符号问题

目录 1. root用户&#xff08;超级管理员&#xff09; 1.1 用于账户切换的系统命令——su 1.2 退回上一个用户命令——exit 1.3 普通命令临时授权root身份执行——sudo 1.3.1 为普通用户配置sudo认证 2. 用户/用户组管理 2.1 用户组管理 2.2 用户管理 2.2.1 …