详解 Flink 的 ProcessFunction API

news2024/11/24 10:58:53

一、Flink 不同级别的 API

在这里插入图片描述

  • Flink 拥有易于使用的不同级别分层 API 使得它是一个非常易于开发的框架
  • 最底层的 API 仅仅提供了有状态流处理,它将处理函数(Process Function )嵌入到了 DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
  • 核心 API(Core APIs),比如 DataStream API (用于处理有界或无界流数据)以及 DataSet API (用于处理有界数据集)在实际生产中一般使用较多。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。
  • Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。 Table API 遵循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、join、group-by、aggregate 等。
  • Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

二、ProcessFunction 介绍

  • 相较于 map、filter 和 window 等特定的具体的操作而言,Flink 在底层 API 中提炼出一个统一通用的 process 操作,它是所有转换算子的一个概括性的表达,可以在对应的接口中自定义处理逻辑,而这一层接口就被叫作“处理函数”(ProcessFunction)
  • 处理函数 (ProcessFunction) 提供了一个“定时服务”(TimerService),可以通过它访问流中的事件(event )、时间戳(timestamp )、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数 (ProcessFunction) 继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数 (ProcessFunction) 可以直接将数据输出到侧输出流(side output)中
  • 所以,处理函数 (ProcessFunction) 是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础

三、常见的 ProcessFunction 类

  • ProcessFunction:最基本的处理函数,基于 DataStream 直接调用 process() 时作为参数传入
  • KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用 process() 时作为参数传入
  • CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process() 时作为参数传入
  • ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process() 时作为参数传入
  • BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物
  • KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物
  • ProcessWindowFunction:KeyedStream 开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process() 时作为参数传入
  • ProcessAllWindowFunction:DataStream 开窗之后的处理函数,基于 AllWindowedStream 调用 process() 时作为参数传入

四、ProcessFunction API 实战

1. KeyedProcessFunction

1.1 解析
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    //1.两个核心方法:
    //1.1 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs) 
    public abstract void processElement(I value, Context ctx, Collector<O> out);
    
    //1.2 一个回调函数。当processElement中注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)
    public abstract void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out);
    
    //2.富函数的以下方法:open()/close()/getRuntimeContext()
}
1.2 ProcessFunction 的 Context
//Context的常用方法
context.timestamp(); //获取当前数据的时间戳
context.getCurrentKey(); //获取当前数据的 key
context.output(OutputTag<X> outputTag, X value); //输出侧输出流
context.timerService(); //获取 TimerService 对象
1.3 Timer 和 TimerService

ProcessFunction 的 Context 对象调用 timerService() 方法可以直接返回一个 TimerService 对象;定时器 Timer 只能在 KeyedStream 上面使用

//TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
//获取当前的处理时间
long currentProcessingTime();

//获取当前的水位线(事件时间)
long currentWatermark();

//注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);

//注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);

//删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);

//删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);
1.4 案例

需求:监控温度传感器的温度值,如果温度值在 10 秒钟之内 (processing time) 连续上升,则报警

public class ProcessFunctionCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        dataStream.keyBy("id").process(new TempContIncreWarning(10)).print();
        
        env.execute();
    }
    
    //自定义处理函数,用于监测一段时间内某个传感器温度值是否连续上升,输出报警信息
    public static class TempContIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {
        //定义私有属性:监测的时间间隔
        private Integer interval;
        
        public TempContIncreWarning(Integer interval) {
            this.interval = interval;
        }
        
        //定义两个值状态属性,分别保存上一次的温度值和定时器的时间戳
        private ValueState<Double> lastTempState;
        private ValueState<Long> timerTsState;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
        }
        
        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            //获取状态值
            Double lastTemp = lastTempState.value();
            Long timerTs = timerTsState.value();
            
            //如果上一次的温度值为null或者上一次的温度值小于当前温度值并且定时器为null则注册定时器
            if(lastTemp == null || (lastTemp != null && value.getTemperature() > lastTemp && timerTs == null)) {
                Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                ctx.timerService().registerProcessingTimeTimer(ts);
                timerTsState.update(ts);
            } else if(value.getTemperature() < lastTemp && timerTs != null) {//如果上一次的温度值大于当前温度值且定时器不为null则删除定时器,清空定时器值状态
                ctx.timerService().deleteProcessingTimeTimer(timerTs);
                timerTsState.clear();
            }
            
            //更新温度值状态
            lastTempState.update(value.getTemperature());
        }
        
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //定时器触发则输出报警信息
            out.collect("传感器" + ctx.getCurrentKey().getField(0) + "的温度在" + interval + "s内连续上升");
            timerTsState.clear();
        }
        
        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

