Flink处理函数(2)—— 按键分区处理函数

news2025/1/16 1:07:15

 按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作

1.定时器(Timer)和定时服务(TimerService)

  • 定时器(timers)是处理函数中进行时间相关操作的主要机制
  • 定时服务(TimerService)提供了注册定时器的功能

TimerService 是 Flink 关于时间和定时器的基础服务接口:

// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的事件时间定时器
void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器

尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳保存起来,排队等待执行;可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的;利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次:

long coalescedTime = time / 1000 * 1000; //时间戳(定时器默认的区分精度是毫秒)
ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器

2.KeyedProcessFunction 的使用

基础用法:

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

这里的MyKeyedProcessFunction即是KeyedProcessFunction的一个实现类;

源码解析


KeyedProcessFunction源码如下:

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter and
     * also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
     *     {@link TimerService} for registering timers and querying the time. The context is only
     *     valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
     *     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
     *     registering timers and querying the time. The context is only valid during the invocation
     *     of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program is
         * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /** A {@link TimerService} for querying time and registering timers. */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        /** Get key of the element being processed. */
        public abstract K getCurrentKey();
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /** The {@link TimeDomain} of the firing timer. */
        public abstract TimeDomain timeDomain();

        /** Get key of the firing timer. */
        @Override
        public abstract K getCurrentKey();
    }
}

可以看到和ProcessFunction类似,都有一个processElement()onTimer()方法,并且定义了一个Context抽象类;不同点在于类型参数多了一个K,也就是key的类型;

代码示例

①处理时间语义

public class ProcessingTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 处理时间语义,不需要分配时间戳和watermark
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        // 要用定时器,必须基于KeyedStream
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        Long currTs = ctx.timerService().currentProcessingTime();
                        out.collect("数据到达,到达时间:" + new Timestamp(currTs));
                        // 注册一个10秒后的定时器
                        ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));
                    }
                })
                .print();

        env.execute();
    }
}

通过ctx.timerService().currentProcessingTime()获取当前处理时间;

通过ctx.timerService().registerProcessingTimeTimer来设置一个定时器;

运行结果如下:

由于定时器是处理时间的定时器,不用考虑水位线延时问题,因此10s后能够准时触发定时操作;


②事件时间语义:

public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // 基于KeyedStream定义事件时间定时器
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("数据到达,时间戳为:" + ctx.timestamp());
                        out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线-------");
                        // 注册一个10秒后的定时器
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + timestamp);
                    }
                })
                .print();

        env.execute();
    }

    // 自定义测试数据源
    public static class CustomSource implements SourceFunction<Event> {
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            // 直接发出测试数据
            ctx.collect(new Event("Mary", "./home", 1000L));
            // 为了更加明显,中间停顿5秒钟
            Thread.sleep(5000L);

            // 发出10秒后的数据
            ctx.collect(new Event("Mary", "./home", 11000L));
            Thread.sleep(5000L);

            // 发出10秒+1ms后的数据
            ctx.collect(new Event("Alice", "./cart", 11001L));
            Thread.sleep(5000L);
        }

        @Override
        public void cancel() { }
    }
}

运行结果如下:

运行结果解释:

①第一条数据到来时,时间戳为1000,但由于水位线的生成是周期性的(默认200ms),因此水位线不会立即发送改变,仍然是Long.MIN_VALUE,之后只要到了水位线生成的时间周期,就会依据当前最大的时间戳来生成水位线(默认减1)

②第二条数据到来时,显然水位线已经推进到了999,但仍然不会立即改变;

③在事件时间语义下,定时器触发的条件就是水位线推进到设定的时间;第一条数据到来之后,设定的定时器时间为11000,而当时间戳为11000的数据到来时,水位线还停留在999的位置,因此不会立即触发定时器;之后水位线会推进到10999(11000-1),同样无法触发定时器;

④第三条数据到来时,时间戳为11001,此时水位线推进到了10999,等到水位线周期性更新后,推进到11000(11001-1),这样第一个定时器就会触发

