【Flink】Flink 处理函数之基本处理函数(一)

news2024/11/26 15:27:35

1. 处理函数介绍

流处理API,无论是基本的转换聚合、还是复杂的窗口操作,都是基于DataStream进行转换的,所以统称为DataStreamAPI,这是Flink编程的核心。

但其实Flink为了更强大的表现力和易用性,Flink本身提供了多层API,DataStreamAPI只是中间一环,如下图所示:在这里插入图片描述
在更底层,Flink可以不定义任何具体的算子(比如 mapfilter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)

在处理函数中,操作的就是数据流中最基本的元素:数据事件(event)状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

2. 处理函数的分类

DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。

Flink 提供了 8 个不同的处理函数:

  • ProcessFunction
    最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
  • KeyedProcessFunction
    对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream
  • ProcessWindowFunction
    开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入。
  • ProcessAllWindowFunction
    同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
  • CoProcessFunction
    合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。
  • ProcessJoinFunction
    间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
  • BroadcastProcessFunction
    广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream与一个广播流(BroadcastStream)连接(conncet)之后的产物。
  • KeyedBroadcastProcessFunction
    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream广播流(BroadcastStream)做连接之后的产物。

2.1 基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。在Flink 中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作 ProcessFunction

2.1.1 处理函数的功能和使用

转换算子一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如Map算子只能获取当前的数据;而想窗口聚合复杂的操作AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现)。另外还有富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度任务名称之类的运行时信息。

但无论那种算子,如果想要访问事件的时间戳,或者当前的水位线信息,都是获取不到的。但是处理函数可以获取,处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)时间戳(timestamp)水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunctionMyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

代码实例:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        })
                );
        stream.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect(value.toString());
                if (value.getUser().equals("Mary")) {
                    out.collect(value.user + "click " + value.getUrl());
                } else if (value.getUser().equals("Alice")) {
                    out.collect(value.user);
                    out.collect(value.user);
                }
                System.out.println("timestamp:" + ctx.timestamp());
                System.out.println("watermark:" + ctx.timerService().currentWatermark());

                System.out.println(getRuntimeContext().getIndexOfThisSubtask());

            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void close() throws Exception {
                super.close();
            }
        }).print();

        env.execute();
    }

运行结果:
在这里插入图片描述

这里第一次的水位线的值其实是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
在这里插入图片描述

然后每次下一次的水位线都是上一次的timestamp - 1

2.1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。

内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
 ...
public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
...
}
2.1.2.1 抽象方法.processElement()

用于处理元素,定义了处理的核心逻辑。这个方法对流中的每个元素都会调用一次,参数包括三个: 输入数据值 value上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。

  • value: 当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • cts:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()

Context 抽象类定义如下:

public abstract class Context {
 public abstract Long timestamp();
 public abstract TimerService timerService();
 public abstract <X> void output(OutputTag<X> outputTag, X value);
}

  • out: “收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

ProcessFunction 可以轻松实现flatMap这样的基本转换功能(当然 mapfilter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

2.1.2.2 非抽象方法.onTimer()
@Override
public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
    super.onTimer(timestamp, ctx, out);
}

用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp)上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。

处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用的是.processElement()方法,而处理水位线事件调用的是.onTimer()

.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1554253.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工厂数字化看板是什么?部署工厂数字化看板有什么作用?

随着工业4.0时代的来临&#xff0c;数字化转型已成为制造业发展的必然趋势。在这个背景下&#xff0c;工厂数字化看板作为一种高效的信息展示与管理工具&#xff0c;正逐渐受到越来越多企业的青睐。那么&#xff0c;什么是工厂数字化看板&#xff1f;部署工厂数字化看板又有哪些…

大数据做「AI大模型」数据清洗调优基础篇

关于本文 近期一直在协助做AI大模型数据清洗调优的工作&#xff0c;主要就是使用大数据计算引擎Spark做一些原始数据的清洗工作&#xff0c;整体数据量大约6PB-8PB之间&#xff0c;那么对于整个大数据量的处理性能将是一个重大的挑战&#xff0c;关于具体的调优参数配置项暂时不…

『VUE』04. 模板语法-属性绑定(详细图文注释)

目录 v-bind 属性绑定省略写法如果绑定的值是null或underfined 属性省略逻辑值绑定动态绑定总结 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 v-bind 属性绑定 Mustache 语法不能在 HTML 属性中使用&#xff0c;然而&#xff0…

上位机图像处理和嵌入式模块部署(qmacvisual非opencv算法编写)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 我们都知道&#xff0c;qmacvisual本身依赖于qtopencv的实现。大部分的界面都是依赖于qt的实现。图像算法部分&#xff0c;也是大部分都依赖于open…

python的神奇bug2

今天测试出一个很诡异的bug&#xff0c; 这个错误还真的很难发现 测试1 a [1,10,100] for i in a:print(i)if(i10):a[20,30,-1]一般来说我们在进行迭代时&#xff0c;a这个值时不能改动的&#xff0c;但是现在的问题时如果我不小心给改动了呢&#xff0c;结果如下 也就是说…

windows下的vscode + opencv4.8.0(C++) 配置

1.添加环境变量 D:\mingw64\bin 2.安装vscode 3.下载opencv 4.8.0 4.程序引用第三方库(opencv为例) 打开CMakeLists.txt&#xff0c;引入头文件&#xff0c;使用include_directories 加入头文件所在目录。静态链接库link_directories # 头文件 include_directories(D:/ope…

python函数参数中独立星号*的作用

