【Spark精讲】一文讲透SparkSQL聚合过程以及UDAF开发

news2025/1/23 4:02:08

SparkSQL聚合过程

这里的 Partial 方式表示聚合函数的模式,能够支持预先局部聚合,这方面的内容会在下一节详细介绍。 对应实例中的聚合语句,因为 count 函数支持 Partial 方式,因此调用的是 planAggregateWithoutDistinct 方法,生成了图 7.4 中的两个 HashAggregate (聚合执行方式中的一种,后续详细介绍)物理算子树节点,分别进行局部聚合与最终的聚合。 最后,在生成的 SparkPlan 中添加 Exchange 节点,统一排序与分区信息,生成物理执行计划(ExecutedPlan)。

聚合查询在计算聚合值的过程中,通常都需要保存相关的中间计算结果,例如 max 函数需要保存当前最大值, count 函数需要保存当前的数据总数,求平均值的 avg 函数需要同时保存 count 和 sum 的值,更复杂的函数(如 pencent让等)甚至需要临时存储全部的数据 。 聚合查询 计算过程中产生的这些中间结果会临时保存在聚合函数缓冲区。

在 SparkSQL 中,聚合过程有 4种模式,分别是 Partial模式、 ParitialMerge模式、 Final模式 和 Complete模式。

Final模式一般和 Partial模式组合在一起使用。 Partial模式可以看作是局部数据的聚合,在 具体实现中, Partial 模式的聚合函数在执行时会根据读入的原始数据更新对应的聚合缓冲区, 当处理完所有的输入数据后,返回的是聚合缓冲区中的中间数据 。 而 Final模式所起到的作用 是将聚合缓冲区的数据进行合并,然后返回最终的结果。 如下图所示,在最终分组计算总和 之前,可以先进行局部聚合处理,这样能够避免数据传输并减少计算量 。 因此,上述聚合过程 中在 map 阶段的 sum 函数处于 Partial模式,在 reduce 阶段的 sum 函数处于 Final模式 。

Complete模式和上述的 Partial/Final组合方式不一样,不进行局部聚合计算。 下图展示了同样的聚合函数采用 Complete模式的情形。 可以看到,最终阶段直接针对原始输入,中间没有局部聚合过程。 一般来讲, Complete模式应用在不支持Partial模式的聚合函数中。

相比 Partial、 Final和 Complete模式, PartialMerge模式的聚合函数主要是对聚合缓冲区进行合并,但此时仍然不是最终的结果。 ParitialMerge主要应用在 distinct语句中,如下图所示。 聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了 4 步聚合操作。 第1步按照(A,C)分组,对 sum函数进行 Partial模式聚合计算;第2步是 PartialMerge模式,对上一步计算之后的聚合缓冲区进行合井,但此时仍然不是最终的结果;第3步分组的列 发生变化,再一次进行 Partial模式的 count计算;第4步完成 Final模式的最终计算。

Hive on Spark与SparkSQL的区别

Hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

Hive on Spark是从Hive on MapReduce演进而来,Hive的整体解决方案很不错,但是从查询提交到结果返回需要相当长的时间,查询耗时太长,这个主要原因就是由于Hive原生是基于MapReduce的,那么如果我们不生成MapReduce Job,而是生成Spark Job,就可以充分利用Spark的快速执行能力来缩短HiveQL的响应时间。

Hive on Spark现在是Hive组件(从Hive1.1 release之后)的一部分。

SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。Hive原本是没有很好支持MapReduce之外的引擎的,而Hive On Tez项目让Hive得以支持和Spark近似的Planning结构(非MapReduce的DAG)。所以在此基础上,Cloudera主导启动了Hive On Spark。这个项目得到了IBM,Intel和MapR的支持(但是没有Databricks)。

结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序。比如一个SQL:

SELECT item_type, sum(price)
FROM item
GROUP item_type;

