Flink定时器

news2025/1/1 21:27:29

flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。

一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。

import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class EventTime {
    public static void main(String[] args) throws Exception {
        SourceTemperature mySourceTemperature = new SourceTemperature();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
        WatermarkStrategy<Temperature> twsDS
                = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());

        SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);

        KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());

        SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {

            ListState<Temperature> temperatureListState;
            ValueState<Temperature> temperatureState;
            ValueState<Integer> size;
            ValueState<Long> temperature;
            @Override
            public void open(OpenContext openContext) throws Exception {
                ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);
                temperatureListState = getRuntimeContext().getListState(listStateDescriptor);
                temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));
                size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));
                temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));
            }

            @Override
            public void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {

                Temperature value1 = temperatureState.value();
                //System.out.println(ctx.timestamp());
                if(value1 == null){
                    temperatureState.update(value);
                    temperatureListState.add(value);
                    size.update(1);
                    //System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));
                    //System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));
                    temperature.update(value.getTimestamp());
                    ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);
                }else{
                    if(value1.getTemperature() < value.getTemperature()){
                        temperatureState.update(value);
                        temperatureListState.add(value);
                        size.update(size.value()+1);
                        //System.out.println(size.value());
                        if(size.value()>= 3){
                            System.out.printf("警告警告:");
                            Iterator<Temperature> iterator = temperatureListState.get().iterator();
                            while(iterator.hasNext()){
                                out.collect(iterator.next());
                            }
                            temperatureListState.clear();
                            temperatureState.clear();
                            size.clear();
                            ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);
                        }
                    }else{
                        System.out.println("温度降低了");
                        temperatureState.update(value);
                        temperatureListState.clear();
                        temperatureListState.add(value);
                        size.update(1);
                        ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);
                        temperature.update(value.getTimestamp());
                        ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {
                System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));
                temperatureListState.clear();
                temperatureState.clear();
                size.clear();
                if(temperature.value() != null)
                    ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);
            }
        });
        process.print("当前警告温度为:");
        env.execute();
    }
}

//自己定义数据源
 class SourceTemperature extends RichSourceFunction<Temperature> {

    @Override
    public void run(SourceContext<Temperature> ctx) throws Exception {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            Temperature temperature = new Temperature();
            System.out.print("请输入温度: ");
            //double temp = Math.random()*40;
            double temp = scanner.nextDouble();
            //System.out.println(temp);
            temperature.setTemperature(temp);
            temperature.setTimestamp(new Date().getTime());
            ctx.collect(temperature);
            //Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {

    }
}

//自定义实体类
class Temperature1 {
    public Temperature1(double temperature, long timestamp) {
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    public Temperature1(){};

    //温度
    private double temperature;
    //时间
    private long timestamp;
    //id
    private String day = "2024-12-24";

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    @Override
    public String toString() {
        return "Temperature1{" +
                "temperature=" + temperature +
                ", timestamp=" + timestamp +
                ", day='" + day + '\'' +
                '}';
    }
}

下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。

二、事件处理时间,事件处理时间触发有系统时间有关

package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        SourceTemperature mySourceTemperature = new SourceTemperature();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
//        WatermarkStrategy<Temperature> twsDS
//                = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
//                .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
//        SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);

        KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());

        SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {

            ListState<Temperature> temperatureListState;
            ValueState<Temperature> temperatureState;
            ValueState<Integer> size;
            ValueState<Long> temperature;
            @Override
            public void open(OpenContext openContext) throws Exception {
                ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);
                temperatureListState = getRuntimeContext().getListState(listStateDescriptor);
                temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));
                size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));
                temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));
            }

            @Override
            public void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {

                Temperature value1 = temperatureState.value();
                //System.out.println(ctx.timestamp());
                System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));
                System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));
                if(value1 == null){
                    temperatureState.update(value);
                    temperatureListState.add(value);
                    size.update(1);
                    temperature.update(ctx.timerService().currentProcessingTime());
                    ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
                }else{
                    if(value1.getTemperature() < value.getTemperature()){
                        temperatureState.update(value);
                        temperatureListState.add(value);
                        size.update(size.value()+1);
                        //System.out.println(size.value());
                        if(size.value()>= 3){
                            System.out.printf("警告警告:");
                            Iterator<Temperature> iterator = temperatureListState.get().iterator();
                            while(iterator.hasNext()){
                                out.collect(iterator.next());
                            }
                            temperatureListState.clear();
                            temperatureState.clear();
                            size.clear();
                            ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);
                        }
                    }else{
                        System.out.println("温度降低了");
                        temperatureState.update(value);
                        temperatureListState.clear();
                        temperatureListState.add(value);
                        size.update(1);
                        ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);
                        temperature.update(value.getTimestamp());
                        ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {
                System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));
                temperatureListState.clear();
                temperatureState.clear();
                size.clear();
                if(temperature.value() != null)
                    ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);
            }
        });
        process.print("当前警告温度为:");
        env.execute();
    }
}