⑤然后等待5s后,没有新的数据到来,整个程序结束,将要退出,此时会将水位线推进到Long.MAX_VALUE,所以所有没有触发的定时器统一触发;

 学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1399271.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python开发之远程开发工具对比

前言 除了本地开发外&#xff0c;还有一种常见的开发方式就是远程开发&#xff0c;一般情况是一台Windows或mac笔记本作为日常使用的电脑&#xff0c;另有一台linux服务器作为开发服务器。开发服务器的性能往往较强&#xff0c;这样远程开发的方式一方面可以让我们在习惯的系统…

PWM实现呼吸灯

PWM也属于51中的重要章节&#xff0c;本节主要介绍呼吸灯&#xff0c;目的是理解PWM的工作原理&#xff0c;PWM的实验案例重点还得看后续的舵机&#xff08;下一节会讲到&#xff09; 那么何为呼吸灯。呼吸灯的定义是&#xff1a;灯光实现由亮到暗的变化或由暗到亮的逐渐变化。…

一篇文章搞懂什么是测试,测试是干什么的?

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

信号处理专题设计-基于边缘检测的数字图像分类识别

目录 一、实验目的 二、实验要求 三、实验原理 1.卷积神经网络&#xff08;CNN&#xff09;模型 2.边缘检测 3.形态学操作 4.鲁棒性 四、实验过程 1.数据预处理 2. 网络的构建 3.模型的训练 4.边缘检测和形态学操作相关代码 5.模型训练结果 6.关键信息的保存 五、实验测试与评估…

网络安全全栈培训笔记(54-服务攻防-数据库安全RedisHadoopMysqla未授权访问RCE)

第54天 服务攻防-数据库安全&Redis&Hadoop&Mysqla&未授权访问&RCE 知识点&#xff1a; 1、服务攻防数据库类型安全 2、Redis&Hadoop&Mysql安全 3、Mysql-CVE-2012-2122漏洞 4、Hadoop-配置不当未授权三重奏&RCE漏洞 3、Redis-配置不当未授权…

金蝶云星空表单插件获取单据体数据