python函数中间有一个&#xff08;&#xff09;分隔&#xff0c;星号后面为*命名关键字参数&#xff0c;星号本身不是参数**。命名关键字参数&#xff0c;在函数调用时必须带参数名字进行调用。如下例子&#xff1a;

热门超声波清洗机哪个好?2024顶配超声波清洗机真实评测攻略分享

就是因为现在关于超声波清洗机的款式太多了&#xff0c;很多朋友无从下手&#xff0c;不知道到底哪款更值得入手&#xff01;所以本篇文章作者集结了市面上最多人问最热门的三款超声波清洗机做了一次实测。对于热门超声波清洗机哪款更值得入手体验&#xff0c;一篇揭晓&#xf…

2024Postman中变量的使用!

Postman中可设置的变量类型有全局变量&#xff0c;环境变量&#xff0c;集合变量&#xff0c;数据变量及局部变量。区别则是各变量作用域不同&#xff0c;全局变量适用于所有集合&#xff0c;环境变量适用于当前所选环境&#xff08;所有集合中均可使用不同环境变量&#xff09…

Redis(一) redis配置 | 如何连接redis服务器 | 基本数据类型 | 基本全局命令

文章目录 前言Redis 配置文件连接 redis 服务器Redis 常见数据类型Redis 基本全局命令set 和 get 命令KEYS 命令EXISTS 命令DEL 命令EXPIRE 和 TTL 命令Redis 过期策略定时器和时间轮的方式实现过期key的及时删除 TYPE 命令 前言 本篇文章将介绍我们在 Linux 环境下安装了 Red…

Centos服务器Open Gauss 部署

近期很多的项目由于信创要求使用一些国产的数据库&#xff0c;比如OpenGauss。OpenGuass是华为高斯DB的开源版&#xff0c;内核还是PostgreSQL&#xff0c;商业版是收费的。这里记录一下是如何安装部署 的。 官方中文文档 官方下载地址 部署要求 操作系统要求 ARM&#xff…

第十四届蓝桥杯JavaA组省赛真题 - 特殊日期

解题思路&#xff1a; 暴力秒了 public class Main {public static void main(String[] args) {int cnt 0;for (int i 1900; i < 9999; i) {for (int j 1; j < 12; j) {for (int k 1; k < days(i, j); k) {if (sum(i) sum(j) sum(k)) cnt;}}}System.out.print…

Lightguide

Resolve Assembly Challenges with Projected AR Workflows The future of assembly is here. It’s not just about automation, it’s about empowering your workforce with the best possible digital toolset. LightGuide AR is a powerful addition to your Industry 4.…

如何注册谷歌邮箱gmail

不知道大家在工作生活中有没有需要用到谷歌邮箱的地方&#xff0c;但是最近我就用到了它。因为注册ChatGPT的事&#xff0c;用了outlook&#xff0c;hotmail邮箱注册的gpt账号都被封了&#xff0c;然后通过各方面的了解&#xff0c;发现谷歌的邮箱是没有问题的&#xff0c;不会…

服务器中有g++,但是查询不到,Command ‘g++‘ not found

有gcc但是查询不到g&#xff0c;gcc版本为9.5.0 (base) zyICML:~$ g -V Command g not found, but can be installed with: apt install g Please ask your administrator. 突然就出现这个问题&#xff0c;导致detectron装不上&#xff0c;现在有时间了专门研究下怎么解决 这…

文章分享:协和文章《病原宏基因组高通量测序性能确认方案》

摘要&#xff1a;宏基因组学利用新一代高通量测序技术&#xff0c;以特定环境下病原体基因组为研究对象&#xff0c;在分析病原体多样性、种群结构、进化关系的基础上&#xff0c;进一步探究病原体的群体功能活性、相互作用及其与环境之间的关系&#xff0c;发掘潜在的生物学意…

马上蓝桥杯了,干货总结动态规划专题,祝你考场爆杀(拔高篇)最佳课题选择 书本整理 打鼹鼠 吃吃吃 非零字段划分

目录 最佳课题选择 思路&#xff1a; 书本整理 思路&#xff1a; 打鼹鼠 思路&#xff1a; 吃吃吃 思路&#xff1a; 非零字段划分 最佳课题选择 思路&#xff1a; 根本还是论文的分配&#xff0c;每个课题分配多少个论文是不确定的&#xff0c;这个也是很影响转…

六千字详解!一篇看懂 ArrayList 的扩容机制(完整源码解析)

☀️今天花了很久写了这篇关于 ArrayList 扩容机制源码解析的博客&#xff0c;在阅读源码的过程中发现了很多之前有误解的地方&#xff0c;也加深了对代码的理解&#xff0c;所以写下了这篇博客。 &#x1f3b6;本文附带了流程中所有的代码和附加解析&#xff0c;我有信心一定能…

网络安全笔记-day8,DHCP部署

DHCP部署与安全 全称&#xff08;Dynamic Host Configura Protocol&#xff09;动态主机配置协议 DHCP原理 DHCP协议_科来测试dhcp网络包-CSDN博客&#x1f50d; 注意的是利用广播地址发送包 ACK&#xff08;确认&#xff09; 如果DHCP服务器损坏&#xff0c;则在87.5%时…

Python基础:标准库 -- pprint (数据美化输出)

1. pprint 库 官方文档 pprint --- 数据美化输出 — Python 3.12.2 文档 pprint — Data pretty printer — Python 3.12.2 documentation 2. 背景 处理JSON文件或复杂的嵌套数据时&#xff0c;使用普通的 print() 函数可能不足以有效地探索数据或调试应用程序。下面通过一…