Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析

news2025/1/22 18:56:01

FLink处理函数简介

在Flink底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的【处理】(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作【处理函数】(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

Flink几种处理函数简介

  1. ProcessFunction是用于处理数据流的通用函数。它是一个抽象类,定义了处理数据流的常用方法,如processElement,onTimer等。您可以扩展ProcessFunction类并重写这些方法,以便在Flink程序中执行复杂的数据流处理逻辑。
  2. KeyedProcessFunction是ProcessFunction的特殊类型,用于处理带有键的数据流。它定义了额外的方法,如getKey,context.timerService()等,用于访问数据流中每个元素的键以及在处理函数中安排定时器。
  3. ProcessWindowFunction和ProcessAllWindowFunction是用于处理时间窗口的特殊函数。它们提供了一个process方法,用于在每个窗口中对数据进行处理。ProcessWindowFunction接受带有键的数据流,并且每个窗口都对应于一个键,而ProcessAllWindowFunction接受不带键的数据流,并且每个窗口都包含整个数据流。

这里重点介绍KeyedProcessFunction,KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了定时器的功能,在在预警、监控等场景特定场景下,非常适合。
KeyedProcessFunction定时器包分为两种:基于事件时间、基于处理时间。下面以统计计数的方式展示这两种定时器的用法,并附上详细的分析思路。以下用例基于Flink1.14

实例分析

KeyedProcessFunction基于事件时间的定时器

代码:


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerEventTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountEventTimeWithTimeoutFunction());

        result.print();

        env.execute("KeyedProcessFunction wordCount");
    }

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    static class CountEventTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final Integer DELAY = 1000; //1s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);
            //获取当前数据流的水位线
            long currentWatermark = ctx.timerService().currentWatermark();

//            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY
            long timer = currentWatermark + DELAY;//设置定时器的时间为当前水位线+DELAY
            //注册事件时间定时器,与watermark绑定,必须满足条件: watermark >= timer 来触发特定event的定时器
            ctx.timerService().registerEventTimeTimer(timer);

            //删除事件时间定时器
            if (currentWatermark < 0) {
                ctx.timerService().deleteEventTimeTimer(timer);
            }

            System.out.println("last Watermark: " + currentWatermark + ", format: " + time(currentWatermark));

            // 打印信息,用于核对数据
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(),
                    current,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timer,
                    time(timer)));

        }

        @Override
        public void onTimer(
                long timestamp, //定时器触发时间,等于以上的timer
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // 取得当前单词
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();

            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey,
                    result,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timestamp,
                    time(timestamp)));
            System.out.println("current Watermark: " + ctx.timerService().currentWatermark() + ", format: " + time(ctx.timerService().currentWatermark()));
            
            out.collect(new Tuple2<String, Long>(currentKey, result));

        }

        @Override
        public void close() throws Exception {
            super.close();
            state.clear();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}

测试数据:

nc -l 9999
a1,1704038400000
a1,1704038401000
a1,1704038403000

运行结果:
在这里插入图片描述

KeyedProcessFunction基于处理时间的定时器

代码:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerProcessTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
//                        return System.currentTimeMillis();
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountProcessTimeWithTimeoutFunction());

        result.print();

        env.execute("KeyedProcessFunction wordCount");
    }

    static class CountProcessTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final Integer DELAY = 60 * 1000; //1s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);

            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY
            //注册处理时间定时器, 与watermark无关,定时器触发条件:当前系统时间>timer
            ctx.timerService().registerProcessingTimeTimer(timer);
            //删除处理时间定时器
//            ctx.timerService().deleteProcessingTimeTimer(timer);

            System.out.println("processElement currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // 打印所有信息,用于核对数据
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(),
                    current,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timer,
                    time(timer)));
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // 取得当前单词
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();

            System.out.println("onTimer currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey,
                    result,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timestamp,
                    time(timestamp)));

            //另外还支持侧流
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("single"){};
            if (result < 2) {
                ctx.output(outputTag, new Tuple2<>(currentKey, result));
            } else {
                out.collect(new Tuple2<String, Long>(currentKey, result));
            }

        }

        @Override
        public void close() throws Exception {
            super.close();
            state.clear();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}

