周期性触发的自定义触发器

news2024/11/19 5:43:14

背景

本文我们实现一个周期性触发的自定义触发器,顺便看下实现自定义触发器的一些要点

周期性触发器实现

实现一个每分钟触发一次的自定义事件时间触发器,实现代码和注意事项如下所示

package wikiedits.trigger;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class OneMinuteIntervalTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    // 触发时间的状态对象
    private final ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("fire-time", TypeInformation.of(Long.class));

    private OneMinuteIntervalTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// 这里其实不是必要的,取决于窗口结束时间到之后是否要触发一次计算
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {// 多次注册也没事,反正是同一个计时器,这表明窗口结束时想要触发一次计算,此外注意getEnd和maxTimestamp方法的区别
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        // 仅仅在第一次未注册时注册一次,后续由ontimer触发
        ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.value() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.update(nextFireTimestamp);
        }

        return TriggerResult.CONTINUE;
    }

    // 计时器触发的函数
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

        // 这里窗口结束时触发不是必要的,取决于是否想要在窗口结束是触发一次计算,并且这里如果不处理延迟的消息,可以返回FIRE_AND_PURGE清理窗口状态(但是注意即使返回PURGE,也不会清理触发器的状态)
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        }

        ValueState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.value();

        // 继续注册计时器
        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.update(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // 清理触发器状态
        ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.value();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

}

代码里面注解已经比较详细的说明了注意事项,此外对于状态的清理,我们需要看的是WindowOperator,如下

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1059578.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线程通信初始

简单认识一下线程通信 目录 简单认识一下线程通信线程通信定义线程通信模型之一释疑示例案例案例要求案例简单实现 拓展等待和唤醒API 参考视频 注&#xff1a;线程通信 前提是 线程安全 线程通信定义 当多个线程共同操作共享的资源时&#xff0c;线程间通过某种方式互相告知自…

【算法训练-二分查找 三】【特殊二分】寻找峰值

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【数组的二分查找】&#xff0c;使用【数组】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为…

Vue3+TS+ECharts5实现中国地图数据信息显示

1.引言 最近在做一个管理系统&#xff0c;主要技术栈使用的是Vue3TSViteElementPlus&#xff0c;主要参考项目是yudao-ui-admin-vue3&#xff0c;其中用到ECharts5做数字大屏&#xff0c;展示中国地图相关信息&#xff0c;以此基础做一个分享&#xff0c;写下这篇文章。 &quo…

解决ASP.NET Core的中间件无法读取Response.Body的问题

概要 本文主要介绍如何在ASP.NET Core的中间件中&#xff0c;读取Response.Body的方法&#xff0c;以便于我们实现更多的定制化开发。本文介绍的方法适用于.Net 3.1 和 .Net 6。 代码和实现 现象解释 首先我们尝试在自定义中间件中直接读取Response.Body&#xff0c;代码如…

Appleid苹果账号自动解锁改密(自动解锁二验改密码)

目前该项目能实现以下功能&#xff1a; 多用户使用&#xff0c;权限控制多账号管理账号分享页&#xff0c;支持设置密码、有效期、自定义HTML内容自动解锁与关闭二步验证自动/定时修改密码自动删除Apple ID中的设备代理池与Selenium集群&#xff0c;提高解锁成功率允许手动触发…

先输入列,再输入行

想要的表格行数和列数是 5 行 5 列&#xff0c;以下是相应的代码实现&#xff1a; # 定义表格行数和列数 rows 5 cols 5# 创建一个二维列表作为表格 table [[ for j in range(cols)] for i in range(rows)]print("请输入表格数据&#xff1a;")while True:# 获取…

Python无废话-办公自动化Excel读取操作

openpyxl模块介绍 openpyxl是一个用于处理Excel文件的Python库,用于读取/写入Excel2010 xlsx/xlsm/xltx/xltm文件(不支持xls格式)。通过使用openpyxl库&#xff0c;可 以轻松地在Python程序中实现对Excel文件的操作。 openpyxl 安装 方式1&#xff1a;使用pip 命令安装&…

操作系统学习笔记1

文章目录 1、OS的一个宏观比喻2、OS的目的和功能3、OS的发展4、OS的运行机制5、OS的特征6、OS的体系结构 参考视频&#xff1a;操作系统 1、OS的一个宏观比喻 2、OS的目的和功能 3、OS的发展 4、OS的运行机制 中断、系统调用、异常。 5、OS的特征 6、OS的体系结构

