FlinkSQL聚合函数(Aggregate Function)详解

news2025/1/10 23:33:39

使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

在这里插入图片描述

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法

案例场景:

关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。

开发流程:

实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;

必须实现以下⽅法:

  • Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
  • accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
  • Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型信息的⽅法:

默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。

对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。

同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。

  • getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。

案例: 加权平均值

  • 定义⼀个聚合函数来计算某⼀列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使⽤函数

实现思路:

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。

WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import java.io.Serializable;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * 输入数据:
 * a,1,1
 * a,10,2
 *
 * 输出结果:
 * res1=>:1> +I[a, 1.0]
 * res2=>:1> +I[a, 1.0]
 * res3=>:1> +I[a, 1.0]
 *
 * res1=>:1> -U[a, 1.0]
 * res1=>:1> +U[a, 7.0]
 * res3=>:1> -U[a, 1.0]
 * res3=>:1> +U[a, 7.0]
 * res2=>:1> -U[a, 1.0]
 * res2=>:1> +U[a, 7.0]
 */
public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {
            @Override
            public Tuple3<String, Double, Double> map(String input) throws Exception {
                return new Tuple3<>(input.split(",")[0],
                        Double.parseDouble(input.split(",")[1]),
                        Double.parseDouble(input.split(",")[2]));
            }
        });

        Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");

        tEnv.createTemporaryView("SourceTable", table);

        Table res1 = tEnv.from("SourceTable")
                .groupBy($("field"))
                .select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));

        // 注册函数
        tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

        // Table API 调⽤函数
        Table res2 = tEnv.from("SourceTable")
                .groupBy($("field"))
                .select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));

        // SQL API 调⽤函数
        Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");

        tEnv.toChangelogStream(res1).print("res1=>");
        tEnv.toChangelogStream(res2).print("res2=>");
        tEnv.toChangelogStream(res3).print("res3=>");

        env.execute();

    }

    // ⾃定义⼀个计算权重 avg 的 accmulator
    public static class WeightedAvgAccumulator implements Serializable {
        public Double sum = 0.0;
        public Double count = 0.0;
    }

    // 输⼊:Long iValue, Integer iWeight
    public static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
        // 创建⼀个 accumulator
        @Override
        public WeightedAvgAccumulator createAccumulator() {
            return new WeightedAvgAccumulator();
        }

        public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {
            acc.sum += iValue * iWeight;
            acc.count += iWeight;
        }

        public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {
            acc.sum -= iValue * iWeight;
            acc.count -= iWeight;
        }

        // 获取返回结果
        @Override
        public Double getValue(WeightedAvgAccumulator acc) {
            if (acc.count == 0) {
                return null;
            } else {
                return acc.sum / acc.count;
            }
        }

        // Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并
        public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
            for (WeightedAvgAccumulator a : it) {
                acc.count += a.count;
                acc.sum += a.sum;
            }
        }

        public void resetAccumulator(WeightedAvgAccumulator acc) {
            acc.count = 0.0;
            acc.sum = 0.0;
        }
    }
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1199050.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

录制GIF图,动态图

软件下载链接&#xff1a; https://www.cockos.com/licecap/ 参考链接&#xff1a; https://chat.xutongbao.top/

<C++> stack queue模拟实现

目录 前言 一、stack的使用 1. 接口说明 2. 例题 二、模拟实现stack 三、queue的使用 四、模拟实现queue 五、deque 总结 前言 LIFO stack 1. 栈是一种容器适配器&#xff0c;专门设计用于在后进先出上下文&#xff08;后进先出&#xff09;中运行&#xff0c;其中元素仅从容器…

Linux之基本指令操作

1、whoami whoami&#xff1a;查看当前账号是谁 2、who who&#xff1a;查看当前我的系统当中有哪些用户&#xff0c;当前有哪些人登录了我的机器 3、 pwd pwd&#xff1a;查看我当前所处的目录&#xff0c;就好比Windows下的路径 4、ls ls&#xff1a;查看当前目录下的文件信…

搭建Docker

一、概念 云服务器大家肯定不陌生了&#xff0c;相比较传统物理服务器来说他的价格&#xff0c;个性化的配置服务&#xff0c;节省了很多的运维成本&#xff0c;越来越多的企业以及个人开发者更加的青睐于云服务器。有了属于自己的服务器就可以部署搭建自己个人网站了&#xf…

【博士每天一篇文献-模型】A mechanistic model of connector hubs, modularity and cognition

阅读时间&#xff1a;2023-11-10 1 介绍 年份&#xff1a;2018 作者&#xff1a;Maxwell A. Bertolero, B. T. Thomas Yeo 期刊&#xff1a; nature human behaviour 引用量&#xff1a;180 2 创新点 作者提出了一个机制模型&#xff0c;解释了连接中枢的功能以及其对认知表…

深度学习1【吴恩达】

视频链接&#xff1a;1.5 关于这门课_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1FT4y1E74V?p5&spm_id_frompageDriver&vd_source3b6cdacf9e8cb3171856fe2c07acf498 视频中吴恩达老师所有的话语收录&#xff1a; 机器学习初学者-AI入门的宝典 (ai-start.c…

知识蒸馏概述及开源项目推荐

文章目录 1.介绍2.知识2.1 基于响应的知识&#xff08;response-based&#xff09;2.2 基于特征的知识(feature-based)2.3 基于关系的知识(relation-based) 3.蒸馏机制3.1 离线蒸馏3.2 在线蒸馏3.3 自蒸馏 4.教师-学生架构5.蒸馏算法5.1 对抗性蒸馏&#xff08;Adversarial Dis…