测试数据:

nc -l 9999
a,1705568024000    
a,1705568024000

运行结果:
在这里插入图片描述

总结

在真实业务场景中【 KeyedProcessFunction基于处理时间的定时器】用的比较多,比较符合业务场景,即根据事件的时间来指定处理时间去定时触发定时器。因此在此场景中,可以不指定watermarkStrategy,可以获取传输参数的时间时间来定时触发定时器。

参考:
Process Function
Generating Watermarks

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1394409.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动手学深度学习5 矩阵计算

矩阵计算--矩阵怎么求导数 1. 导数和微分2. 偏导数3. 梯度1. 向量-标量求导2. 向量-向量求导3. 拓展到矩阵 4. 链式法则5. 小结QA练习 课程安排&#xff1a; 视频&#xff1a;https://www.bilibili.com/video/BV1eZ4y1w7PY/?spm_id_fromautoNext&vd_sourceeb04c9a33e87ce…

【复现】SpringBlade SQL 注入漏洞_22

目录 一.概述 二 .漏洞影响 三.漏洞复现 1. 漏洞一&#xff1a; 四.修复建议&#xff1a; 五. 搜索语法&#xff1a; 六.免责声明 一.概述 SpringBlade 是由一个商业级项目升级优化而来的SpringCloud微服务架构&#xff0c;采用Java8 API重构了业务代码&#xff0c;完全…

具有中国特色的普及工厂数字化转型的新路子

工业互联网浪潮来袭&#xff0c;你准备好了吗&#xff1f; 国家智能制造专委会委员、浙江省智能制造专家委员会毛光烈主任在“第七届中国工业大数据大会”上的演讲&#xff0c;《具有中国特色的普及工厂数字化转型的新路子》&#xff0c;阐述了关于工厂订单全流程业务数据体系运…

C++核心编程之通过类和对象的思想对文件进行操作

目录 ​​​​​​​一、文件操作 1. 文件类型分类&#xff1a; 2. 操作文件的三大类 二、文本文件 1.写文件 2.读文件 三、二进制文件 1.写二进制文件 2.读二进制文件 一、文件操作 程序运行时产生的数据都属于临时数据,程序一旦运行结束都会被释放 通过文件可以将…

GPT APP的开发步骤

开发一个GPT&#xff08;Generative Pre-trained Transformer&#xff09; Store&#xff08;存储&#xff09;涉及到使用预训练的语言模型&#xff08;例如GPT-3&#xff09;来生成和管理内容。以下是一般的步骤&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&…

2024年美国大学生数学建模思路 - 案例:异常检测

文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…

适合进阶学习的 机器学习 开源项目(可快速下载)

