【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数

news2025/1/9 19:07:01

1. UDF函数(用户自定义函数)

一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。

先准备数据:

1.1 导入必要的包

首先,确保导入必要的Spark包:

import org.apache.spark.sql.SparkSession

1.2 创建SparkSession

创建一个SparkSession对象,这是与Spark交互的入口。

1.3 定义UDF并注册到SparkSQL

定义一个Scala函数,并将其注册为UDF。示例

1.4 使用UDF在SQL查询中:

调用udf的register方法,第一个参数是udf函数的函数名,第二个参数是要注册为UDF的函数。

session.udf.register("all_income",(sal:Int,bonus:Int)=>{
      sal*12 + bonus
    })

1.5 代码:

尽量使用SparkSQL的sql形式的写法,api写法太麻烦了。

object TestUDF{
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0), strs(1), strs(2).toInt, strs(3).toInt)
      }).toDF("id", "name", "salary", "bonus")

    session.udf.register("all_income",(sal:Int,bonus:Int)=>{
      sal*12 + bonus
    })

    import org.apache.spark.sql.functions
//    df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
//      .select("id","name","all")
//      .show()
        df.createTempView("salary")
        session.sql(
          """
            |select id,name,all_income(salary,bonus) all from salary
            |""".stripMargin)
          .show()
  }
}

输出:

2. UDAF(用户自定义的聚合函数)

指的是用户自定义的聚合函数,多进一出,比如MySQL中的,count函数,avg函数。

以学生信息为主进行统计,所有人员的年龄的总和

或者每个性别的年龄的平均值

计算所有人的年龄之和:

package com.atguigu.bigdata.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

/**
 * ClassName : TestUDAF
 * Package : com.atguigu.bigdata.test
 * Description
 *
 * @Author HeXua
 * @Create 2024/11/29 19:09
 *         Version 1.0
 */
object TestUDAF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0), strs(1), strs(2).toInt, strs(3))
      }).toDF("id", "name", "age", "gender")
    import org.apache.spark.sql.functions._
    // 注册udaf函数
    session.udf.register("mysum",udaf(new MySum))

    df.createTempView("student")
    session.sql(
      """
        |select mysum(age) from student
        |""".stripMargin)
      .show()
  }
}
// udaf的类继承Aggregator抽象类
class MySum extends Aggregator[Int,Int,Int]{
  //初始化
  def zero: Int = 0
  //聚合逻辑
  def reduce(b: Int, a: Int): Int = a+b
  //整体聚合
  def merge(b1: Int, b2: Int): Int = b1+b2
  //最终返回值
  def finish(reduction: Int): Int = reduction
  //累加值的类型
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  //输出结果的类型
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

定义用户自定义聚合函数时,继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。

泛型参数解释:

1. 输入类型(IN)

这是聚合函数的输入类型,即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值,输入类型就是int。

2. 缓冲区类型(BUFFER)

这是聚合函数的中间状态类型,也称为缓冲区类型。

例如你要计算一组整数的平均值,缓冲区可能包含两个字段:总和和计数,因为iBUF可能是一个元组。

3. 输出类型(OUT)

这是聚合函数的最终输出类型,即finish方法返回的类型。例如你要计算平均值,最终输出类型是Double。

方法解释:

zero:初始化缓冲区的值,对于平均值计算,初始化和计数都是0。

reduce:更新缓冲区,每次传入一个新的输入值时,更新总和和计数。

finish:计算最终结果,根据缓冲区中的总和和计数,计算平均值。

bufferEncoder:定义缓冲区类型的编码器,用于序列化和反序列化缓冲区。

outputEncoder:定义最终输出类型的编码器,用于序列化和反序列化输出结果。

计算每个性别的年龄的平均值:

case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{
  override def zero: AggragateVo = AggragateVo(0,0)

  override def reduce(b: AggragateVo, a: Int): AggragateVo = {
    b.cnt += 1
    b.sum += a
    b
  }

  override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {
    b1.cnt += b2.cnt
    b1.sum += b2.sum
    b1
  }

  override def finish(reduction: AggragateVo): Double = {
    reduction.sum.toDouble /reduction.cnt
  }

  override def bufferEncoder: Encoder[AggragateVo] = Encoders.product

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

3. UDTF(用户自定义炸裂函数)

拆分函数,进入的是一行内容出现的结果是多行内容。

spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。

import org.apache.spark.sql.SparkSession

object TestUDTF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt")
      .map(t => {
        val strs = t.split(",")
        (strs(0), strs(1), strs(2))
      }).toDF("id", "name", "actors")
    //explode map array
    df.createTempView("movies")
    session.sql(
      """
        |select id,name,actor  from movies lateral view explode(split(actors,'\\|')) t as actor
        |""".stripMargin)
      .createTempView("movies1")

    session.sql(
      """
        |select count(1),actor from movies1 group by actor
        |""".stripMargin)
      .show()

  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250459.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3 点击按钮,增加和减少input框

需求&#xff1a;手机号的input框默认一个&#xff0c;点击加号&#xff0c;可以增加和减少。 <template><el-form :model"editUserForm" label-width"80px" hide-required-asterisk ref"editUserFormRef"><!-- 手机 --><…

TongRDS分布式内存数据缓存中间件

命令 优势 支持高达10亿级的数据缓冲&#xff0c;内存优化管理&#xff0c;避免GC性能劣化。 高并发系统设计&#xff0c;可充分利用多CPU资源实现并行处理。 数据采用key-value多索引方式存储&#xff0c;字段类型和长度可配置。 支持多台服务并行运行&#xff0c;服务之间可互…

CSP/信奥赛C++语法基础刷题训练(33):洛谷P1055:[NOIP2008 普及组] ISBN 号码

CSP/信奥赛C语法基础刷题训练&#xff08;33&#xff09;&#xff1a;洛谷P1055&#xff1a;[NOIP2008 普及组] ISBN 号码 题目描述 每一本正式出版的图书都有一个 ISBN 号码与之对应&#xff0c;ISBN 码包括 9 9 9 位数字、 1 1 1 位识别码和 3 3 3 位分隔符&#xff0c;其…

算法盒子模型转换步骤+操作命令记录

0、模型转换步骤情 第一步、pt模型转onnx 1) 将要转换的pt模型文件上传到192.168.33.79的 /home/sophonsdk_edge_v1.7_official_release/下 2) 进入docker环境&#xff1a; docker exec -it myname /bin/bash 3) 在workspace目录运行: python3 /usr/local/lib/python3…

