一文弄明白KeyedProcessFunction函数

news2025/1/7 6:43:51

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/**
 * A keyed function that processes elements of a stream.
 *
 * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is
 * invoked. This can produce zero or more elements as output. Implementations can also query the
 * time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,
 * OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as
 * output and register further timers.
 *
 * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
 * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
 *
 * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link
 * org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
 * org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
 * methods can be implemented. See {@link
 * org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
 * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
 */

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {

    private String key;

    private long count;

    private long lastQuestTimestamp;

    public long getAndIncrementCount() {
        return ++count;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public long getLastQuestTimestamp() {
        return lastQuestTimestamp;
    }

    public void setLastQuestTimestamp(long lastQuestTimestamp) {
        this.lastQuestTimestamp = lastQuestTimestamp;
    }
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

    private ValueState<CountWithTimestampNew> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));
    }

    // 实现数据处理逻辑的地方
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple currentKey = ctx.getCurrentKey();

        CountWithTimestampNew countWithTimestampNew = state.value();
        if (countWithTimestampNew == null) {
            countWithTimestampNew = new CountWithTimestampNew();
            countWithTimestampNew.setKey(value.f0);
        }

        countWithTimestampNew.getAndIncrementCount();

        //更新这个单词最后一次出现的时间
        countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());

        //单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象
        state.update(countWithTimestampNew);

        //给当前单词创建定时器,十秒后触发
        long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;

        //尝试注释掉看看是否还会触发onTimer方法
        ctx.timerService().registerProcessingTimeTimer(timer);

        //打印所有信息,用于确保数据准确性
        System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",
                    currentKey.getField(0),
                    countWithTimestampNew.getCount(),
                    time(countWithTimestampNew.getLastQuestTimestamp()),
                    time(timer)
                ));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {

        Tuple currentKey = ctx.getCurrentKey();

        CountWithTimestampNew countWithTimestampNew = state.value();

        //标记当前元素是否已经连续10s未出现
        boolean isTimeout = false;

        if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {
            //out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));
            isTimeout = true;
        }

        //打印所有信息,用于确保数据准确性
        System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",
                currentKey.getField(0),
                countWithTimestampNew.getCount(),
                time(countWithTimestampNew.getLastQuestTimestamp()),
                time(timestamp),
                String.valueOf(isTimeout)
        ));
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }
}

最后是启动类

public class KeyedProcessFunctionDemo2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度1
        env.setParallelism(1);

        // 处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 监听本地9999端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);

        // 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
        DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                // 对收到的字符串用空格做分割,得到多个单词
                .flatMap(new SplitterFlatMapFunction())
                // 设置时间戳分配器,用当前时间作为时间戳
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // 使用当前系统时间作为时间戳
                        return System.currentTimeMillis();
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        // 本例不需要watermark,返回null
                        return null;
                    }
                })
                // 将单词作为key分区
                .keyBy(0)
                // 按单词分区后的数据,交给自定义KeyedProcessFunction处理
                .process(new CountWithTimeoutKeyProcessFunctionNew());

        // 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
        timeOutWord.print();

        env.execute("ProcessFunction demo : KeyedProcessFunction");
    }
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    在这里插入图片描述

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    在这里插入图片描述

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    在这里插入图片描述

  4. 再输入点其他的试试
    在这里插入图片描述

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    在这里插入图片描述

思考题

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1463258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stm32和嵌入式linux可以同步学习吗?

在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「stm3的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;如果需要使用STM32&#xff0c;建…

LeetCode 0106.从中序与后序遍历序列构造二叉树:分治(递归)——五彩斑斓的题解(若不是彩色的可以点击原文链接查看)

【LetMeFly】106.从中序与后序遍历序列构造二叉树&#xff1a;分治&#xff08;递归&#xff09;——五彩斑斓的题解&#xff08;若不是彩色的可以点击原文链接查看&#xff09; 力扣题目链接&#xff1a;https://leetcode.cn/problems/construct-binary-tree-from-inorder-an…

28.云原生之服务网格ServiceMesh和istio

云原生专栏大纲 文章目录 Service Mesh介绍为什么要使用ServiceMesh&#xff1f;Istio介绍istio架构EnvoyIstiod Istio 核心流量管理安全可观测性 Istio 原理istio资源和k8s资源扭转关系istio-ingressgatewayIstio-GatewayVirtualServiceDestinationRule Service Mesh介绍 Se…

【小智好书分享• 第一期】深度学习计算机视觉

目录 一、内容简介二、内页插图三、书籍目录四、粉丝福利中奖名单 &#x1f389;博客主页&#xff1a;小智_x0___0x_ &#x1f389;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f389;系列专栏&#xff1a;好书分享 &#x1f389;代码仓库&#xf…

springboot210基于Springboot开发的精简博客系统的设计与实现

基于Springboot开发的精简博客系统的设计与实现 摘要 当下&#xff0c;正处于信息化的时代&#xff0c;许多行业顺应时代的变化&#xff0c;结合使用计算机技术向数字化、信息化建设迈进。以前企业对于博客信息的管理和控制&#xff0c;采用人工登记的方式保存相关数据&#…

Video generation models as world simulators-视频生成模型作为世界模拟器

原文地址&#xff1a;Video generation models as world simulators 我们探索在视频数据上进行大规模生成模型的训练。具体来说&#xff0c;我们联合训练文本条件扩散模型&#xff0c;同时处理不同持续时间、分辨率和长宽比的视频和图像。我们利用一个在视频和图像潜在编码的时…

