Flink笔记整理(五)

news2025/1/11 9:01:53

Flink笔记整理(五)

文章目录

  • Flink笔记整理(五)
  • 七、处理函数(最底层最常用最灵活)
    • 7.1基本处理函数(ProcessFunction)
      • 处理函数的功能和使用
      • ProcessFunction解析
    • 7.2按键分区处理函数(KeyedProcessFunction)
      • 定时器(Timer)和定时服务(TimeService)
    • 7.3 窗口处理函数
      • 窗口处理函数的使用
      • ProcessWindowFunction解析
    • 7.4 应用案例——Top N
  • 总结


七、处理函数(最底层最常用最灵活)

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

在这里插入图片描述

7.1基本处理函数(ProcessFunction)

处理函数的功能和使用

之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。


public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...
}

ProcessFunction解析

7.2按键分区处理函数(KeyedProcessFunction)

在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

ProcessFunction解析

定时器(Timer)和定时服务(TimeService)

定时器(Timer)和定时服务(TimeService)及例子

7.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 )
        .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
        .process(new MyProcessWindowFunction())

ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析

7.4 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码


总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1952729.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【初阶数据结构】9.二叉树(4)

文章目录 5.二叉树算法题5.1 单值二叉树5.2 相同的树5.3 另一棵树的子树5.4 二叉树遍历5.5 二叉树的构建及遍历 6.二叉树选择题 5.二叉树算法题 5.1 单值二叉树 点击链接做题 代码&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int val;* …

鱼哥好书分享活动第27期:看完这篇《云原生安全》了解云原生环境安全攻防实战技巧!

鱼哥好书分享活动第27期&#xff1a;看完这篇《云原生安全》了解云原生安全攻防实战技巧&#xff01; 主要内容&#xff1a;读者对象&#xff1a;本书目录&#xff1a;了解更多&#xff1a;赠书抽奖规则: 当前全球数字化的发展逐步进入深水区&#xff0c;云计算模式已经广泛应用…

用 apifox cli 命令行运行本地接口出现TypeError:Invalid IP address: undefined

用 apifox cli 命令行运行本地接口出现TypeError:Invalid IP address: undefined&#xff0c;客户端运行是通过的但命令行运行会报错 修改端口也是一样报错&#xff0c;地址修改为127.0.0.1会报错connect ECONNREFUSED 127.0.0.1:8080 解决方法&#xff1a;不用localhost&…

视觉SLAM第一讲

第一讲-预备知识 SLAM是什么&#xff1f; SLAM&#xff08;Simultaneous Localization and Mapping&#xff09;是同时定位与地图构建。 它是指搭载特定传感器的主体&#xff0c;在没有环境先验信息的情况下&#xff0c;于运动过程中建立环境的模型&#xff0c;同时估计自己…

《Milvus Cloud向量数据库指南》——Milvus Cloud不同场景下的部署形态选型

不同场景下的部署形态选型 一般说选型肯定离不开阶段。用到向量数据库的应用基本有这么几个阶段: AI 应用的快速原型构建。比如你在做一个 AI 个人助手、一个小的搜索引擎原型、一个端到端的 RAG 原型,这类项目的迭代速度是很关键的,而且原型构建期不需要关心性能或者稳定性…

JVM 内存分析工具 Memory Analyzer Tool(MAT)入门(一)

一、打开 jvisualvm &#xff08;VisualVM 是一款集成了 JDK 命令行工具和轻量级剖析功能的可视化工具。 设计用于开发和生产。&#xff09; 打开 jvisualvm.exe 工具会出现如下一些监控指标 二、VisualVM可以根据需要安装不同的插件&#xff0c;每个插件的关注点都不同&#x…

街道宣传信息工作通讯稿怎样向新闻媒体投稿?

在街道单位从事信息宣传工作的初期,我深刻体会到了这份工作的艰辛与挑战。面对繁重的投稿任务和严苛的审核机制,传统的邮箱投稿方式如同一座难以逾越的大山,横亘在我与成功之间。每一篇精心撰写的通讯稿,都承载着对单位工作的热情与期待,却在漫长的等待与频繁的退稿中逐渐消磨了…

Java实现七大排序(二)