【逐步剖C++】-第二章-C++类和对象(下)

前言&#xff1a;本文是对类和对象知识点的最后一篇总结&#xff0c;前两篇的链接如下&#xff1a; 【逐步剖C】-第二章-C类和对象&#xff08;上&#xff09; 【逐步剖C】-第二章-C类和对象&#xff08;中&#xff09; 这三篇加起来就是笔者学习在类和对象中的所有总结了&…

计算机网络网络层、应用层、数据链路层协议详解

目录 一、计算机网络 二、网络层 三、应用层 四、数据链路层 一、计算机网络 计算机网络是将多台计算机和其他网络设备通过通信链路连接起来&#xff0c;以实现数据交换和资源共享的系统。它是现代信息社会的基础设施之一&#xff0c;为人们提供了快速、可靠、安全的数据传…

学校项目培训之Carla仿真平台之Carla学习内容

一、Blender Blender入门&#xff1a;https://www.bilibili.com/video/BV1fb4y1e7PD/ Blender导入骨骼&#xff1a;https://www.bilibili.com/video/BV1hc41157nL 做一个车&#xff1a;https://www.bilibili.com/video/BV1hY411q7w2 二、Roadrunner RoadRunner Scenario…

Flink状态管理与检查点机制

1.状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State: 1.1 算子状态 算子状态 (Operator State):顾名思义…

计组—— I/O系统

&#x1f4d5;&#xff1a;参考王道课件 目录 一、I/O系统的基本概念 1.什么是“I/O”&#xff1f; ​编辑2.主机如何和I/O设备进行交互&#xff1f; 3.I/O控制方式 &#xff08;1&#xff09;程序查询方式 &#xff08;2&#xff09;程序中断方式 &#xff08;3&#x…

【MATLAB源码-第41期】基于压缩感知算法的OFDM系统信道估计和LS算法对比仿真。

操作环境&#xff1a; MATLAB 2013b 1、算法描述 压缩感知&#xff08;Compressed Sensing, CS&#xff09;是一种从稀疏或可压缩信号中重构完整信号的数学理论和技术。下面详细介绍压缩感知和它在OFDM信道估计中的应用。 1. 压缩感知基本概念 在传统采样理论中&#xff0…

数字电路逻辑与设计 之循环码和 移存码

有发现错误的能力&#xff0c;不能纠正 只能检查单次的错误&#xff0c;不能完全抗干扰 可以按照上面的方法来循环构造 移存码可以通过前推后推来实现

pytorch_神经网络构建1

文章目录 pytorch简介神经网络基础分类问题分析:逻辑回归模型逻辑回归实现多层神经网络多层网络搭建保存模型 pytorch简介 为什么神经网络要自定义数据类型torch.tensor? tensor可以放在gpu上训练,支持自动求导,方便快速训练,同时支持numpy的运算,是加强版,numpy不支持这些 为…

C++项目:【高并发内存池】

文章目录 一、项目介绍 二、什么是内存池 1.池化技术 2.内存池 3.内存池主要解决的问题 4.malloc 三、定长的内存池 四、高并发内存池整体框架设计 1.高并发内存池--thread cache 1.1申请内存&#xff1a; 1.2释放内存&#xff1a; 1.3用TLS实现thread cache无锁访…

GD32F10 串口通信

1. 什么是通信 通信&#xff0c;指人与人或人与自然之间通过某种行为或媒介进行的信息交流与传递&#xff0c;从广义上指需要信息的双方或多方在不违背各自意愿的情况下采用任意方法&#xff0c;任意媒质&#xff0c;将信息从某方准确安全地传送到另方。通信双方如果想正确传输…

SystemUI导航栏

SystemUI导航栏 1、系统中参数项1.1 相关开关属性2.2 属性设置代码 2、设置中设置“三按钮”导航更新流程2.1 属性资源覆盖叠加2.2 SystemUI导航栏接收改变广播2.3 SystemUI导航栏布局更新2.4 时序图 android13-release 1、系统中参数项 1.1 相关开关属性 设置->系统->…

C++算法 —— 动态规划(9)完全背包问题

文章目录 1、动规思路简介2、完全背包【模板】3、零钱兑换4、零钱兑换Ⅱ5、完全平方数 背包问题需要读者先明白动态规划是什么&#xff0c;理解动规的思路&#xff0c;并不能给刚接触动规的人学习。所以最好是看了之前的动规博客&#xff0c;以及01背包博客&#xff0c;才能看完…