Flink 处理函数(1)—— 基本处理函数

news2025/1/8 12:48:27

在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权

基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction


处理函数的功能和使用

对于常用的转换算子来说:

  • MapFunction只能获取到当前的数据;
  • AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
  • RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();

但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的

与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了

因此需要使用处理函数

  • 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数还可以直接将数据输出到侧输出流(side output)中

处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:

stream.process(new MyProcessFunction())

简单示例:

public class ProcessFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long l) {
                                        return event.timestamp;
                                    }
                                })
                )
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        if (value.user.equals("Mary")) {
                            out.collect(value.user);
                        } else if (value.user.equals("Bob")) {
                            out.collect(value.user);
                            out.collect(value.user);
                        }
                        System.out.println(ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}

ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑

ProcessFunction 解析

源码解析

源码如下:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter and
     * also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
     *     {@link TimerService} for registering timers and querying the time. The context is only
     *     valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *     querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}
     *     for registering timers and querying the time. The context is only valid during the
     *     invocation of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program is
         * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /** A {@link TimerService} for querying time and registering timers. */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /** The {@link TimeDomain} of the firing timer. */
        public abstract TimeDomain timeDomain();
    }
}

可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:

I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型

其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()

  • .processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义
    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
    • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
    • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
  • .onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService 来注册的
    • 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线

利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能

处理函数分类

  1. ProcessFunction:最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入
  2. KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream )
  3. ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入
  4. ProcessAllWindowFunction:开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入
  5. CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入
  6. ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物)
  8. KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream 调用.process()时作为参数传入(这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物)

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1386635.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Github仓库使用方式

主要参考&#xff1a; 「详细教程」使用git将本地项目上传至Github仓库&#xff08;MacOS为例&#xff09;_github上传代码到仓库-CSDN博客 新建文件夹参考&#xff1a; GitHub使用指南——建立仓库、建立文件夹、上传图片详细教程-CSDN博客 一、新建一个 github 仓库&#…

区块链是怎么存储数据的?

每个块都是有大小限制的新的数据存储单元&#xff0c;当前数据不到上限&#xff0c;那么都可以添加进块。当前数据达到了上限&#xff0c;那么就得分表/分块&#xff0c;超限的那部分数据就需要等待下个区块存储 存储的数据&#xff1a;和mysql一样&#xff0c;文本数据直接存储…

vue3 锚点定位 点击滚动高亮

功能描述 点击导航跳到对应模块的起始位置&#xff0c;并且高亮点击的导航&#xff1b; 滚动到相应的模块时&#xff0c;对应的导航也自动高亮&#xff1b; 效果展示 注意事项 一定要明确哪个是要滚动的盒子&#xff1b;滚动的高度要减去导航栏的高度&#xff1b;当前在导航1…

RFID技术在汽车装备中的应用:提升安全性与效率

RFID技术在汽车装备中的应用&#xff1a;提升安全性与效率 射频识别&#xff08;RFID&#xff09;技术是一种非接触式的自动识别技术&#xff0c;它利用射频信号及其空间耦合和传输特性&#xff0c;实现对目标对象的信息读写。随着汽车工业的不断发展&#xff0c;汽车装备的技…

YOLOv8目标检测中数据集各部分的作用

自学答疑使用&#xff0c;持续更新… 在目标检测任务中&#xff0c;通常将整个数据集划分为训练集&#xff08;training set&#xff09;、验证集&#xff08;validation set&#xff09;和测试集&#xff08;test set&#xff09;。这三个数据集在训练和评估过程中具有不同的…

IT行业——香港优才计划低分段申请之王!

IT行业——香港优才计划低分段申请之王&#xff01; 香港优才计划获批率确实没那么看重分数&#xff0c;看了下2023年的获批案例&#xff0c;一些申请者的基础分数刚过80分&#xff0c;照样顺利获批拿到了香港身份&#xff01; 今天就拿真实数据来分析&#xff0c;盘点那些低分…

JS常用插件 Swiper插件 实现轮播图

Swiper介绍 Swiper 是一款免费以及轻量级的移动设备触控滑块的js框架 中文官网地址: https://www.swiper.com.cn/ 点击查看Swiper演示&#xff0c;里面的功能和样式十分丰富&#xff0c;根据自己的需求选择 中文教程中详细介绍了如何使用Swiper API文档中介绍了各个模块以及参…

gitee完整使用教程,创建项目并上传

目录 一 什么是gitee 二 安装Git 三 登录gitee&#xff0c;生成密钥 四 配置SSH密钥 五 创建项目 六 克隆仓库到本地 七 关联本地工程到远程仓库 八 添加文件 九 异常处理 十 删除仓储 十一 git常用命令 一 什么是gitee gitee是开源中国推出的基于git的代码托管服务…

我成为开源贡献者的原因竟然是做MySql-CDC数据同步