计算机组成原理第四章(存储系统)(一)

一、存储器概述 1.分类&#xff1a; 存取方式&#xff1a;随机存储器&#xff08;RAM&#xff09;、顺序存储器&#xff08;SAM&#xff09;、直接存储器&#xff08;DAM&#xff09; 存储介质&#xff1a;磁性材料存储器、半导体存储器、光存储器 功能和存取速度&#xff1a; …

docker stop slow 解决

验证 NanoMQ stop slow 的问题 daemon 和非 daemon 两种方式 docker stop 都很慢 疑问是默认情况下&#xff0c;SIGTERM 会被处理。 模拟 docker 内发送 SIGTERM 信号 # The default signal for kill is TERM # pkill will send the specified signal (by defau…

常微分方程

什么是常微分方程&#xff1a; 未知函数为单变量(一元)函数 例1 设有温度为100摄氏度的物体放置在20摄氏度的空气冷却&#xff0c;求物体温度随时间 变化 的规律。 解&#xff1a;设t时刻物体温度为T 对两边求共积分 设比例系数为k>0 令C, 微分方程&#xff1a; 联系着…

算法导论笔记4:散列数 hash

一 了解一些散列的基本概念&#xff0c;仅从文字角度&#xff0c;整理了最基础的定义。 发现一本书&#xff0c;《算法图解》&#xff0c;微信读书APP可读&#xff0c;有图&#xff0c;并且是科普性质的读物&#xff0c;用的比喻很生活化&#xff0c;可以与《算法导论》合并起…

论文笔记--Baichuan 2: Open Large-scale Language Models

论文笔记--Baichuan 2: Open Large-scale Language Models 1. 文章简介2. 文章概括3 文章重点技术3.1 预训练3.1.1 预训练数据3.1.2 模型架构 3.2 对齐3.2.1 SFT3.2.2 Reward Model(RM)3.2.3 PPO 3.3 安全性 4. 文章亮点5. 原文传送门 1. 文章简介 标题&#xff1a;Baichuan 2…

专用博客模板

【点我-这里送书】 本人详解 作者&#xff1a;王文峰&#xff0c;参加过 CSDN 2020年度博客之星&#xff0c;《Java王大师王天师》 公众号&#xff1a;JAVA开发王大师&#xff0c;专注于天道酬勤的 Java 开发问题中国国学、传统文化和代码爱好者的程序人生&#xff0c;期待你的…

用excel计算一个矩阵的逆矩阵

假设我们的原矩阵是一个3*3的矩阵&#xff1a; 125346789 我们现在要求该矩阵的逆矩阵&#xff1a; 鼠标点到其它空白的地方&#xff0c;用来存放计算结果&#xff1a; 插入-》函数&#xff1a; 选择MINVERSE函数&#xff0c;这个就是求逆矩阵的函数&#xff1a; 点击“继续…

[C国演义] 第十八章

第十八章 最长斐波那契子序列的长度最长等差数列等差序列划分II - 子序列 最长斐波那契子序列的长度 力扣链接 子序列 ⇒ dp[i] — — 以 arr[i] 结尾的所有子序列中, 斐波那契子序列的最长长度子序列 ⇒ 状态转移方程 — — 根据最后一个位置的组成来划分 初始化 — — 根…

开发知识点-Vue-Electron

Electron ElectronVue打包.exe桌面程序 ElectronVue打包.exe桌面程序 为了不报错 卸载以前的脚手架 npm uninstall -g vue-cli安装最新版脚手架 cnpm install -g vue/cli创建一个 vue 随便起个名 vue create electron-vue-example (随便起个名字electron-vue-example)进入 创建…

PDF文件编辑器有哪些?10 个适用的PDF 编辑器推荐!

PDF 编辑器始终配备简单的界面以及高效管理工作所需的所有功能。 那么如何选择合适的版本呢&#xff1f;在线工具还是离线工具更好&#xff1f; 为了帮助您回答这些问题&#xff0c;我将通过多次深入的测试来详细回顾十大免费编辑器。现在让我们来探索一下吧&#xff01; 10 …

扭矩传感器信号模拟地、数据地与电源地

在电子电路中&#xff0c;电源地、信号地、数字地和模拟地都是不同的地&#xff08;ground&#xff09;节点&#xff0c;它们在电路中有不同的作用。 电源地&#xff08;Power Ground&#xff09;是指用于连接电源电源回路的地节点。在大多数电子设备中&#xff0c;电源地通常是…

算法导论笔记5:贪心算法

P216 第15章动态规划 最优子结构 具有它可能意味着适合应用贪心策略 动态规划(Dynamic Programming)算法的核心思想是:将大问题划分为小问题进行解决,从而一步步获取最优解的处理算法。 剪切-粘贴技术证明 每个子问题的解就是它本身的最优解&#xff08;利用反证法&#xff0…

模板初阶 C++

目录 泛型编程 函数模板 概念 格式 原理 函数模板的实例化 类模板 格式 类模板的实例化 泛型编程 当我们要实现一个交换函数&#xff0c;我们可以利用函数重载实现&#xff0c;但是有几个不好的地方 1.函数重载仅仅是类型不同&#xff0c;代码复用率较低&#xff0c;只…