flink学习(8)——窗口函数

news2024/12/1 11:42:40

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;

public class _04_agg函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);
        // 此时我要获取每个班级的平均成绩
        // 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)
        // IN——Tuple3<String, String, Long>
        // ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩
        // OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩
        dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {
        
            // 初始化一个 累加器
            @Override
            public Tuple3<String, Integer, Long> createAccumulator() {
                return Tuple3.of(null,0,0L);
            }


            // 累加器和输入的值进行累加
            // Tuple3<String, String, Long> value 第一个是传入的值
            // Tuple3<String, Integer, Long> accumulator 第二个是累加器的值
            @Override
            public Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {

                return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);
            }

            // 获取结果——在不同节点的结果进行汇总后实现
            @Override
            public Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {

                return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);
            }


            // 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总
            // 即累加器之间的累加
            @Override
            public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;

public class _05_app函数 {
    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);

        //2. source-加载数据
        dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {

            @Override
            public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {
                Long sum = 0L;
                int length = 0;
                String key = null;
                for (Tuple3<String, String, Long> value : values) {
                    sum += value.f2;
                    length++;
                    key = value.f0;

                }
                out.collect(Tuple2.of(key,(double) sum/length));
            }
        }).print();
      
        env.execute();
    }
}

// 总结

// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果

// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) 

Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象

//使用窗口对象我们可以拿到窗口的起始时间
 long start = window.getStart();
 long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型

 @Override
        public void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
            
            // Long 是数据类型 结果使用collector中的collect 收集
            collector.collect(String.valueOf(l));
        }

@Override
        public void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
        
            //  String 是数据类型 结果使用collector中的collect 收集
            collector.collect(s);
        }
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素

OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2251084.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智能桥梁安全运行监测系统守护桥梁安全卫士

一、方案背景 桥梁作为交通基础设施中不可或缺的重要组成部分&#xff0c;其安全稳定的运行直接关联到广大人民群众的生命财产安全以及整个社会的稳定与和谐。桥梁不仅是连接两地的通道&#xff0c;更是经济发展和社会进步的重要纽带。为了确保桥梁的安全运行&#xff0c;桥梁安…

网络原理-初识

1.网络的发展历程 独立模式 独立模式&#xff1a;计算机之间相互独立。 每个终端A、B、C各自持有客户端数据 网络互连 随着时代的发展&#xff0c;越来越需要计算机之间互相通信&#xff0c;共享软件和数据&#xff0c;即可以多个计算机协调工作来完成业务&#xff0c;就有…

房屋结构安全监测系统守护房屋安全卫士

一、系统背景 随着时间的流逝&#xff0c;建筑物的主体结构、设备设施等会因为自然老化、材料疲劳、使用环境的变化以及维护不当等各种因素的影响&#xff0c;逐渐出现性能下降甚至安全隐患。因此&#xff0c;进行房屋安全监测显得尤为重要。房屋结构安全是指建筑物的结构体系在…

排序算法之选择排序堆排序

算法时间复杂度辅助空间复杂度稳定性选择排序O(N^2)O(1)不稳定堆排序O(NlogN)O(1)不稳定 1.选择排序 这应该算是最简单的排序算法了&#xff0c;每次在右边无序区里选最小值&#xff0c;没有无序区时&#xff0c;就宣告排序完毕 比如有一个数组&#xff1a;[2,3,2,6,5,1,4]排…

从0开始linux(39)——线程(2)线程控制

欢迎来到博主的专栏&#xff1a;从0开始linux 博主ID&#xff1a;代码小豪 文章目录 线程创建线程标识符线程参数多线程竞争资源 回收线程detach 线程退出pthread_cancel 线程创建 线程创建的函数为pthread_create。该函数是包含在posix线程库当中&#xff0c;posix线程是C语言…

28.100ASK_T113-PRO Linux+QT 显示一张照片

1.添加资源文件 2. 主要代码 #include "mainwindow.h" #include "ui_mainwindow.h" #include <QImage> #include <QPixmap>MainWindow::MainWindow(QWidget *parent) :QMainWindow(parent),ui(new Ui::MainWindow) {ui->setupUi(this);QIm…

不用下载安装的线上3D编辑器,支持哪些功能?

线上3D编辑器&#xff0c;不用下载软件&#xff0c;直接通过浏览器打开51建模网&#xff0c;上传模型即可进入编辑器&#xff0c;不仅支持对3D模型进行细致的效果配置&#xff0c;如光源设置、背景定制、材质调节等&#xff0c;还具备爆炸效果、热点动画、部件替换等高级交互功…

C语言——指针初阶(三)

目录 一.指针-指针 代码1&#xff1a; 运行结果&#xff1a; 代码2&#xff1a; 运行结果&#xff1a; 代码3&#xff1a; 运行结果&#xff1a; 二.指针数组 例&#xff1a; 往期回顾 一.指针-指针 指针减去指针的前提&#xff1a;两个指针指向同一块空间。 指针减去指针…