2. 侧输出流

监控传感器温度值,将温度值低于 30 度的数据输出到 side output

/**
	核心方法:ProcessFunction中的 Context 对象的 output(OutputTag<X> outputTag, X value)
*/
public class SideOutputCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义OutputTag,用来标记侧输出流的低温流
        OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};
        
        //DataStream不做keyBy,使用ProcessFunction的侧输出流进行高低温分流
       SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>(){
            @Override
           public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
               if(value.getTemperature() > 30) {//高温流,输出到主流
                   out.collect(value);
               } else {//低温流,输出到侧输出流
                   ctx.output(lowTempTag, value);
               }
           }
        });
        
        //高温流
        highTempStream.print("high-temp");
        
        //低温流
        highTempStream.getSideOutput(lowTempTag).print("low-temp");
        
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1806332.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter项目开发模版,开箱即用

前言 当前案例 Flutter SDK版本&#xff1a;3.22.2 每当我们开始一个新项目&#xff0c;都会 引入常用库、封装工具类&#xff0c;配置环境等等&#xff0c;我参考了一些文档&#xff0c;将这些内容整合、简单修改、二次封装&#xff0c;得到了一个开箱即用的Flutter开发模版…

高速USB转串口芯片CH343

CH343封装 截止目前&#xff0c;主要封装有 SOP16: CH343G QFN16: CH343P ESSOP10: CH343K,截止24年6月未生产 CH343串口速度 最高串口速度&#xff1a; 6Mbps,比CH340的2M&#xff0c;快3倍 1、概述 参考版本&#xff1a;1E CH343 是一个 USB 总线的转接芯片&#xff0c;…

【MySQL】服务器配置和管理

本文使用的MySQL版本是8.0 MySQL服务器介绍 MySQL服务器通常说的是mysqld程序。 mysqld 是 MySQL 数据库服务器的核心程序&#xff0c;负责处理客户端的请求、管理数据库和执行数据库操作。管理员可以通过配置文件和各种工具来管理和监控 mysqld 服务器的运行 官方文档&…

Tensorflow入门实战 P03-天气识别

目录 1、完整代码 2、运行结果 2.1 查看20张图片 2.2 程序运行 2.3 运行结果 3、小结 ① 代码运行过程中有报错&#xff1a; ② 修改代码如下&#xff1a; ③ 分析原因&#xff1a; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&…

屏幕空间环境光遮蔽(SSAO)在AI绘画中的作用

引言&#xff1a; 随着人工智能技术的不断发展&#xff0c;AI绘画已经逐渐走进了人们的视野。作为一种新兴的艺术形式&#xff0c;AI绘画通过算法和模型来生成具有艺术感的图像。在这个过程中&#xff0c;屏幕空间环境光遮蔽&#xff08;SSAO&#xff09;技术发挥着重要作用。本…

英伟达的GPU(4)

更第四篇&#xff0c;上周有点私事&#xff0c;恢复更新 上次的文章 英伟达的GPU(3) (qq.com) 书接前文&#xff0c;我们上章说要更新GPU的内存机制&#xff0c;本次就讲点这个 先做个定义&#xff0c;我们说内存&#xff08;显存&#xff09;&#xff0c;也分物理内存&#…

【目标跟踪网络训练 Market-1501 数据集】DeepSort 训练自己的跟踪网络模型

前言 Deepsort之所以可以大量避免IDSwitch&#xff0c;是因为Deepsort算法中特征提取网络可以将目标检测框中的特征提取出来并保存&#xff0c;在目标被遮挡后又从新出现后&#xff0c;利用前后的特征对比可以将遮挡的后又出现的目标和遮挡之前的追踪的目标重新找到&#xff0…

【BUG】已解决:ModuleNotFoundError: No module named ‘transformers‘

已解决&#xff1a;ModuleNotFoundError: No module named ‘transformers‘ 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司…

360数字安全:2024年4月勒索软件流行态势分析报告

勒索软件传播至今&#xff0c;360 反勒索服务已累计接收到数万勒索软件感染求助。随着新型勒索软件的快速蔓延&#xff0c;企业数据泄露风险不断上升&#xff0c;勒索金额在数百万到近亿美元的勒索案件不断出现。勒索软件给企业和个人带来的影响范围越来越广&#xff0c;危害性…

OrangePi AIpro小试牛刀-目标检测(YoloV5s)