重复请求取消(不发请求)

重复请求取消&#xff08;不发请求&#xff09; axios 有自带的取消请求方式&#xff0c; 但是 在请求已发出时&#xff0c;取消只是状态取消&#xff0c; 其实请求已经发出&#xff0c;消耗了服务器资源&#xff08;方式1&#xff09;。本文探究的是 在调用请求A时&#xff0c…

谷歌浏览器Chrome打开百度很慢,其他网页正常的解决办法,试了很多,找到了适合的

最近不知怎么的&#xff0c;Chrome突然间打开百度很慢&#xff0c;甚至打不开。不光我一个人遇到这问题&#xff0c;我同事也遇到这个问题。开发中难免遇到问题&#xff0c;需要百度&#xff0c;现在是百度不了。 作为一名开发人员&#xff0c;习惯了使用Chrome进行开发&#…

【C++】深入探讨基础输入输出及类型转换问题

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目一&#xff1a;整数输入输出代码实现代码分析 &#x1f4af;题目二&#xff1a;ASCII 码转换为字符代码实现代码分析 &#x1f4af;cin 与 char 搭配行为的深入剖析问题…

GB28181系列三:SIP消息格式

我的音视频/流媒体开源项目(github) GB28181系列目录 目录 一、SIP消息Header字段 二、SIP URI(URL) 三、SIP路由机制 1、路由机制介绍 2、严格路由&#xff08;Strict Routing&#xff09;与松散路由&#xff08;Louse Routing&#xff09; 3、总结 四、SIP消…

mvn-mac操作小记

1.安装brew 如果报错&#xff0c;Warning: /opt/homebrew/bin is not in your PATH. vim ~/.zshrc&#xff0c;最后一行追加 export PATH“/opt/homebrew/bin:$PATH” source ~/.zshrc 2.安装brew install maven mvn -version查看路径 Maven home: /opt/homebrew/Cellar/mav…

算法与数据结构练习——异或

