【开发篇】一、处理函数:定时器与定时服务

news2025/1/12 18:10:15

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...

}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
long currentProcessingTime();
  • 获取当前的水位线(事件时间)
long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取
                );


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次
                     * @param value  每条数据
                     * @param ctx 上下文对象
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();
                        // 获取定时器服务对象
                        TimerService timerService = ctx.timerService();
                        // 数据中提取出来的事件时间
                        Long currentEventTime = ctx.timestamp(); 
                        //注册定时任务,水位线被推到5s时触发
                        timerService.registerEventTimeTimer(5000L);
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");

                        
                    /**
                     * 时间进展到定时器注册的时间,调用该方法
                     * @param timestamp 当前时间进展,就是定时器被触发时的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        String currentKey = ctx.getCurrentKey();
                        System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
                    }
                }
        );

        process.print();

        env.execute();
    }
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
		
		//...重复代码略,同上
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();
                        
                        TimerService timerService = ctx.timerService();

						//当前数据的处理时间
                        long currentTs = timerService.currentProcessingTime();
						//定时器不用水位线为标杆,直接处理时间加5s
                        timerService.registerProcessingTimeTimer(currentTs + 5000L);
                        
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");

                    }


         //...重复代码略,同上
}

运行:

在这里插入图片描述

4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

//...重复代码略,同上

@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     
     // 获取 process的 当前watermark
     long currentWatermark = timerService.currentWatermark();
     System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
 }


此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137324.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux08-进程信号】信号的一生……

今天&#xff0c;带来Linux下进程信号的讲解。文中不足错漏之处望请斧正&#xff01; 是什么 生活中的信号 例子: 红绿灯来电铃声老妈倒数321叫我起床外卖小哥叫我下楼拿外卖 理解: 过程&#xff1a;收到信号 → 分析信号 → 产生信号对应的行为信号不一定会被立即处理&…

Flutter——最详细(Scaffold)使用教程

Scaffold简介 相当于界面的主体&#xff08;类似于安卓最外层PhoneWindow&#xff09;&#xff0c;组件的展示都必须依附于它。 使用场景&#xff1a; 每一个界面都是脚手架&#xff0c;通过它来进行架构实现&#xff0c;优美的布局效果。 属性作用appBar顶部的标题栏body显示整…

Qwt QwtPlotMarker标记类详解

1.概述 QwtPlotMarker类是Qwt绘图库中用于在图表上绘制标记的类。标记可以是垂直或水平线、直线、文本或箭头等。它可用于标记某个特定的位置、绘制参考线或注释信息。 以下是类继承关系图&#xff1a; 2.常用方法 设置标记的坐标。传入x和y坐标值&#xff0c;标记将被放置在…

红黑树--讲解以及详细实现过程

目录 红黑树理解红黑树概念红黑树性质 红黑树实现红黑树图解基础结构实现插入实现 红黑树理解 红黑树概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个…

六零导航页SQL注入漏洞复现(CVE-2023-45951)

0x01 产品简介 LyLme Spage&#xff08;六零导航页&#xff09;是中国六零&#xff08;LyLme&#xff09;开源的一个导航页面。致力于简洁高效无广告的上网导航和搜索入口&#xff0c;支持后台添加链接、自定义搜索引擎&#xff0c;沉淀最具价值链接&#xff0c;全站无商业推广…

前端(二十五)——前端实现 OCR 图文识别的详细步骤与示例代码

&#x1f601;博主&#xff1a;小猫娃来啦 &#x1f601;文章核心&#xff1a;前端实现 OCR 图文识别的详细步骤与示例代码 文章目录 简介确定使用的 OCR API创建前端界面添加图像上传功能发送识别请求和处理识别结果完善代码添加注释结论附录 简介 在现代应用程序中&#xff…

如何选择向量数据库|Weaviate Cloud v.s. Zilliz Cloud

随着以 Milvus 为代表的向量数据库在 AI 产业界越来越受欢迎&#xff0c;传统数据库和检索系统也开始在快速集成专门的向量检索插件方面展开角逐。 例如 Weaviate 推出开源向量数据库&#xff0c;凭借其易用、开发者友好、上手快速、API 文档齐全等特点脱颖而出。同样&#xff…

使用AOP切面实现日志记录功能

系列文章 1.SpringBoot整合RabbitMQ并实现消息发送与接收 2. 解析JSON格式参数 & 修改对象的key 3. VUE整合Echarts实现简单的数据可视化 4. Java中运用BigDecimal对字符串的数值进行加减乘除等操作 5. List&#xff1c;HashMap&#xff1c;String,String&#xff1e;&…

【Javascript】函数(变量作用域)

变量&#xff1a;全局变量&#xff0c;局部变量 全局变量 挂载到window对象上的 var a全局变量;console.log(a);var a全局变量;console.log(window.a);var a全局变量;在控制台里输入a也能打印a的值 局部变量 函数体内部声明的变量 var a全局变量;function test(){var b局部…

软考高级之系统架构师系列之UP、RUP、4+1视图、JAD、JRP、RAD

概述 软件工程是一个很庞杂的系统工程&#xff0c;而我们面对的软件需求也很复杂&#xff1a; 面对不同规模&#xff08;复杂度&#xff0c;模块量&#xff0c;用户量&#xff0c;开发周期等等&#xff09;的软件项目&#xff0c;人员储备不尽不同的开发团队也会采用不同的软…

数据可视化在行业解决方案中的实践应用 ——华为云Astro Canvas大屏开发研究及指南

本文主要探讨华为云Astro Canvas在数据可视化大屏开发中的应用及效果。首先阐述Astro Canvas的基本概念、功能和特性说明&#xff0c;接着集中分析展示其在教育、金融、交通行业等不同领域实际应用案例&#xff1b;之后&#xff0c;详细介绍使用该工具进行大屏图表创建的开发指…

22年下半年上午题

计算机指令集 cpu的构成 存储器 决策表 原型模型 白盒测试 活动图 构件图 半圆是需接口&#xff0c;满圆是供接口&#xff0c;上图有小错误。 故障类型 b-树 排序算法复杂度 二分查找平均比较次数 成功查找比较平均次数 失败查找平均比较次数 如有 OSI 模型层次对应典型机器…

Vue+ElementUI项目打包部署到Ubuntu服务器中

1、修改config/index.js中的assetsPublicPath: /,修改为assetsPublicPath: ./ assetsPublicPath: ./2、在build/utils.js中增加publicPath: ../../ publicPath: ../../3、打开终端&#xff0c;在根目录下执行npm run build进行打包&#xff0c;打包成功后会生成dist npm run…

前端使用 printJS 插件打印多页:第一页空白问题解决

printJS({printable: [data:image/jpg;base64,${this.printData.url}],type: image,style: media print { page {size: auto; margin: 0; } body{margin:0 5px}} // 解决出现多页打印时第一页空白问题 })

java基础 集合2

9.List遍历方式&#xff1a; 10.Arraylist底层原理&#xff1a; 11.Linklist底层原理&#xff1a; 1.LinkedList做队列和栈&#xff1a; package day01;import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List;publ…

Vue3 + Tsx 集成 ace-editor编辑器

Ace Editor介绍 Ace Editor&#xff08;全名&#xff1a;Ajax.org Cloud9 Editor&#xff09;是一个开源的代码编辑器&#xff0c;旨在提供强大的代码编辑功能&#xff0c;通常用于构建基于Web的代码编辑应用程序。它最初由Cloud9 IDE开发&#xff0c;现在由开源社区维护。 主…

计算机网络 第四章网络层

文章目录 1 网络层的功能2 数据交换方式&#xff1a;电路交换3 数据交换方式&#xff1a;报文交换4 数据交换方式&#xff1a;分组交换5 数据交换方式&#xff1a;数据报方式6 数据交换方式&#xff1a;虚电路方式及各种方式对比7 路由算法及路由协议8 IP数据报的概念和格式9 I…

数据存储成本降低50%!图匠数据搭载OceanBase全新出发

近日&#xff0c;AI 技术公司 ImageDT 图匠数据&#xff08;以下简称“图匠”&#xff09;上线 OceanBase。目前&#xff0c;公司两大核心业务“数货宝”、“数智柜”已全面接入 OB Cloud 云数据库&#xff0c;保障图匠一站式全渠道销售数字化闭环作战平台的每一笔「数据」都算…

浮动面试题

浮动元素特点&#xff1a;

找不到mfc100u.dll怎么解决,总结了多种修复方法帮你解决

首先&#xff0c;让我们来了解一下mfc100u.dll文件是什么&#xff1f;其实&#xff0c;mfc100u.dll是Microsoft Foundation Class(MFC)库中的一个动态链接库文件&#xff0c;它包含了一些常用的类、函数和变量等资源&#xff0c;用于支持Windows应用程序的开发。 那么&#xf…