Flink SQL 表值聚合函数(Table Aggregate Function)详解

news2025/1/10 3:19:56

使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。

函数功能:

在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值:

select max(xxx) from source_table group by key1, key2

上⾯ SQL 的 max 语义产出只有⼀条最终结果,如果想取聚合结果最⼤的 n 条数据,并且 n 条数据,每⼀条都要输出⼀次结果数据,上⾯的 SQL 就没有办法实现了。

所以 UDTAF 为了处理这种场景,可以⾃定义 怎么取 , 取多少条 最终的聚合结果,UDTAF 和 UDAF 是类似的。

在这里插入图片描述

案例场景: 有⼀个饮料表有 3 列,分别是 id、name 和 price,⼀共有 5 ⾏,需要找到价格最⾼的两个饮料,类似于 top2,表值聚合函数,需要遍历所有 5 ⾏数据,输出结果为 2 ⾏数据的⼀个表。

开发流程:

实现 TableAggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的

必须实现以下⽅法:

Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,存储了聚合的中间结果,⽐如在执⾏ max() 时会存储每⼀条中间结果的 max 值;

accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,都会调⽤ accumulate() ⽅法更新 accumulator,⽅法对每⼀条输⼊数据执⾏,⽐如执⾏ max() 时,遍历每⼀条数据执⾏;这个⽅法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,每个⽅法的参数类型可以不同,⽀持变⻓参数。

emitValue(Acc accumulator, Collector collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector collector) :

当所有的数据处理完之后,调⽤ emit ⽅法来计算和输出最终结果,可以⾃定义输出多少条以及怎样输出结果。

对于 emitValue 以及 emitUpdateWithRetract 区别,以 TopN 举例,emitValue 每次都会发送所有的最⼤的 n 个值,⽽这在流式任务中会有性能问题,为提升性能,可以实现 emitUpdateWithRetract ⽅法,这个⽅法在 retract 模式下会增量输出结果,⽐如只在有数据更新时,做到撤回⽼数据,再发送新数据,⽽不需要每次都发出全量的最新数据。

如果同时定义了 emitUpdateWithRetract、emitValue ⽅法,那 emitUpdateWithRetract 会优先于 emitValue ⽅法被使⽤,因为引擎会认为 emitUpdateWithRetract 会更加⾼效,它的输出是增量的。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 回撤流的场景必须实现,在计算回撤数据时调⽤,如果没有实现则会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景必须实现,这个⽅法对优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,从⽽在第⼀阶段先进⾏数据聚合。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型:

默认情况下,⽤户的 Input输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚 合中间结果 createAccumulator() 的返回结果)、 Output输出参数 数据类型( emitValue(Acc acc,Collector<Output输出参数> out) 的 Output输出参数 )会被 Flink 反射获取,但对于accumulator 和 Output输出参数类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意: Input输⼊参数 因为是上游算⼦传⼊的,所以类型信息是确认的,不会出现推导错误的情况),⽐如那些⾮基本类型 POJO 的复杂类型,所以跟 ScalarFunction 和 TableFunction ⼀样, AggregateFunction 提供了TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和accumulator 的类型,两个函数的返回值类型都是 TypeInformation。

  • getResultType() : 即 emitValue(Acc acc, Collector<Output输出参数> out) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型;

案例场景: Top2

定义⼀个 TableAggregateFunction 来计算给定列的最⼤的 2 个值

在 TableEnvironment 中注册函数

在 Table API 查询中使⽤函数(当前只在 Table API 中⽀持 TableAggregateFunction)

实现思路:

计算最⼤的 2 个值,accumulator 需要保存当前的最⼤的 2 个值,定义了类 Top2Accum 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,来保证精确⼀次的语义。

Top2 表值聚合函数(TableAggregateFunction)的 accumulate() ⽅法有两个输⼊,第⼀个是 Top2Accum accumulator,另⼀个是⽤户定义的输⼊:输⼊的值 v,尽管 merge() ⽅法在⼤多数聚合类型中不是必须的,但在样例中提供了它的实现。并且定义了 getResultType() 和 getAccumulatorType() ⽅法。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * 输入数据:
 * a,1
 * a,2
 * a,3
 * 
 * 输出结果:
 * res=>:1> +I[a, 1, 1]
 * res=>:1> -D[a, 1, 1]
 * res=>:1> +I[a, 2, 1]
 * res=>:1> +I[a, 1, 2]
 * res=>:1> -D[a, 2, 1]
 * res=>:1> -D[a, 1, 2]
 * res=>:1> +I[a, 3, 1]
 * res=>:1> +I[a, 2, 2]
 */