今年下半年机缘巧合下公司决定搭建自己的数据中台&#xff0c;中台的建设势必少不了数据集成。首先面临的就是数据集成技术选型的问题&#xff0c;按照社区活跃度、数据源适配性、同步效率等要求对市面上几个成熟度较高的开源引擎进行了深度调研。 最终经过内部讨论决定用Apac…

智能安全帽识别系统简析

在工业安全领域&#xff0c;安全帽识别系统的重要性不言而喻。这种系统利用先进的图像识别技术&#xff0c;确保工地上每位工人都佩戴安全帽&#xff0c;从而大幅提升工作场所的安全性。本文旨在探讨这一系统的工作原理、应用范围以及面临的挑战。 安全帽识别系统的工作原理 智…

查找算法(部分)

顺序查找 顺序查找是最简单的了&#xff0c;属于无序查找算法&#xff0c;它的原理就是从前往后一个一个的找&#xff0c;如果找到了就返回它的位置&#xff0c;否则就返回-1。 如果有多个相同元素的话&#xff0c;返回第一个该元素的位置。 代码&#xff1a; #include<…

高通开发系列 - RT补丁死机问题scheduling while atomic

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 返回:专栏总目录 目录 背景概述问题现象了解RTOS如何使用高分辨率计时器?RT-mutex 实现设计线程化的中断处理程序睡眠spinlock

图解python | 字符串及操作

1.Python元组 Python的元组与列表类似&#xff0c;不同之处在于元组的元素不能修改。 元组使用小括号&#xff0c;列表使用方括号。 元组创建很简单&#xff0c;只需要在括号中添加元素&#xff0c;并使用逗号隔开即可。 tup1 (ByteDance, ShowMeAI, 1997, 2022) tup2 (1…

通过FTP和HTTPD,搭建内网yum仓库

一、yum仓库的简介 1.yum介绍 yum是一个基于RPM包&#xff08;是Red-Hat Package Manager红帽软件包管理器的缩写&#xff09;构建的软件更新机制&#xff0c;能够自动解决软件包之间的依赖关系。解决了日常工作中的大量查找安装依赖包的时间 为什么会有依赖关系的发生 因为li…

GPT图解大模型是怎样构建的

❤️作者主页&#xff1a;小虚竹 ❤️作者简介&#xff1a;大家好,我是小虚竹。2022年度博客之星评选TOP 10&#x1f3c6;&#xff0c;Java领域优质创作者&#x1f3c6;&#xff0c;CSDN博客专家&#x1f3c6;&#xff0c;华为云享专家&#x1f3c6;&#xff0c;掘金年度人气作…

网络文件共享服务

目录 一.存储类型&#xff1a; 1.存储类型分为三种&#xff1a; 2.NAS的概述&#xff1a; 3.SAN的概述&#xff1a; 4.DAS的概述&#xff1a; 二.FTP 文件传输协议&#xff1a; 1.FTP工作原理&#xff1a; 2.FTP的两种模式&#xff1a; 3.FTP的用户认证&#xff1a; 三…

Ncast盈可视高清智能录播系统busiFacade RCE漏洞(CVE-2024-0305)

产品介绍 Ncast盈可视高清智能录播系统是一套新进的音视频录制和播放系统&#xff0c;旨在提供高质量&#xff0c;高清定制的录播功能。 漏洞描述 广州盈可视电子科技有限公司的高清智能录播系统存在信息泄露漏洞(CVE-2024-0305)&#xff0c;攻击者可通过该漏洞&#xff0c;…

【HarmonyOS4.0】第十篇-ArkUI布局容器组件(二)

三、层叠布局容器&#xff08;Stack&#xff09; 堆叠容器组件 Stack的布局方式是把子组件按照设置的对齐方式顺序依次堆叠&#xff0c;后一个子组件覆盖在前一个子组件上边。 注意&#xff1a;Stack 组件层叠式布局&#xff0c;尺寸较小的布局会有被遮挡的风险&#xff0c; …

【PyQt小知识 - 7】:QLineEdit设置输入的文本以圆点或星号等方式显示

文章目录 setEchoMode setEchoMode 在PyQt中&#xff0c;QLineEdit是一种用于接收用户输入的小部件&#xff08;widget&#xff09;。setEchoMode是QLineEdit类中的一个方法&#xff0c;可以用于设置文本输入框中的文本显示模式。它接受一个参数来指定要使用的模式。 setEcho…

springIoc依赖注入循环依赖三级缓存

springIoc的理解&#xff0c;原理和实现 控制反转&#xff1a; 理论思想&#xff0c;原来的对象是由使用者来进行控制&#xff0c;有了spring之后&#xff0c;可以把整个对象交给spring来帮我们进行管理 依赖注入DI&#xff1a; 依赖注入&#xff0c;把对应的属性的值注入到…