目录 开源项目合集[>> 开源的机器学习平台&#xff1a;mlflow/mlflow](https://gitcode.com/mlflow/mlflow)[>> 机器学习路线图&#xff1a;mrdbourke/machine-learning-roadmap](https://gitcode.com/mrdbourke/machine-learning-roadmap)[>> 机器学习理论和…

VsCode 常见的配置

转载&#xff1a;Visual Studio Code 常见的配置、常用好用插件以及【vsCode 开发相应项目推荐安装的插件】 - 知乎 (zhihu.com) 一、VsCode 常见的配置 1、取消更新 把插件的更新也一起取消了 2、设置编码为utf-8&#xff1a;默认就是了&#xff0c;不用设置了 3、设置常用的…

阿里云云原生弹性方案:用弹性解决集群资源利用率难题

作者&#xff1a;赫曦 随着上云的认知更加普遍&#xff0c;我们发现除了以往占大部分的互联网类型的客户&#xff0c;一些传统的企业&#xff0c;一些制造类的和工业型企业客户也都开始使用云原生的方式去做 IT 架构的转型&#xff0c;提高集群资源使用率也成为企业上云的一致…

【51单片机】数码管的静态与动态显示(含消影)

数码管在现实生活里是非常常见的设备&#xff0c;例如 这些数字的显示都是数码管的应用。 目录 静态数码管&#xff1a;器件介绍&#xff1a;数码管的使用&#xff1a;译码器的使用&#xff1a;缓冲器&#xff1a; 实现原理&#xff1a;完整代码&#xff1a; 动态数码管&#…

Linux Shell脚本入门

目录 介绍 编写格式与执行方式 Shell脚本文件编写规范 脚本文件后缀名规范 首行格式规范 注释格式 shell脚本HelloWord入门案例 需求 效果 实现步骤 脚本文件的常用执行三种方式 介绍 3种方式的区别 小结 多命令处理 Shell变量 环境变量 目标 Shell变量的介绍 变量类型 系统环境…

Java 方法中参数类型后写了三个点?什么意思?

1、...代表什么意思&#xff1f; 2、如何使用 3、注意事项 4、两个list&#xff0c;一个新的&#xff0c;一个旧的&#xff0c;旧列表中可能有新列表中存在的数据&#xff0c;也可能存在新列表中不存在的数据&#xff08;注&#xff1a;新旧列表中都不存在重复元素&#xff09;…

【数据结构】堆:堆的构建,堆的向上调整算法,堆的向下调整算法、堆排序

目录 一、堆的定义 1、堆的定义&#xff1a; 2、根节点与其左、右孩子间的联系 二、堆的创建 1、堆的向下调整算法 2、堆的向上调整算法 三、堆排序 一、堆的定义 1、堆的定义&#xff1a; 堆可以被看作是一棵完全二叉树的数组对象。即在存储结构上是数组&#xff0c…

2024 年 10 款最佳 Windows 免费分区管理器软件

买了一台现成的全新电脑&#xff0c;出于多种原因希望对硬盘进行分区&#xff0c;例如&#xff0c;为了更好地组织文件。我们整理了一份最佳分区软件列表&#xff0c;可以帮助您轻松完成这项任务。 适用于 Windows 11/10/8.1/8/7 的最佳 10 个磁盘分区工具 1.奇客分区大师 兼容…

vue3自定义按钮点击变颜色(切换)

实现效果图&#xff1a; 默认选中第一个按钮&#xff0c;未选中按钮为粉色&#xff0c;点击时颜色变为红色 利用动态类名&#xff0c;当定义isChange数值和下标index相同时&#xff0c;赋予act类名&#xff0c;实现变色效果 <template><view class"page"&g…

FPGA 多路分频器实验

1 概述 在 FPGA 中&#xff0c;时钟分频是经常用到的。本节课讲解 2 分频、3 分频、4 分频和 8 分频的 Verilog 实现并且学习 generate 语法功能的应。 2 程序设计思路 1&#xff09;整数倍分频&#xff0c;为 2、4、8&#xff0c;这种 2^n 次方倍数倍数关系的…

Spring Security 中 Authentication和Authorization的区别

Authentication Spring Security提供了全面的认证支持。认证是用来验证试图访问特定资源的用户身份的方式。验证用户的常见方式是要求用户输入用户名和密码。一旦认证完成&#xff0c;我们就知道了用户的身份并且可以进行授权。 Spring Security内置支持对用户进行认证。 简…

el-date-picker如果超过限制跨度则提示

需求&#xff1a;实现日期时间选择组件跨度如果超过限制天数&#xff0c;点击查询则提示超过限制时间 封装一个方法&#xff0c;传入开始和结束时间以及限制天数&#xff0c;如果超过则返回false //计算时间跨度是否超过限制天数isTimeSpanWithinLimit(startTime, endTime, li…

Android Text View 去掉默认的padding的实现方法

先看下最终实现效果&#xff0c;满意您在往下看&#xff1a; TextView 绘制的时候自带一定的Padding值&#xff0c;要想实现去掉默认的padding值&#xff0c;xml文件可以设置一个属性值 &#xff1a; android:includeFontPadding"false" 然后运行起来就会发现&…

【C++干货铺】红黑树 (Red Black Tree)

个人主页点击直达&#xff1a;小白不是程序媛 C系列专栏&#xff1a;C干货铺 代码仓库&#xff1a;Gitee 目录 前言 红黑树的概念 红黑树的性质 红黑树结点的定义 红黑树的插入操作 插入新的结点 检查规则进行改色 情况一 情况二 情况三 插入完整代码 红黑树的验…