Flink之窗口聚合算子

news2024/12/31 6:28:20

1.窗口聚合算子

在Flink中窗口聚合算子主要分类两类

  • 滚动聚合算子(增量聚合)
  • 全窗口聚合算子(全量聚合)
1.1 滚动聚合算子

滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下:

  • aggregate
  • max
  • maxBy
  • min
  • minBy
  • reduce
  • sum

这里以aggregate算子作为示例

// ... 
// 每10s统计一次每个用户最近30s的行为条数
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId())
        .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 参数1:窗口长度 参数2:滑动步长即计算频率
        .aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            // 这里给一个初始值
            @Override
            public Tuple2<String, Integer> createAccumulator() {
                return Tuple2.of("", 0);
            }

            // 在累加器中统计每个用户行为条数(来一条更新一次)
            @Override
            public Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {
                Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);
                return result;
            }

            // 将累加器中的更新结果给到getResult方法,输出
            @Override
            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
                return accumulator;
            }

            // 这个方法在流式计算中可以不用实现,在上下游数据进行合并时需要用到,以spark为例,上有map和下游reduce的计算结果需要合并时需要实现这个方法
            @Override
            public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);
                return merged;
            }
        });
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

1.2 全窗口聚合算子

全窗口聚合算子会将数据记录在状态容器中,当窗口触发时会将整个窗口中的数据交给聚合函数,根据具体逻辑将这些数据进行计算,常用算子如下:

  • apply
  • process

这里以apply算子为例

// ... 
// 每10s统计一次最近30s每个用户行为发生事件最大两条数据
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
               // 泛型1: 数据数据类型 泛型2: 输出数据类型 泛型3: key类型 泛型4: 窗口类型
               .apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {
                   /**
                    *@Param s 本次传入的key
                    *@Param window 本次传入窗口的各种元信息
                    *@Param input 本次输入的所有数据
                    *@Param out 输出数据
                    **/
                   @Override
                   public void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {
                       // 创建集合接收迭代器中的数据
                       ArrayList<UserEvent2> userEvent2List = new ArrayList<>();
                       // 遍历迭代器,也就是输入数据
                       for (UserEvent2 userEvent2 : input) {
                           // 将数据添加到集合中
                           userEvent2List.add(userEvent2);
                       }
                       // 将集合中的数据根据用户行为发生事件进行排序
                       Collections.sort(userEvent2List, new Comparator<UserEvent2>() {
                           @Override
                           public int compare(UserEvent2 o1, UserEvent2 o2) {
                               // 倒序排序
                               return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());
                           }
                       });

                       // 将每个用户行为发生时间最大的两条数据输出
                       for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {
                           out.collect(userEvent2List.get(i));
                       }
                   }
               });
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1084860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity中Shader光照模型Phong

文章目录 前言一、Phong光照模型二、图示解释Phone光照模型1、由图可得&#xff0c;R 可以由 -L 加上 P 得出2、P等于2*M3、因为 N 和 L 均为单位向量&#xff0c;所以 M 的模可以由 N 和 L得出4、得到M的模后&#xff0c;乘以 单位向量N&#xff0c;得到M5、最后得出 P 和 R 前…

Prometheus-Prometheus安装及其配置

Prometheus-Prometheus安装及其配置 Prometheus安装下载解压 配置启动prometheus校验配置文件表达式浏览器 Prometheus安装 Prometheus的安装针对Linux的安装&#xff0c;其他的安装方式可以查看Prometheus官网 下载 sudo wget https://github.com/prometheus/prometheus/re…

四款数字办公工具大比拼,在线办公无压力

在线办公软件使企业、员工实现办公场所、距离的自由&#xff0c;尤其是近几年&#xff0c;受“口罩”的影响&#xff0c;远程办公软件的使用者也越来越多&#xff0c;无论是财务、行政、还是设计师&#xff0c;都开始追求好用的在线办公软件&#xff0c;作为办公软件发烧友&…

发送消息时序图

内窥镜消息队列发送消息原理 目的 有一个多线程的Java应用程序&#xff0c;使用消息队列来处理命令 时序图 startumlactor User participant "sendCmdWhiteBalance()" as Controller participant CommandConsumer participant MessageQueueUser -> Controller:…

​左手 Serverless,右手 AI,7 年躬身的古籍修复之路

作者&#xff1a;宋杰 “AI 可以把我们思维体系当中&#xff0c;过度专业化、过度细分的这些所谓的知识都替代掉&#xff0c;让我们集中精力去体验自己的生命。我挺幸运的&#xff0c;代码能够有 AI 辅助&#xff0c;也能够有 Serverless 解决我的运营成本问题。Serverless 它…

mybatis拦截器源码分析