//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {

    @Override
    public void run(SourceContext<Temperature> ctx) throws Exception {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            Temperature temperature = new Temperature();
            System.out.print("请输入温度: ");
            //double temp = Math.random()*40;
            double temp = scanner.nextDouble();
            //System.out.println(temp);
            temperature.setTemperature(temp);
            temperature.setTimestamp(new Date().getTime());
            ctx.collect(temperature);
            //Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {

    }
}

//自定义实体类
class Temperature1 {
    public Temperature1(double temperature, long timestamp) {
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    public Temperature1(){};

    //温度
    private double temperature;
    //时间
    private long timestamp;
    //id
    private String day = "2024-12-24";

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    @Override
    public String toString() {
        return "Temperature1{" +
                "temperature=" + temperature +
                ", timestamp=" + timestamp +
                ", day='" + day + '\'' +
                '}';
    }
}

事件处理时间是不需要下一个事件触发的

三、总结

事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法

//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件            
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);

和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。

上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2267695.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用 OpenCV 绘制线条和矩形

OpenCV 是一个功能强大的计算机视觉库&#xff0c;它不仅提供了丰富的图像处理功能&#xff0c;还支持图像的绘制。绘制简单的几何图形&#xff08;如线条和矩形&#xff09;是 OpenCV 中常见的操作。在本篇文章中&#xff0c;我们将介绍如何使用 OpenCV 在图像上绘制线条和矩形…

【Artificial Intelligence篇】AI 前沿探秘:开启智能学习的超维征程

目录 一、人工智能的蓬勃发展与智能学习的重要性: 二、数据的表示与处理 —— 智能学习的基石: 三、构建一个简单的感知机模型 —— 智能学习的初步探索: 四、神经网络 —— 开启超维征程的关键一步: 五、超维挑战与优化 —— 探索智能学习的深度: 六、可视化与交互 —— …

大数据的尽头是数据中台吗?

大数据的尽头是数据中台吗&#xff1f; 2018年末开始&#xff0c;原市场上各种关于大数据平台的招标突然不见&#xff0c;取而代之的是数据中台项目&#xff0c;建设数据中台俨然成为传统企业数字化转型首选&#xff0c;甚至不少大数据领域的专家都认为&#xff0c;数据中台是…

珞珈一号夜光遥感数据地理配准,栅格数据地理配准

目录 一、夜光数据下载&#xff1a; 二、夜光遥感数据地理配准 三、计算夜光数据值 四、辐射定标 五、以表格显示分区统计 五、结果验证 夜光数据位置和路网位置不匹配&#xff0c;虽然都是WGS84坐标系&#xff0c;不匹配&#xff01;&#xff01;&#xff01;不要看到就直接…

3.若依前端项目拉取、部署、访问

因为默认RuoYi-Vue是使用的Vue2,所以需要另外去下载vue3来部署。 拉取代码 git clone https://gitee.com/ys-gitee/RuoYi-Vue3.git 安装node才能执行npm相关的命令 执行命令npm install 如果npm install比较慢的话&#xff0c;需要添加上国内镜像 npm install --registrhttp…

【Java】线程相关面试题 (基础)

文章目录 线程与进程区别并行与并发区别解析概念含义资源利用执行方式应用场景 创建线程线程状态如何保证新建的三个线程按顺序执行wait方法和sleep方法的不同所属类和使用场景方法签名和参数说明调用wait方法的前提条件被唤醒的方式与notify/notifyAll方法的协作使用示例注意事…

手机租赁平台开发全攻略打造高效便捷的租赁服务系统

内容概要 手机租赁平台开发&#xff0c;简单说就是让用户能轻松租赁各类手机的高效系统。这一平台不仅帮助那些想要临时使用高端手机的人们节省了不少资金&#xff0c;还为商家开辟了新的收入渠道。随着智能手机的普及&#xff0c;很多人并不需要长期拥有一部手机&#xff0c;…

【视觉惯性SLAM:十一、ORB-SLAM2:跟踪线程】

跟踪线程是ORB-SLAM2的核心之一&#xff0c;其主要任务是实时跟踪相机的位姿变化和场景的变化&#xff0c;以维持地图的更新和相机轨迹的估计。ORB-SLAM2的跟踪线程通过多种方式&#xff08;参考关键帧跟踪、恒速模型跟踪、重定位跟踪、局部地图跟踪&#xff09;处理跟踪丢失、…

浙江肿瘤医院病理库存储及NAS共享存储(磁盘阵列)方案-Infortrend普安科技

Infortrend金牌代理-燊通智联信息科技发展&#xff08;上海&#xff09;有限公司与院方多轮沟通&#xff0c;详细讨论性能与容量要求&#xff0c;最终决定采用GSe统一存储设备&#xff0c;与现有病理系统服务器无缝对接&#xff0c;每台设备配1.92T SSD作缓存加速原数据读写&am…