知识点讲解&#xff1a; 一、异或操作定义&#xff1a; 异或是指相同为0&#xff0c;不同为1&#xff0c;也可理解为无进位相加&#xff01;&#xff01; 很重要&#xff01;&#xff01; 二、关于异或运算的几个性质&#xff1a; 1.0^NN &#xff08;0和任何数异或都…

STM32G4系列MCU的Direct memory access controller (DMA)功能介绍之二

目录 概述 1 DMA通道 1.1 可编程数据大小 1.2 指针增量 2 通道配置 2.1 配置步骤 2.2 通道状态和禁用通道 3 模式应用 3.1 循环模式&#xff08;内存到外设/外设到内存的传输&#xff09; 3.2 内存到内存模式 3.3 Peripheral-to-peripheral模式 3.4 编程转移方向&a…

【一文读懂】大语言模型

学习参考 项目教程&#xff1a;中文教程 代码仓库&#xff1a;代码地址 仓库代码目录说明&#xff1a; requirements.txt&#xff1a;官方环境下的安装依赖 notebook&#xff1a;Notebook 源代码文件 docs&#xff1a;Markdown 文档文件 figures&#xff1a;图片 data_base&…

注册表修改键盘位置

1.winr 输入 regedit 2.HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\Keyboard Layout 3.右键Keyboard Layout->新建->二进制值->取名Scancode Map 4.右键Scancode Map&#xff0c;修改如下 //第一列 自动生成序号&#xff0c;不用管 第一行 输入8个00 第二…

服务器密码错误被锁定怎么解决?

当服务器密码错误多次导致账号被锁定时&#xff0c;解决方法需要根据服务器的操作系统&#xff08;如 Linux 或 Windows &#xff09;和具体服务器环境来处理。以下是常见的解决办法&#xff1a; 一、Linux 服务器被锁定的解决方法 1. 使用其他用户账号登录 如果有其他未被…

shell脚本编写练习2

1.准备阶段 在根目录下创建一个目录来存储shell脚本 mkdir /s3 2.题目 1. 使用case实现成绩优良差的判断 #!/bin/bash# 假设成绩存储在变量GRADE中 read -p "请输入成绩&#xff08;0-100&#xff09;&#xff1a;" GRADE# 使用case语句进行判断 case $GRADE in[…

清远榉之乡托养机构探讨:自闭症的本质辨析

当人们谈及自闭症时&#xff0c;常常会产生一个疑问&#xff1a;自闭症是精神类疾病吗&#xff1f;今天&#xff0c;清远榉之乡托养机构就来为大家解开这个疑惑。 榉之乡大龄自闭症托养机构在江苏、广东、江西等地都有分校&#xff0c;一直致力于为大龄自闭症患者提供专业的支持…

基于PoE交换机的智慧停车场监控组网应用

伴随城市发展快速&#xff0c;汽车保有量也不断增长&#xff0c;导致停车管理问题也愈发凸显。针对包括路侧停车位、地面停车场、地下停车场等场景的停车管理需求&#xff0c;通常会部署监控设备进行车位监测、现场安全监测等&#xff0c;助力构建智能化停车管理。因此如何为分…

.net XSSFWorkbook 读取/写入 指定单元格的内容

方法如下&#xff1a; using NPOI.SS.Formula.Functions;using NPOI.SS.UserModel;using OfficeOpenXml.FormulaParsing.Excel.Functions.DateTime;using OfficeOpenXml.FormulaParsing.Excel.Functions.Numeric;/// <summary>/// 读取Excel指定单元格内容/// </summa…

10个Word自动化办公脚本

在日常工作和学习中&#xff0c;我们常常需要处理Word文档&#xff08;.docx&#xff09;。 Python提供了强大的库&#xff0c;如python-docx&#xff0c;使我们能够轻松地进行文档创建、编辑和格式化等操作。本文将分享10个使用Python编写的Word自动化脚本&#xff0c;帮助新…

红日靶场-5

环境搭建 这个靶场相对于前几个靶场来说较为简单&#xff0c;只有两台靶机&#xff0c;其中一台主机是win7&#xff0c;作为我们的DMZ区域的入口机&#xff0c;另外一台是windows2008&#xff0c;作为我们的域控主机&#xff0c;所以我们只需要给我们的win7配置两张网卡&#…