public class TableAggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String,Integer>> tpStream = source.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String input) throws Exception {
                return new Tuple2<>(input.split(",")[0],Integer.parseInt(input.split(",")[1]));
            }
        });

        tEnv.registerFunction("top2", new Top2());

        Table table = tEnv.fromDataStream(tpStream, "key,value");

        tEnv.createTemporaryView("SourceTable", table);

        // 使⽤函数
        Table res = tEnv.from("SourceTable")
                .groupBy("key")
                .flatAggregate("top2(value) as (v, rank)")
                .select("key, v, rank");

        tEnv.toChangelogStream(res).print("res=>");
        env.execute();
    }

    /**
     * Accumulator for Top2.
     */
    public static class Top2Accum {
        public Integer first;
        public Integer second;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
        @Override
        public Top2Accum createAccumulator() {
            Top2Accum acc = new Top2Accum();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accum acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

        public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
            for (Top2Accum otherAcc : iterable) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
            // emit the value and rank
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1199174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

非遗文化展示预约小程序的效果如何

漫漫历史长河&#xff0c;我国积累的各种非遗文化广而多&#xff0c;也有相应的机构整理展示和收录&#xff0c;区域限制下&#xff0c;传统非遗文化内容传播度并不高&#xff0c;实际线下查看了解的人也并不是很多&#xff0c;在实际展示方面也面临着一些难题&#xff1a; 线…

CS224W6.1——介绍图神经网络GNN

之前我们讨论了一些节点嵌入技术&#xff0c;它们可以通过随机游走的过程学习与任务无关的特征。从这篇开始&#xff0c;我们介绍了令人兴奋的图神经网络技术&#xff0c;该技术基于图结构用多层非线性变换对节点特征进行编码。图神经网络在各种任务中表现出非凡的性能&#xf…

【数据库开发】DataX开发环境的安装部署

文章目录 1、简介1.1 DataX简介1.2 DataX功能1.3 支持的数据通道 2、DataX安装配置2.1 DataX2.2 Java2.3 Python2.4 测试 3、DataX Web安装配置3.1 mysql3.2 DataX Web3.2.1 简介3.2.2 架构图3.2.3 依赖环境3.2.4 安装 结语 1、简介 DataX是阿里云DataWorks数据集成的开源版本。…

安装纯净版Linux后的必备设置

目录 一&#xff1a;网络设置 1&#xff0c;设置yum源 2&#xff0c;配置网络 二&#xff1a;samba服务设置 1&#xff0c;安装samba 2&#xff0c;设置samba 3&#xff0c;windows上挂载 三&#xff1a;安装必备的开发软件 1&#xff0c;GCC安装 2&#xff0c;Pyth…

阿里云OSS和腾讯云COS对象存储介绍和简单使用

对象存储指的是一种云存储服务&#xff0c;其主要是将数据以对象的形式存储在云端&#xff0c;并且提供了完全的API调用&#xff0c;这些API包括上传&#xff0c;下载&#xff0c;删除&#xff0c;复制&#xff0c;预览&#xff0c;权限设置等等。OSS对象存储和COS对象存储都是…

分享10个地推拉新和网推拉新app推广接单平台,一手接任务平台

文章首推平台&#xff1a;”聚量推客“ 官方邀请码000000 从事地推、拉新、推广这一类型的工作&#xff0c;是一定要有稳定的一手接单平台的&#xff0c;因为在瞬息万变的拉新推广市场中&#xff0c;很多APP应用的推广拉新存在周期性&#xff0c;有可能这个月还在的拉新项目&a…

【学习辅助】Axure手机时间管理APP原型,告别手机控高保真模板

作品概况 页面数量&#xff1a;共 30 页 兼容软件&#xff1a;Axure RP 9/10&#xff0c;不支持低版本 应用领域&#xff1a;时间管理、系统工具 作品申明&#xff1a;页面内容仅用于功能演示&#xff0c;无实际功能 作品特色 本品为「手机时间管理」APP原型&#xff0c;…

动态规划题解

文章目录 杨辉三角杨辉三角2爬楼梯最小花费爬楼梯斐波那契数列比特位计数不同路径 杨辉三角 var generate function(numRows) {//先定义一个空数组var ret[];//遍历行数for(let i 0;i<numRows;i){var cownew Array(i1).fill(1)//定义行内数组数&#xff0c;有多少numrows&a…

基于LDA主题分析的《老友记》情景喜剧数据集的建模分析(文末送书)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

Python字符串字母大小写变换

嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 说明&#xff1a; 字符串就是一系列字符&#xff0c;在Python中用引号括起来的都是字符串&#xff0c; 引号可以是单引号&#xff0c;也可以是双引号&#xff0…

发送失败的RocktMQ消息,你遇到过吗?

背景 需要通过flink同时向测试和线上的RocketMQ中写入数据 现象 在程序中分别创建了两个MqProducer&#xff0c;设置了不同的nameServerAddr&#xff0c;分别调用不同的producer向不同环境发消息&#xff0c;返回发送成功&#xff0c;但是在线上MQ中却查不到数据&#xff0…

RK3568平台 在alsa框架中添加音频功放芯片

一.alsa框架概述 ALSA&#xff0c;全称是Advanced Linux Sound Architecture&#xff0c;是Linux中提供声音设备驱动的内核组件&#xff0c;应用可以通过ALSA接口实现音频播放、录音、设备通路控制、音量控制、通话等功能。 在 Linux 内核设备驱动层&#xff0c;ALSA 提供了 …

ThinkPHP图片处理之压缩图片大小,图片处理之图片水印(添加平铺文字水印,并设置文字之间的间距和文字的角度)

安装扩展 使用Composer安装图像处理类库 composer require topthink/think-image在对应的控制器内引入Image use think\Image;图片处理之压缩图片大小 public function upload(){$file request()->file(image);// 将前端传过来的图片移动到项目目录下$info $file->…

BEVFormer 论文阅读

论文链接 BEVFormer BEVFormer&#xff0c;这是一个将Transformer和时间结构应用于自动驾驶的范式&#xff0c;用于从多相机输入中生成鸟瞰&#xff08;BEV&#xff09;特征利用查询来查找空间/时间&#xff0c;并相应地聚合时空信息&#xff0c;从而为感知任务提供更强的表示…

技能培训知识付费服务预约小程序的效果如何

技能、证书往往是很多人生活的基本&#xff0c;行业岗位竞争激烈&#xff0c;每个人都希望有多种技能或工作所需&#xff0c;而需求持续增加下&#xff0c;相关技能培训机构也很多&#xff0c;比如常见的考证、钢琴培训、针灸培训、花艺培训等。 很多行业都需要学习或考证&…

java笔记(一)

一、Java的三大平台 1.Java SE (必学) java语言的标准版&#xff0c;用于桌面开发&#xff0c;是其他两个版本的基础。 桌面应用适合的语言其实是c和C合适&#xff0c;复杂动画等加载时java很慢。 2.Java ME(现在很少用) java语言的小型版本&#xff0c;适用于嵌入式电子设备或…

Pytroch损失函数、反向传播和优化器、Sequential使用

Pytroch_Sequential使用、损失函数、反向传播和优化器 文章目录 nn.Sequential搭建小实战损失函数与反向传播优化器 nn.Sequential nn.Sequential是一个有序的容器&#xff0c;用于搭建神经网络的模块被按照被传入构造器的顺序添加到nn.Sequential()容器中。 import torch.nn …

C++进阶-STL list容器的简单认识

STL list容器的简单认识 list容器基本概念list容器构造函数list容器赋值和交换list容器大小操作list容器插入和删除list容器数据存取list容器反转和排序list排序案例 list容器基本概念 list容器是将数据进行链式存储的容器&#xff0c;链表&#xff08;list&#xff09;是一种…

Windows10 安装 Ubuntu(WSL2)

Windows10 安装 Ubuntu(WSL2)&#xff1a;因为 Ubuntu(WSL1) 不具备调试功能&#xff0c;所以尽可能安装 Ubuntu(WSL2)。 具体流程如下&#xff1a; 1. 什么是WSL Windows Subsystem for Linux&#xff08;简称WSL&#xff09;&#xff0c;Windows下的 Linux 子系统&#xff…

YOLO目标检测——猫狗目标检测数据集下载分享【含对应voc、coco和yolo三种格式标签】

实际项目应用&#xff1a;宠物识别、猫狗分类数据集说明&#xff1a;猫狗分类检测数据集&#xff0c;真实场景的高质量图片数据&#xff0c;数据场景丰富&#xff0c;含有猫和狗图片标签说明&#xff1a;使用lableimg标注软件标注&#xff0c;标注框质量高&#xff0c;含voc(xm…