上面这个SQL脚本交给Hive或者类似的SQL引擎,它会“告诉”计算引擎做如下两个步骤:读取item表,抽出item_type,price这两个字段;对price计算初始的SUM(其实就是每个单独的price作为自己的SUM)因为GROUP BY说需要根据item_type分组,所以设定shuffle的key为item_type从第一组节点分组后分发给聚合节点,让相同的item_type汇总到同一个聚合节点,然后这些节点把每个组的Partial Sum再加在一起,就得到了最后结果。不管是Hive还是SparkSQL大致上都是做了上面这样的工作。

需要理解的是,Hive和SparkSQL都不负责计算,它们只是告诉Spark,你需要这样算那样算,但是本身并不直接参与计算。

Spark UDAF开发

分两种

  1. 无泛型约束的UDAF  extends UserDefinedAggregateFunction  extends Aggregator  dataframe设计的
  2. 有泛型约束的UDAF  extends Aggregator 该UDAF时允许添加泛型,保障函数更加安全。但是这种UDAF不可直接在SQL中被调用运算适用于强类型Datasets。

在Spark中使用
    1.编写UDAF<两种类型的UDAF都可以>
    2. 在spark中注册UDAF,为其绑定一个名字,使用

在Spark SQL 中使用
   1.编写UDAF<使用继承 UserDefinedAggregateFunction 类型编写>
   2. 打Jar包,并上传
   3. 注册临时聚合函数,并使用

ADD  jar TestSpark.jar;
CREATE  TEMPORARY FUNCTION  mean_my AS  'com.test.structure.udaf.MeanMy';
select t1.data,mean_my(t1.age)
from (
select 33 as age,  '1' as data 
union all 
select 55  as age, '1' as data 
union all 
select 66 as age, '2' as data
)t1
group by   t1.data;