非常高兴参加本次香橙派AI Pro&#xff0c;香橙派联合华为昇腾打造的一款AI推理开发板评测活动&#xff0c;以前使用树莓派Raspberry Pi4B 8G版本&#xff0c;这次有幸使用国产嵌入式开发板。 一窥芳容 这款开发板搭载的芯片是和华为昇腾的Atlas 200I DK A2同款的处理器&#…

插卡式仪器模块:数字万用表模块(插卡式)

• 6 位数字表显示 • 24 位分辨率 • 250 KSPS 采样率 • 电源和数字 I/O 均采用隔离抗噪技术 • 电压、电流、电阻、电感、电容的高精度测量 • 二极管/三极管测试 通道122输入 阻抗 电压10 MΩHigh-Z, 10 MΩ电流10 Ω50 mΩ / 2 Ω / 2 KΩ输入范围电压 5 V0–60 V电流…

钉钉统一授权登录第三方网站

开发流程 配置回调域名。 进入已创建的应用详情页&#xff0c;在基础信息页面可以查看到应用的SuiteKey/SuiteSecret(第三方企业应用)或AppKey/AppSecret(企业内部应用)。 在应用详情页&#xff0c;然后单击钉钉登录与分享&#xff0c;添加应用回调的URL&#xff0c;以http或…

一文学习yolov5 实例分割:从训练到部署

一文学习yolov5 实例分割&#xff1a;从训练到部署 1.模型介绍1.1 YOLOv5结构1.2 YOLOv5 推理时间 2.构建数据集2.1 使用labelme标注数据集2.2 生成coco格式label2.3 coco格式转yolo格式 3.训练3.1 整理数据集3.2 修改配置文件3.3 执行代码进行训练 4.使用OpenCV进行c部署参考文…

安全专业的硬件远控方案 设备无网也能远程运维

在很多行业中&#xff0c;企业的运维工作不仅仅局限在可以联网的IT设备&#xff0c;不能连接外网的特种设备也需要专业的远程运维手段。 这种特种设备在能源、医疗等行业尤其常见&#xff0c;那么我们究竟如何通过远程控制&#xff0c;对这些无网设备实施远程运维&#xff0c;…

单灯双控开关原理

什么是单灯双控&#xff1f;顾名思义&#xff0c;指的是一个灯具可以通过两个不同的开关或控制器进行控制。 例如客厅的主灯可能会设置成单灯双控&#xff0c;一个开关位于门口&#xff0c;另一个位于房间内的另一侧&#xff0c;这样无论你是从门口进入还是从房间内出来&#x…

Java数据结构准备工作---常用类

文章目录 前言1.包装类1.1.包装类基本知识1.2.包装类的用途1.3.装箱和拆箱1.3.1.装箱&#xff1a;1.3.2.拆箱 1.4 包装类的缓存问题 2.时间处理类2.1.Date 时间类(java.util.Date)2.2.DateFormat 类和 SimpleDateFormat 类2.3.Calendar 日历类 3.其他常用类3.1.Math类3.2.Rando…

(论文翻译)Coordinate Attention for Efficient Mobile Network Design(坐标注意力 CVPR2021)

Coordinate Attention for Efficient Mobile Network Design&#xff08;CVPR2021&#xff09; 文章目录 Coordinate Attention for Efficient Mobile Network Design&#xff08;CVPR2021&#xff09;摘要1.引言2.相关工作3.方法&#xff1a;Coordinate Attention3.1.Revisit …

(二)JSX基础

什么是JSX 概念&#xff1a;JSX是JavaScript和XML&#xff08;HTML&#xff09;的缩写&#xff0c;表示在JS代码中编写HTML模版结构&#xff0c;它是React中编写UI模板的方式。 优势&#xff1a;1.HTML的声明式模版方法&#xff1b;2.JS的可编程能力 JSX的本质 JSX并不是标准…

RabbitMQ-topic exchange使用方法

RabbitMQ-默认读、写方式介绍 RabbitMQ-发布/订阅模式 RabbitMQ-直连交换机(direct)使用方法 目录 1、概述 2、topic交换机使用方法 2.1 适用场景 2.2 解决方案 3、代码实现 3.1 源代码实现 3.2 运行记录 4、小结 1、概述 topic 交换机是比直连交换机功能更加强大的…

强!推荐一款开源接口自动化测试平台:AutoMeter-API !

在当今软件开发的快速迭代中&#xff0c;接口自动化测试已成为确保代码质量和服务稳定性的关键步骤。 随着微服务架构和分布式系统的广泛应用&#xff0c;对接口自动化测试平台的需求也日益增长。 今天&#xff0c;我将为大家推荐一款强大的开源接口自动化测试平台: AutoMete…