Salesforce顾问如何拿到更高的薪水?

顾问的角色已经在Salesforce生态系统存在了一段时间&#xff0c;随着Salesforce针对职业发展的Trailhead培训模块的发布&#xff0c;该角色的热度又达到了新的浪潮。越来越多人走上了Salesforce顾问这条职业道路。 当然其薪资水平也非常可观&#xff0c;据调查&#xff0c;美国…

【Linux系统化学习】深入理解匿名管道(pipe)和命名管道(fifo)

目录 进程间通信 进程间通信目的 进程间通信的方式 管道 System V IPC&#xff08;本地通信&#xff09; POSIX IPC&#xff08;网络通信&#xff09; 管道 什么是管道 匿名管道 匿名管道的创建 匿名管道的使用 匿名管道的四种情况 匿名管道的五种特性 命名管道 …

2024第16届全国大学生广告艺术大赛介绍

全国大学生广告艺术大赛介绍 全国大学生广告艺术大赛&#xff08;简称大广赛&#xff09;自2005年第1届至今&#xff0c;遵循“促进教改、启迪智慧、强化能力、提高素质、立德树人”的竞赛宗旨&#xff0c;成功举办了14届共15次赛事&#xff0c;全国共有1857所高校参与其中&am…

#LLM入门|Prompt#1.7_文本拓展_Expanding

输入简短文本&#xff0c;生成更加丰富的长文。 “温度”&#xff08;temperature&#xff09;&#xff1a;控制文本生成的多样性。 一、定制客户邮件 根据客户的评价和其中的情感倾向&#xff0c;使用大语言模型针对性地生成回复邮件。将大大提升客户满意度。 # 我们可以在…

Rust: reqwest库示例

一、单一文件异步 1、cargo.toml [dependencies] tokio { version "1.0.0", features ["full", "tracing"] } tokio-util { version "0.7.0", features ["full"] } tokio-stream { version "0.1" }tr…

《数字化运维路线图》第四部分-数字化运维转型场景 震撼发布!

《数字化运维路线图》系列的压轴之作——《数字化运维转型场景》终于迎来正式发布。这部分内容与《数字化运维组织升级》、《数字化运维转型的标准流程》和《数字化运维转型平台》共同构成了一套完整的数字化运维转型作战蓝图&#xff0c;全方位、多角度地概括了企业如何有效地…

10MARL深度强化学习 Value Decomposition in Common-Reward Games

文章目录 前言1、价值分解的研究现状2、Individual-Global-Max Property3、Linear and Monotonic Value Decomposition3.1线性值分解3.2 单调值分解 前言 中心化价值函数能够缓解一些多智能体强化学习当中的问题&#xff0c;如非平稳性、局部可观测、信用分配与均衡选择等问题…

前端架构: 脚手架之Chalk和Chalk-CLI使用教程

Chalk Chalk 是粉笔的意思, 它想表达的是&#xff0c;给我们的命令行中的文本添加颜色类似彩色粉笔的功能 在官方文档当中&#xff0c;它的 Highlights 核心特性 Expressive API Highly performant No dependencies Ability to nest styles 256/Truecolor color support Auto-…

Android中通过属性动画实现文字轮播效果

前些天发现了一个蛮有意思的人工智能学习网站,8个字形容一下"通俗易懂&#xff0c;风趣幽默"&#xff0c;感觉非常有意思,忍不住分享一下给大家。 &#x1f449;点击跳转到教程 一、创建一个自定义ProvinceView类,具体代码如下 /*** Author: ly* Date: 2024/2/22* D…

【服务器】服务器推荐

一、引言 在数字世界的浪潮中&#xff0c;服务器作为数据存储和处理的基石&#xff0c;其重要性不言而喻。而在这个繁星点点的市场中&#xff0c;雨云以其独特的优势和超高的性价比&#xff0c;逐渐成为众多企业和个人的首选。今天&#xff0c;就让我带你走进雨云的世界&#…

2024 Sora来了!“手机Agent智能体”也来了!

近日&#xff0c;Open AI发布了能够根据文本生成超现实视频的工具Sora&#xff0c;多款震撼视频引爆科技圈刷屏&#xff0c;热度持续发酵占据AI领域话题中心&#xff0c;被认为是AGI实现过程里的重大里程碑事件。新一轮的人工智能浪潮给人类未来的生产和生活方式带来巨大而深远…

盘点被吹爆的桌面便签小工具

桌面便签小工具有很多&#xff0c;任何一款桌面便签小工具都有它的优缺点&#xff0c;而那些被吹爆了好用的桌面便签小工具往往是优点远多于缺点&#xff0c;从而深受用户的喜爱&#xff0c;今天我们来给大家盘点一款被很多行业吹爆了的桌面便签小工具&#xff1a;好用便签。 …

[论文精读]Do Transformers Really Perform Bad for Graph Representation?

论文网址&#xff1a;[2106.05234] Do Transformers Really Perform Bad for Graph Representation? (arxiv.org) 论文代码&#xff1a;https://github.com/Microsoft/Graphormer 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼…

EAP-TLS实验之Ubuntu20.04环境搭建配置(FreeRADIUS3.0)(二)

上篇文章简要介绍了freeradius的搭建及配置&#xff0c;在最后数据库连接阶段还没进行测试验证&#xff0c;今天继续。 修改相关文件 1 radiusd.conf 打开762行注释&#xff08;&#xff04;INCLUDE mods-enabled/sql&#xff09;&#xff1b; 2 sites-available/default …