mybatis拦截器源码分析 拦截器简介 mybatis Plugins 拦截器由于Mybatis对数据库访问与操作进行了深度的封装,让我们应用开发效率大大提高,但是灵活度很差拦截器的作用:深度定制Mybatis的开发抛出一个需求 :获取Mybatis在开发过程中执行的SQL语句(执行什么操作获取那条SQL语句…

RK3562开发板:升级摄像头ISP,突破视觉体验边界

RK3562开发板作为深圳触觉智能新推出的爆款产品&#xff0c;采用 Rockchip 新一代 64 位处理器 RK3562&#xff08;Quad-core ARM Cortex-A53&#xff0c;主频最高 2.0GHz&#xff09;&#xff0c;最大支持 8GB 内存&#xff1b;内置独立的 NPU&#xff0c;可用于轻量级人工智能…

谷歌浏览查询http被自动转化成https导致页面读取失败问题处理

原因&#xff1a; 谷歌浏览器版本升级&#xff0c;安全问题考虑自动转化https 解决方案&#xff1a; 一、打开配置页面&#xff1a; chrome://flags/ 二、禁止自动转化

vue3_setup基础_渲染函数(ref,reactive)

一、setup语法糖 是什么&#xff1a;组合式Api &#xff08;vue2为option Api&#xff09; 来解决什么问题&#xff1a;使用&#xff08;data,computed,methonds,watch&#xff09;组件选项来组织逻辑通常都很有效。然而&#xff0c;当我们组件变的更大的时候&#xff0c;逻辑…

ansible的介绍安装与模块

目录 一、ansible简介 二、ansible特点 三、Ansible核心组件与工作原理 1、核心组件 2、工作原理 四、ansible的安装 五、ansible 命令行模块 1&#xff0e;command 模块 2&#xff0e;shell 模块 3&#xff0e;cron 模块 4&#xff0e;user 模块 5&#xff0e;group 模…

01 时钟配置初始化,debug

1. 开启debug series&#xff0c;否则只能下载一次&#xff0c;再次下载要配置boot 2.f0外部时钟配置 h750 配置 实测可用

股票印花税如何征收,万一免五的低费率成本计算以及券商选择

印花税国家收的&#xff0c;不管是深市沪市都收&#xff0c;如下图所示&#xff0c;可以看到&#xff0c;证券交易印花税自2008年9月之后改为单向收取&#xff0c;今年8月份更是降到了0.05%&#xff0c;也就是万分之5&#xff0c;以现在的视角看历史&#xff0c;在最早90年的千…

【数据结构】算法的空间复杂度

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 算法空间复杂度的定义 算法的时间复杂度和空间复杂度是度量算法好坏的两个重要量度,在实际写代码的过程中,我们完全可以用空间来换时间,比如说,我们要判断某某年是不是闰年,大…

基于Vue构建的快速开发框架

一、Vue结合低代码 "低代码"是一种快速开发应用的方法&#xff0c;它使开发者能够通过图形界面和预构建的块进行设计和构建&#xff0c;而不是手动编写大量的代码。这种方法被广泛用于快速应用开发、移动应用开发、业务流程管理和数据库应用开发等领域。 Vue.js 是一…

【Proteus仿真】【51单片机】智能语音家居陪护机器人

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真51单片机控制器&#xff0c;使用OLED液晶、按键、蜂鸣器、DS18B20温度传感器、人体红外传感器、语音识别模块、继电器、风扇、LED等。 主要功能&#xff1a; 系统运行后&#xff0…

Kafka生产者使用案例

1.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程&#xff1a; 1)Kafka 会将发送消息包装为 ProducerRecord 对象&#xff0c; ProducerRecord 对象包含了目标主题和要发送的内容&#xff0c;同时还可以指定键和分区。在发送 ProducerRecord 对象前&#xff0c…

FPGA面试题(7)

一.解释一下SPI的四种模式 01时钟极性CPOL空闲状态为低电平空闲状态为高电平时钟相位CPHA在第一个跳变沿采样在第二个跳变沿采样 模式CPOLCPHA描述模式000sclk上升沿采样&#xff0c;sclk下降沿发送模式101sclk上升沿发送&#xff0c;sclk下降沿采样模式210sclk上升沿发送&…

解决nav2_bringup tb3_simulation_launch.py 无法启动Gazebo的问题

方法 1 断网再开gazebo. 评价: 方便且有效, 但来回联网很麻烦 参考: https://blog.csdn.net/James___H/article/details/116906217 方法 2 断网能打开是因为gazebo软件开启时会自动从网络下载模型&#xff0c;下载过程必然漫长, 另外你懂的, 网络问题嘛, vpn也解决不了的话…

jmeter压测记录、使用方法

jmeter压测记录、使用方法 1、非gui方式执行压测命令2、压测命令输出解读 1、非gui方式执行压测命令 sh jmeter.sh -n -t test.jmx -l result.jtl2、压测命令输出解读 Active: 10 Started: 10 Finished: 0 Active: 10 表示一共10个活动&#xff08;正在进行的压测线程&#…

基于Cl2/BCl3电感偶联等离子体的氮化镓干蚀特性

引言 氮化镓(GaN)具有六方纤锌矿结构&#xff0c;直接带隙约为3.4eV&#xff0c;目前已成为实现蓝光发光二极管(led)的主导材料。由于GaN的高化学稳定性&#xff0c;在室温下用湿法化学蚀刻来蚀刻或图案化GaN是非常困难的。与湿法蚀刻技术相比&#xff0c;干法蚀刻技术可以提供…