自定义UDAF类

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;

 public class MeanFloatUDAF extends UserDefinedAggregateFunction {
  /**
  * 聚合函数的输入数据结构
  *   函数的参数列表,不过需要写成StructType的格式
  */
 @Override
 public StructType inputSchema() {
    List<StructField> structFields = new ArrayList<>();
    structFields.add(DataTypes.createStructField( "field_nm", DataTypes.DoubleType, true ));
    return DataTypes.createStructType( structFields );
}

/**
 * 聚缓存区数据结构 - 产生中间结果的数据类型
 * 如果是求平均数,存储总和以及计数,总和及计数就是中间结果
 * count    buffer.getInt(0)
 * sum_field   buffer.getDouble(1)
 */
@Override
public StructType bufferSchema() {
    List<StructField> structFields = new ArrayList<>();
    structFields.add(DataTypes.createStructField( "count", DataTypes.IntegerType, true ));
    structFields.add(DataTypes.createStructField( "sum_field", DataTypes.DoubleType, true ));
    return DataTypes.createStructType( structFields );
}

/**
 * 聚合函数返回值数据结构
 */
@Override
public DataType dataType() {
    return DataTypes.DoubleType;
}

/**
 * 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
 */
@Override
public boolean deterministic() {
    return true;
}

/**
 * 初始化缓冲区
 * buffer是中间结果,是Row类的子类
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
    //相加的初始值,这里的要和上边的中间结果的类型和位置相对应 - buffer.getInt(0)
    buffer.update(0,0);
    //参与运算数字个数的初始值
    buffer.update(1,Double.valueOf(0.0) );
}

/**
 *  给聚合函数传入一条新数据进行处理
 *  //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的计算)
 *  buffer里面存放着累计的执行结果,input是当前的执行结果
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    //个数加1
    buffer.update(0,buffer.getInt(0)+1);
    //每有一个数字参与运算就进行相加(包含中间结果)
    buffer.update(1,buffer.getDouble(1)+Double.valueOf(input.getDouble(0)));
}

/**
 *  合并聚合函数缓冲区   //全局聚合
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0));
    buffer1.update(1,buffer1.getDouble(1)+buffer2.getDouble(1));
}

/**
 * 计算最终结果
 */
@Override
public Object evaluate(Row buffer) {
    return buffer.getDouble(1)/buffer.getInt(0);
}

使用自定义UDAF

//在Spark中使用  extends UserDefinedAggregateFunction类型的UDAF的使用

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;

public class MeanUDAFMain {
public static void main(String[] args){
    try {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL data sources example")
                .config("spark.some.config.option", "some-value")
                .master("local[2]")
                .getOrCreate();
        List<Row> dataExample = Arrays.asList(
                RowFactory.create( "2019-0801", 4,9.2),
                RowFactory.create( "2020-0802", 3,8.6),
                RowFactory.create( "2021-0803",2,5.5),
                RowFactory.create( "2021-0803",2,5.5),
                RowFactory.create( "2021-0803",7,4.5)
        );
        StructType schema = new StructType(new StructField[]{
                 new StructField("date", DataTypes.StringType, false, Metadata.empty()),
                new StructField("dist_mem", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("dm_mem", DataTypes.DoubleType, false, Metadata.empty())
        });
        Dataset<Row> itemsDF = spark.createDataFrame(dataExample, schema);
        itemsDF.printSchema();
        itemsDF.createOrReplaceTempView("test_mean_table");
        // 注册自定义聚合函数 -2. 在spark中注册UDAF,为其绑定一个名字
        spark.udf().register("mymean",new MeanFloatUDAF ());
        spark.sql("select dist_mem  from test_mean_table").show();
        spark.sql("select date,mymean(dm_mem) memdoubleMean from test_mean_table group by date").show();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

Hive UDAF开发

UDF、UDAF、UDTF需要实现的方法

类型方法
UDF

类:

GenericUDF


包路径:
org.apache.hadoop.hive.ql.udf.generic

initialize:类型检查,返回结果类型
入参:ObjectInspector[]
出参:ObjectInspector
 

evaluate:功能逻辑实现

入参:DeferredObject[]

出参:Object

getDisplayString:函数名称
入参:String[]

出参:String
 

close:关闭函数,释放资源等
入参:无

出参:void

UDTF

类:
GenericUDTF

包路径:
org.apache.hadoop.hive.ql.udf.generic

initialize:类型检查,返回结果类型
入参:StructObjectInspector
出参:StructObjectInspector

process:功能逻辑实现
**调用forward输出一行数据,可多次调用

入参:Object[]

出参:void

close:关闭函数,释放资源等
入参:无

出参:void

UDAF

类:
AbstractGenericUDAFResolver

包路径:
org.apache.hadoop.hive.ql.udf.generic

类:
GenericUDAFEvaluator

包路径:
org.apache.hadoop.hive.ql.udf.generic
 


类:

AbstractAggregationBuffer

包路径:
org.apache.hadoop.hive.ql.udf.generic

-----AbstractGenericUDAFResolver-----

getEvaluator:获取计算器
入参:TypeInfo[]
出参:GenericUDAFEvaluator

---------GenericUDAFEvaluator----------

init:
入参:Mode,ObjectInspector[]
出参:ObjectInspector

getNewAggregationBuffer:

入参:无

出参:AggregationBuffer

reset:

入参:AggregationBuffer

出参:void

iterate:

入参:AggregationBuffer,Object[]

出参:void

merge:

入参:AggregationBuffer,Object

出参:void


terminate:

入参:AggregationBuffer

出参:Object

terminatePartial:

入参:AggregationBuffer

出参:Object

--------AbstractAggregationBuffer-------
estimate:评估内存占用大小

入参:无

出参:int

UDAF说明

一个Buffer作为中间处理数据的缓冲:获取getNewAggregationBuffer、重置reset
四个模式(Mode):

  1. PARTIAL1:
    from original data to partial aggregation data:
    iterate() and terminatePartial() will be called.
  2. PARTIAL2:
    from partial aggregation data to partial aggregation data:
    merge() and terminatePartial() will be called.
  3. FINAL:
    from partial aggregation to full aggregation:
    merge() and terminate() will be called.
  4. COMPLETE:
    from original data directly to full aggregation:
    iterate() and terminate() will be called.

五个方法

  1. 初始化init
  2. 遍历iterate:PARTIAL1和COMPLETE阶段
  3. 合并merge:PARTIAL2和FINAL阶段
  4. 终止terminatePartial:PARTIAL1和PARTIAL2阶段
  5. terminate:COMPLETE和FINAL阶段

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1344162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nacos注册

一、简介 Nacos是阿里云开源的一个服务发现、配置管理和服务鉴权平台&#xff0c;它提供了一种更简单、更便捷、更开放的方式来管理服务&#xff0c;帮助开发者快速实现服务的发现、配置的管理、服务的鉴权等功能。Nacos可以帮助开发者轻松管理微服务应用中的服务提供者、服务…

Ubuntu22.04系统安装软件、显卡驱动、cuda、cudnn、pytorch

Ubuntu22.04系统安装软件、显卡驱动、cuda、cudnn、pytorch 安装 Nvidia 显卡驱动安装 CUDA安装 cuDNN安装 VSCode安装 Anaconda 并更换源在虚拟环境中安装 GPU 版本的 PyTorchReference 这篇博文主要介绍的是 Ubuntu22.04 系统中软件、显卡驱动、cuda、cudnn、pytorch 等软件和…

系统启动流程 - 理解modules加载流程

​编辑 Hacker_Albert    202 linux 启动流程module加载 1.启动过程分为三个部分 BIOS 上电自检&#xff08;POST&#xff09;引导装载程序 (GRUB2)内核初始化启动 systemd&#xff0c;其是所有进程之父。 1.1.BIOS 上电自检&#xff08;POST&#xff09; BIOS stands for…

杰发科技AC7840——EEPROM初探

0.序 7840和7801的模拟EEPROM使用不太一样 1.现象 按照官方Demo&#xff0c;在这样的配置下&#xff0c;我们看到存储是这样的&#xff08;连续三个数字1 2 3&#xff09;。 使用串口工具的多帧发送功能 看不出多少规律 修改代码后 发现如下规律&#xff1a; 前四个字节是…

VSCode中的注释标签

2023年12月30日&#xff0c;周六上午 在软件开发中&#xff0c;开发者会使用这些标签来提供关于代码功能、版本信息、作者、API使用说明等方面的额外信息。 这些标签的含义通常是&#xff1a; apiNote: 提供有关API使用的注释或说明。author: 标识代码作者的信息。category: …

【 ATU NXP-SBC 系列 】FS26XX GUI_OTP烧录与模拟操作

1. 概述 FS26XX 为了其安全性需求&#xff0c;针对重要暂存器的配置&#xff0c;使用 one time program 的功能&#xff0c;避免不小心修改重要暂存器&#xff0c;导致发生重大意外&#xff0c;使系统丧失功能安全性。FS26XX 也可以让使用者先测试 OTP 后的结果功能&#xff0…

Python:将print内容写入文件

简介&#xff1a;print函数是Python中使用频率非常非常高的函数&#xff0c;其包含四个参数&#xff1a;sep、end、file、flush。 历史攻略&#xff1a; Python基础&#xff1a;输入、输出 Python&#xff1a;将控制台输出保存成文件 参数解析&#xff1a; print()函数可以…

简单了解SQL堆叠注入与二次注入(基于sqllabs演示)

1、堆叠注入 使用分号 ; 成堆的执行sql语句 以sqllabs-less-38为例 ?id1 简单测试发现闭合点为单引号 ?id1 order by 3 ?id1 order by 4使用order by探测发现只有三列&#xff08;字段数&#xff09; 尝试简单的联合注入查询 ?id-1 union select 1,database(),user()-…

爬虫工作量由小到大的思维转变---<第三十五章 Scrapy 的scrapyd+Gerapy 部署爬虫项目>

前言: 项目框架没有问题大家布好了的话,接着我们就开始部署scrapy项目(没搭好架子的话,看我上文爬虫工作量由小到大的思维转变---&#xff1c;第三十四章 Scrapy 的部署scrapydGerapy&#xff1e;-CSDN博客) 正文: 1.创建主机: 首先gerapy的架子,就相当于部署服务器上的;所以…

[mysql 基于C++实现数据库连接池 连接池的使用] 持续更新中

目背景 常见的MySQL、Oracle、SQLServer等数据库都是基于C/S架构设计的&#xff0c;即&#xff08;客户端/服务器&#xff09;架构&#xff0c;也就是说我们对数据库的操作相当于一个客户端&#xff0c;这个客户端使用既定的API把SQL语句通过网络发送给服务器端&#xff0c;MyS…

【Bootstrap学习 day4】

Bootstrap5 列表组 使用Bootstrap创建列表 可以创建三种不类型的HTML列表&#xff1a; 无序列表—顺序无关紧要的项目列表。无序列表中的列表标有项目符号&#xff0c;例如。、等ul>li有序列表—顺序确实很重要的项目列表。有序列表中的列表项用数字标记&#xff0c;例如1、…

欧洲十大跨境电商平台,自养号测评下单的重要性及优势

在欧洲站&#xff0c;用户体量非常庞大&#xff0c;这与近几年人们的消费习惯密不可分&#xff0c;越来越多的人开始网购&#xff0c;据欧盟委员的最新调研显示&#xff0c;在欧盟&#xff0c;近一半(42%)的中小企业通过在线市场销售产品和服务。 所以&#xff0c;逸居海外给大…

Grafana无法发送告警消息的飞书webhook(机器人)

1.问题描述 Grafana无法向飞书机器人发送报警消息&#xff0c;实测使用Grafana自带的webhook也不好使&#xff0c;对于用飞书办公的程序猿非常不便&#xff0c;后来发现一个报警神器&#xff0c;开源免费&#xff0c;关键是好用 PrometheusAlert 2.PrometheusAlert安装 Prom…

ansible_角色的使用

本章主要介绍ansible中角色的使用 了解什么是角色独立地写一个角色使用角色系统自带角色地使用 1.了解角色 正常情况下&#xff0c;配置一个服务如 apache时&#xff0c;要做一系列的操作:安装、拷贝、启动服务等。如果要在不同的机器上重复配置此服务&#xff0c;需要重新执…

企业私有云容器化架构

什么是虚拟化: 虚拟化&#xff08;Virtualization&#xff09;技术最早出现在 20 世纪 60 年代的 IBM 大型机系统&#xff0c;在70年代的 System 370 系列中逐渐流行起来&#xff0c;这些机器通过一种叫虚拟机监控器&#xff08;Virtual Machine Monitor&#xff0c;VMM&#x…

IC入门必备!数字IC中后端设计实现全流程解析(1.3万字长文)

吾爱IC社区自2018年2月份开始在公众号上开始分享数字IC后端设计实现相关基础理论和实战项目经验&#xff0c;累计输出文字超1000万字。全部是小编一个个字敲出来的&#xff0c;绝对没有复制粘贴的情况&#xff0c;此处小编自己得给自己鼓鼓掌鼓励下自己。人生不要给自己设限&am…

【华为数据之道学习笔记】7-5通过感知能力推进企业业务数字化

感知数据在华为信息架构中的位置 感知可以应用于广泛的物理世界和数字世界&#xff0c;感知范围可以从人、物、作业、地点扩展到复杂环境。成熟的用例倾向于以物和人为中心。而在企业中&#xff0c;只有将感知数据纳入整体的数据体系中&#xff0c;才能发挥感知数据的价值。 华…

Java核心技术卷接口的实现与继承多态知识梳理总结

Java核心技术卷接口的实现与继承多态知识梳理总结 接口的概念 在Java程序设计语言中&#xff0c;接口不是类&#xff0c;而是对希望符合这个接口的类的一组需求。 form&#xff1a; Java核心技术卷 I&#xff08;原书第11版&#xff09; 基础知识 by 凯 S.霍斯特曼 在Java中&a…

园林机械部件自动化三维测量检测形位公差-CASAIM自动化三维检测工作站

随着园林机械的广泛应用&#xff0c;对其机械部件的精确测量需求也日益增加。传统的测量方法不仅效率低下&#xff0c;而且精度难以保证&#xff0c;因此&#xff0c;自动化三维测量技术成为了解决这一问题的有效途径。本文将重点介绍CASAIM自动化三维检测工作站在园林机械部件…

线性代数笔记1 12.30

学习视频&#xff1a; 1.4 行列式的计算&#xff08;一&#xff09;_哔哩哔哩_bilibili 以下内容&#xff0c;包含&#xff1a; 二阶三阶行列式 n阶行列式 行列式的性质 行列式按行展开