vue3项目创建方式记录

目录 创建vue3常用的方式有三种&#xff1a;一.使用vue cli创建二.使用vite创建三.使用vue3官方推荐创建方式&#xff08;create-vue&#xff09; 创建vue3常用的方式有三种&#xff1a; 一.使用vue cli创建 vue create 项目名二.使用vite创建 vite是下一代前端开发与构建工…

基于特征子空间的高维异常检测:一种高效且可解释的方法

本文将重点探讨一种替代传统单一检测器的方法&#xff1a;不是采用单一检测器分析数据集的所有特征&#xff0c;而是构建多个专注于特征子集(即子空间)的检测器系统。 在表格数据的异常检测实践中&#xff0c;我们的目标是识别数据中最为异常的记录&#xff0c;这种异常性可以…

MySQL —— MySQL 程序

目录 前言 一、MySQL 程序简介 二、mysqld -- MySQL 服务器 三、mysql -- MySQL 客户端 1. mysql 客户端简介 2. mysql 客户端选项 &#xff08;1&#xff09;指定选项的方式 &#xff08;2&#xff09;mysql 客户端命令常用选项 &#xff08;3&#xff09;在命令行中使…

Flink CDC 使用实践以及遇到的问题

背景 最近公司在做一些业务上的架构调整&#xff0c;有一部分是数据从mysql采集到Starrocks&#xff0c;之前的一套方法是走 debezium 到 puslar 到 starrocks,这一套下来比较需要配置很多东西&#xff0c;而且出现问题以后&#xff0c;需要修改很多配置&#xff0c;而且现阶段…

数据链路层(三)--点对点通信协议PPP

PPP协议叫做点对点协议&#xff0c;是目前使用的最广泛的数据链路层协议。 1 PPP协议的特点 用户通常需要连接到某个ISP才能接入互联网&#xff0c;PPP协议就是用户计算机和ISP进行通信所使用的数据链路层协议。 1.1 PPP协议应满足的需求 &#xff08;1&#xff09;简单&…

C语言 分支语句(if)

分支语句(if) if语句形式一 适用只有一个分支判断 if(表达式1) //如果 { 语句块1 } if语句形式二 适用有两个分支判断 if(表达式1) //如果 { 语句块1 } else //否则 { 语句块2 } 例:求方程的根 if语句形式三 适用多分支判断 if(表达式1) //多分支 { 语句块1 } else if(表达…

如何将WSL的虚拟机安装到任意目录中

目录 引言 下载安装包 解压安装包 手工安装 结语 引言 WSL默认是将虚拟机安装在C盘的用户目录下&#xff0c;如果长时间使用Windows后&#xff0c;可能C盘的空间就会非常吃紧&#xff0c;所以非常希望把虚拟机安装到C盘以外的目录中。本文就介绍一下相关的工作。 这里只讨…

一款.NET开源的Windows资源管理器标签页工具

前言 今天大姚给大家分享一款基于.NET开发的可以让你在Windows资源管理器中使用Tab多标签功能的小工具&#xff1a;QTTabBar。 工具介绍 QTTabBar是一款基于.NET开发的可以让你在Windows资源管理器中使用Tab多标签功能的小工具。从此以后工作时不再遍布文件夹窗口&#xff0c…

传输控制协议(TCP)

传输控制协议是Internet一个重要的传输层协议。TCP提供面向连接、可靠、有序、字节流传输服务。 1、TCP报文段结构 注&#xff1a;TCP默认采用累积确认机制。 2、三次握手、四次挥手 &#xff08;1&#xff09;当客户向服务器发送完最后一个数据段后&#xff0c;发送一个FIN段…

c++哈希表(原理、实现、开放寻址法)适合新手

c系列哈希的原理及实现&#xff08;上&#xff09; 文章目录 c系列哈希的原理及实现&#xff08;上&#xff09;前言一、哈希的概念二、哈希冲突三、哈希冲突解决3.1、开放寻址法3.2、删除操作3.3、负载因子四、代码实现 总结 前言 红黑树平衡树和哈希有不同的用途。 红黑树、…

服务器数据恢复—raid6阵列硬盘被误重组为raid5阵列的数据恢复案例

服务器存储数据恢复环境&#xff1a; 存储中有一组由12块硬盘组建的RAID6阵列&#xff0c;上层linux操作系统EXT3文件系统&#xff0c;该存储划分3个LUN。 服务器存储故障&分析&#xff1a; 存储中RAID6阵列不可用。为了抢救数据&#xff0c;运维人员使用原始RAID中的部分…

Python酷库之旅-第三方库Pandas(250)

目录 一、用法精讲 1181、pandas.tseries.offsets.BusinessMonthEnd.is_on_offset方法 1181-1、语法 1181-2、参数 1181-3、功能 1181-4、返回值 1181-5、说明 1181-6、用法 1181-6-1、数据准备 1181-6-2、代码示例 1181-6-3、结果输出 1182、pandas.tseries.offse…