一.交换排序 1.冒泡排序 这个太经典了&#xff0c;每个学编程都绕不开的。原理跟选择排序差不多&#xff0c;不过冒泡排序是直接交换。 public static void bubbleSort(int[] array){for (int i 0; i < array.length - 1; i) {for (int j 0; j < array.length-1-i; j…

助力运动员突破数据障碍 英特尔助力巴黎奥运会构建RAG聊天机器人

随着现代科技的飞速发展&#xff0c;奥运会这样的大型体育赛事也迎来了前所未有的变革。从运动员训练到比赛直播&#xff0c;从裁判判罚到观众体验&#xff0c;科技的应用正深刻地影响着体育赛事的每一个环节。近日&#xff0c;英特尔就分享了与国际奥林匹克委员会&#xff08;…

Docker快速搭建WordPress博客系统网站

WordPress 是一款广泛使用的开源内容管理系统(CMS),用于创建和管理网站和博客。 主要功能: 易于使用的界面:WordPress 提供了一个直观的后台管理界面,使用户能够轻松创建、编辑和管理网站内容。 主题和模板:WordPress 提供了各种主题和模板,可根据网站需求进行选择和自…

OceanBase v4.2 特性解析:如何实现表级恢复

背景 在某些情况下&#xff0c;你可能会因为误操作而遇到表数据损坏或误删表的情况。为了能在事后将表数据恢复到某个特定时间点&#xff0c;在OceanBase尚未有表级恢复功能之前&#xff0c;你需要进行以下步骤&#xff1a; 利用OceanBase提供的物理恢复工具&#xff0c;您可…

进程概念(三)----- fork 初识

目录 前言1. pid && ppid2. forka. 为什么 fork 要给子进程返回 0&#xff0c; 给父进程返回子进程的 pid &#xff1f;b. 一个函数是如何做到两次的&#xff1f;c. fork 函数在干什么&#xff1f;d. 一个变量怎么做到拥有不同的内容的&#xff1f;e. 拓展&#xff1a;…

简单快捷!Yarn的安装与使用指南

Yarn 是由 Facebook (现 Meta) 开发的包管理工具。 今天&#xff0c;我将介绍如何使用 Yarn。 目录 Yarn 的官方网站 关于安装 版本确认 开始一个新项目&#xff08;创建 package.json 文件&#xff09; 安装软件包 升级包 运行脚本 执行包的命令 卸载包 总结 Yarn 的…

光伏+农业,会激发出怎样的火花?

在这个科技与自然和谐共生的时代&#xff0c;光伏技术与现代农业的深度融合&#xff0c;正悄然掀起一场绿色革命。当“光伏”这一代表未来能源方向的技术与承载着人类生存之本的“农业”相遇&#xff0c;两者之间的化学反应&#xff0c;不仅照亮了清洁能源的道路&#xff0c;更…

MP的使用

1、MP简介 MyBatis-Plus&#xff08;简称MP&#xff09;是一个MyBatis的增强工具&#xff0c;在MyBatis的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生 官网&#xff1a;MyBatis-Plus &#x1f680; 为简化开发而生 参考教程&#xff1a;https://baomidou.c…

【LeetCode 随笔】C++入门级,详细解答加注释,持续更新中。。。

文章目录 58.【简单】最后一个单词的长度&#x1f31f; &#x1f308;你好呀&#xff01;我是 山顶风景独好 &#x1f388;欢迎踏入我的博客世界&#xff0c;能与您在此邂逅&#xff0c;真是缘分使然&#xff01;&#x1f60a; &#x1f338;愿您在此停留的每一刻&#xff0c;都…

全网最详细!! Linux 安装、配置教程

一、下载安装包 首先去官网下载VMware最新版本&#xff0c;以及发行版CentOS -7&#xff0c;懒得下载的可以私信我&#xff0c;我给你发包 其中&#xff0c;CentOS&#xff08;Community Enterprise Operating System&#xff09;是一个基于Linux的开源操作系统&#xff0c;它是…

VBA技术资料MF181:图片导入Word后添加说明文字

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…

Java学习Day15:基础篇5

1.参数问题 2.变量 3.可变参数 package 方法demo1__code;public class two {public static void main(String[] args) {text.add(3,4,5,6);} } class text{static void add(int ... a){} } 可变参数其实是一个数组&#xff0c;可以用数组的方式使用&#xff1b; ATT&#xf…