文章目录 金蝶云星空表单插件获取单据体数据 金蝶云星空表单插件获取单据体数据 使用标识报错 var thisEntry this.View.Model.DataObject["FEntity"] as DynamicObjectCollection;应该使用实体属性 var thisEntry this.View.Model.DataObject["BillEntry&q…

Python连接数据库的梳理

我们通常用的数据库类型主要有关系型数据库&#xff0c;非关系型数据库等&#xff0c;其中关系型数据库主要有Microsoft SQL Server ,MySQL,Oracle&#xff0c;SQLite等&#xff0c;常用的非关系型数据库包括Redis、DynamoDB&#xff0c;MongoDB等 ​​​​​​​ 一 关系型…

Qt文件和目录相关操作

1.相关说明 QCoreApplication类、QFile类、QDir、QTemporaryDir类、QTemporaryFile类、QFileSystemWatcher类的相关函数 2.相关界面 3.相关代码 #include "dialog.h" #include "ui_dialog.h" #include <QFileDialog> #include <QTemporaryDir>…

MySQL综合练习题

一、创建表的素材 CREATE TABLE dept ( deptno INT(2) NOT NULL COMMENT 部门编号, dname VARCHAR (15) COMMENT 部门名称, loc VARCHAR (20) COMMENT 地理位置 ); -- 添加主键 ALTER TABLE dept ADD PRIMARY KEY (deptno); -- 添加数据 INSERT INTO dept (deptno…

nuclei安装;linux上 以及使用教程

kali安装go环境_go1.17 kali安装-CSDN博客Ubuntu完美解决Github网站打不开问题 - 一抹烟霞 - 博客园 (cnblogs.com) All releases - The Go Programming Language 然但是上面两个我似乎都没用到网上的教程 也不适用 一个网不好 一个apt没找到包 然后我先试试了版本 结果 我的…

组件通信方式

组件通信 父子组件通信 单向数据流 属性传递props&#xff08;还有插槽&#xff0c;$attrs非属性&#xff09;/$emit&#xff0c;发布订阅模式 方法也可以作为属性 父子组件渲染生命周期&#xff1a; 获取组件实例。$children、ref&$refs/$parent 祖先和后代 组件和后代通信…

python之粘包/粘包的解决方案

python之粘包/粘包的解决方案 什么是粘包 粘包就是在数据传输过程中有多个数据包被粘连在一起被发送或接受 服务端&#xff1a; import socket import struct# 创建Socket Socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 绑定服务器和端口号 servers_addr (…

java打包及上传到私服务

一、准备Maven私服Nexus 添加saas.maven 仓库地址&#xff1a;http://192.168.31.109:8081/repository/saas.maven 二、新建SpringBoot项目com.saas.pdf 添加类&#xff1a;PdfUtil.java package com.saas.pdf;public class PdfUtil {public static void Save(String fileP…

ubuntu20遇到缺少qt4相关库的问题

最近需要做套接字通讯的工作&#xff0c;最好是有一个网络调试软件能够接受或者发送套接字&#xff0c;测试代码能够正常通讯。windows下有很多&#xff0c;但是linux下比较少&#xff0c;使用广泛的是下面这一款。 1、安装 首先从网盘&#xff08;链接: https://pan.baidu.c…

安装conda搭建python环境(保姆级教程)

目录 一、Anaconda简介二、Anaconda安装 2.1 Anaconda下载2.2 Anaconda安装2.3 配置环境变量 三、通过conda配置python环境 3.1 创建并激活虚拟环境3.2 管理虚拟环境 一、Anaconda简介 Anaconda 是专门为了方便使用 Python 进行数据科学研究而建立的一组软件包&#xff0c;…

【设计模式-08】Flyweight享元模式

简要说明 简要的理解&#xff1a;享元模式就是新建一个池(Pool)&#xff0c;该池子(Pool)中有新建好的一堆对象&#xff0c;当需要使用时&#xff0c;从池子(Pool)中直接获取&#xff0c;不用重新新建一个对象。通俗的讲就是&#xff1a;共享元数据。 比如Java中的String就是使…

Python圣诞主题绘图:用turtle库打造冬日奇妙画面【第31篇—python:圣诞节】

文章目录 Python圣诞主题绘图导言代码结构概览详细解析drawlight函数tree函数xzs函数drawsnow函数五角星的绘制 完整代码代码解析总结 Python圣诞主题绘图 导言 圣诞季节是个充满欢乐和创意的时刻。在这个技术博客中&#xff0c;我们将深入探讨如何使用Python的turtle库创建一…

【华为 ICT HCIA eNSP 习题汇总】——题目集4

1、&#xff08;多选&#xff09;网络中出现故障后&#xff0c;管理员通过排查发现某台路由器的配置被修改了&#xff0c;那么管理员应该采取哪些措施来避免这种状况再次发生&#xff1f; A、管理员应该通过配置 ACL 来扩展只有管理员能够登录设备 B、管理员应该在路由的管理端…

Redis原理篇(QuickList)

一.前言&#xff1a;ZipList出现的问题 QuickList的出现是为了解决ZipList所存在的一些问题 1.寻找大内存块&#xff0c;申请内存效率低 ZipList所申请的是连续的内存空间&#xff0c;如果ZipList里面存放的数据过多&#xff0c;就需要一块很大的连续内存&#xff0c;系统需…

【c++笔记】用c++解决一系列质数问题!

质数是c语言和c中比较常见的数学问题&#xff0c;本篇文章将带你走进有关质数的一系列基础问题&#xff0c;其中包含常见的思路总结&#xff0c;本篇文章过后&#xff0c;将会持续更新c算法系列&#xff0c;感兴趣的话麻烦点个关注吧&#xff01; 希望能给您带来帮助&#xff…