解决GPT公式复制到Word之后乱码问题

chat辅助确实很有用。不论是出文稿还是代码。如何把chatgpt中的公式直接复制到word中且保持原样格式呢&#xff1f;下面的方法经过我的验证确实好用&#xff0c;成功解决了最近的论文报告写公式的问题。 一、首先复制chatgpt里面的公式 二、粘贴在下面网站 网站&#xff1a;Mat…

Spring Boot教程之四十:使用 Jasypt 加密 Spring Boot 项目中的密码

如何使用 Jasypt 加密 Spring Boot 项目中的密码 在本文中&#xff0c;我们将学习如何加密 Spring Boot 应用程序配置文件&#xff08;如 application.properties 或 application.yml&#xff09;中的数据。在这些文件中&#xff0c;我们可以加密用户名、密码等。 您经常会遇到…

Quartz任务调度框架实现任务动态执行

说明&#xff1a;之前使用Quartz&#xff0c;都是写好Job&#xff0c;指定一个时间点&#xff0c;到点执行。最近有个需求&#xff0c;需要根据前端用户设置的时间点去执行&#xff0c;也就是说任务执行的时间点是动态变化的。本文介绍如何用Quartz任务调度框架实现任务动态执行…

Scala_【1】概述

第一章 语言特点环境搭建(Windows)idea编写scalaHelloWorld注意事项 Scala是一门以Java虚拟机&#xff08;JVM&#xff09;为运行环境并将面向对象和函数式编程的最佳特性结合在一起的静态类型编程语言 语言特点 Scala是一门多范式的编程语言&#xff0c;Scala支持面向对象和函…

StableAnimator模型的部署:复旦微软提出可实现高质量和高保真的ID一致性人类视频生成

文章目录 一、项目介绍二、项目部署模型的权重下载提取目标图像的关节点图像&#xff08;这个可以先不看先用官方提供的数据集进行生成&#xff09;提取人脸&#xff08;这个也可以先不看&#xff09;进行图片的生成 三、模型部署报错 一、项目介绍 由复旦、微软、虎牙、CMU的…

最新高性能多目标优化算法:多目标麋鹿优化算法(MOEHO)求解LRMOP1-LRMOP6及工程应用---盘式制动器设计,提供完整MATLAB代码

一、麋鹿优化算法 麋鹿优化算法&#xff08;Elephant Herding Optimization&#xff0c;EHO&#xff09;是2024年提出的一种启发式优化算法&#xff0c;该算法的灵感来源于麋鹿群的繁殖过程&#xff0c;包括发情期和产犊期。在发情期&#xff0c;麋鹿群根据公麋鹿之间的争斗分…

螺杆支撑座在运用中会出现哪些问题?

螺杆支撑座是一种用于支撑滚珠螺杆的零件&#xff0c;通常用于机床、数控机床、自动化生产线等高精度机械设备中。在运用中可能会出现多种问题&#xff0c;这些问题源于多个方面&#xff0c;以下是对可能出现的问题简单了解下&#xff1a; 1、安装不当&#xff1a;安装过程中没…

Unity3d UGUI如何优雅的实现Web框架(Vue/Rect)类似数据绑定功能(含源码)

前言 Unity3d的UGUI系统与Web前端开发中常见的数据绑定和属性绑定机制有所不同。UGUI是一个相对简单和基础的UI系统&#xff0c;并不内置像Web前端&#xff08;例如 Vue.js或React中&#xff09;那样的双向数据绑定或自动更新UI的机制。UGUI是一种比较传统的 UI 系统&#xff…

从0入门自主空中机器人-2-2【无人机硬件选型-PX4篇】

1. 常用资料以及官方网站 无人机飞控PX4用户使用手册&#xff08;无人机基本设置、地面站使用教程、软硬件搭建等&#xff09;&#xff1a;https://docs.px4.io/main/en/ PX4固件开源地址&#xff1a;https://github.com/PX4/PX4-Autopilot 飞控硬件、数传模块、GPS、分电板等…

Windows上缺少xaudio2_9.dll是什么原因?

一、文件丢失问题&#xff1a;Windows上缺少xaudio2_9.dll是什么原因&#xff1f; xaudio2_9.dll是DirectX音频处理库的一个组件&#xff0c;它支持游戏中的音频处理功能。当你在Windows系统上运行某些游戏或音频软件时&#xff0c;如果系统提示缺少xaudio2_9.dll文件&#xf…

冥想的实践

这是我某一天的正念和冥想实践&#xff0c;我对正念练习、冥想练习进行了分别的统计。 正念练习&#xff1a;1分钟**5次 冥想&#xff1a;15分钟10分钟 正念练习&#xff0c;基本在工作休息时间练习。当然&#xff0c;工作过程中&#xff0c;也有一部分时间会有正念的状态&am…