SparkSql---用户自定义函数UDFUDAF

news2024/12/25 0:23:58

文章目录

  • 1.UDF
  • 2.UDAF
    • 2.1 UDF函数实现原理
    • 2.2需求:计算用户平均年龄
      • 2.2.1 使用RDD实现
      • 2.2.2 使用UDAF弱类型实现
      • 2.2.3 使用UDAF强类型实现

1.UDF

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

如:实现需求在用户name前加上"Name:"字符串,并打印在控制台

  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")

    //创建 SparkSession 对象
    val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sc.implicits._

    //创建DataFrame
    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24)))
    val dataframe = dataRDD.toDF("name","age")
    //注册udf函数
    sc.udf.register("addName",(x:String)=>
      "Name:"+x
    )
    //创建临时视图
    dataframe.createOrReplaceTempView("people")
    //对临时视图使用udf函数
    sc.sql("select addName(name) from people").show()


    sc.stop()
  }

在这里插入图片描述

2.UDAF

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。

2.1 UDF函数实现原理

在这里插入图片描述
在Spark中,UDF(用户自定义函数)在对表中的数据进行处理时,通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率,特别是对于大数据集。

2.2需求:计算用户平均年龄

2.2.1 使用RDD实现

    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24),("wangwu",26)))
    val reduceResult: (Int, Int) = dataRDD.map({
      case (name, age) => {
        (age, 1)
      }
    }).reduce((t1, t2) => {
      (t1._1 + t2._1, t1._2 + t2._2)
    })
    println(reduceResult._1/reduceResult._2)

在这里插入图片描述

2.2.2 使用UDAF弱类型实现

需要用户自定义类实现UserDefinedAggregateFunction,并重写其中的方法,当前已不推荐使用。

package bigdata.wordcount.udf

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2

/**
 * 用户自定义函数
 */
object UDF_Demo02 {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")

    //创建 SparkSession 对象
    val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sc.implicits._

    val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))
    val dataFrame: DataFrame = dataRDD.toDF("name","age")
    dataFrame.createOrReplaceTempView("user")

    //创建聚合函数
    var myAvg=new MyAverageUDAF()
    //在Spark中注册自定义的聚合函数
    sc.udf.register("avgMy",myAvg)

    sc.sql("select avgMy(age) from user").show()
    sc.stop()
  }

  case class User(var name:String,var age:Int)

}

class MyAverageUDAF extends UserDefinedAggregateFunction{
  //输入的要进行聚合的参数的类型
  override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))

  //聚合函数缓冲区中的值的数据类型
  override def bufferSchema: StructType = StructType(Array(StructField("sum",LongType),StructField("count",LongType)))

  //函数返回的值的数据类型
  override def dataType: DataType = DoubleType

  //判断函数的稳定性
  //对于相同类型的输入是否有相同类型的输出
  override def deterministic: Boolean = true

  //聚合函数缓冲区中值的初始化
  //因为数据是弱类型的,函数缓冲区中是根据索引来找到对应的变量
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //年龄的总和
    buffer(0)=0L
    //年龄的个数
    buffer(1)=0L
  }

  //更新缓冲区中的数据(执行操作步骤)
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
    //第0个索引值是否为空
    if(!input.isNullAt(0)) {
      //更新年龄sum的值
      buffer(0)=buffer.getLong(0)+input.getInt(0)
      //更新年龄个数
      buffer(1)=buffer.getLong(1)+1;
    }

  }

  //合并缓冲区
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
    buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
  }

  //计算最终结果
  override def evaluate(buffer: Row): Double = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}

在这里插入图片描述

2.2.3 使用UDAF强类型实现

Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction

package bigdata.wordcount.udf

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2

/**
 * 用户自定义函数
 */
object UDF_Demo03 {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")

    //创建 SparkSession 对象
    val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sc.implicits._

    val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))
    val dataFrame: DataFrame = dataRDD.toDF("name","age")
    val dataset: Dataset[User01] = dataFrame.as[User01]
    //创建聚合函数
    var myAvg=new MyAverageUDAF01()
    //将聚合函数转换为查询的列
    val col: TypedColumn[User01, Double] = myAvg.toColumn
    //执行查询操作
    dataset.select(col).show()

    sc.stop()
  }
  case class User(var name:String,var age:Int)

}

//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)

class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{
  //设置初始值
  override def zero: AgeBuffer = {
    AgeBuffer(0L,0L)
  }

  //缓冲区实现聚合
  override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {
    b.sum = b.sum + a.age
    b.count = b.count + 1
    b
  }

  //合并缓冲区
  override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
    b1.sum+=b2.sum
    b1.count+=b2.count
    b1
  }

  //计算最终结果
  override def finish(buff: AgeBuffer): Double = {
    buff.sum.toDouble/buff.count
  }

  //设置编码器和解码器
  //自定义类型就是 product 自带类型根据类型选择
  override def bufferEncoder: Encoder[AgeBuffer] = {
    Encoders.product
  }

  override def outputEncoder: Encoder[Double] = {

    Encoders.scalaDouble
  }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1415929.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

lv14 内核内存管理、动态分频及IO访问 12

一、内核内存管理框架 内核将物理内存等分成N块4KB,称之为一页,每页都用一个struct page来表示,采用伙伴关系算法维护 补充: Linux内存管理采用了虚拟内存机制,这个机制可以在内存有限的情况下提供更多可用的内存空…

docker compose实现mysql一主多从

参考了很多博客,死磕了几天,最终跑起来了,不容易,晚上喝瓶82年可乐庆祝下。 1、整体文件结构,这里忽略log、conf、data映射目录 2、docker-compose.yml文件内容如下: version: 3.3 services:mysql-master…

Android App开发-简单控件(3)——常用布局

3.3 常用布局 本节介绍常见的几种布局用法,包括在某个方向上顺序排列的线性布局,参照其他视图的位置相对排列的相对布局,像表格那样分行分列显示的网格布局,CommonLayouts以及支持通过滑动操作拉出更多内容的滚动视图。 3.3.1 线…

代码随想录算法训练DAY29|回溯5

算法训练DAY29|回溯5 491.递增子序列 力扣题目链接 给定一个整型数组, 你的任务是找到所有该数组的递增子序列,递增子序列的长度至少是2。 示例: 输入: [4, 6, 7, 7] 输出: [[4, 6], [4, 7], [4, 6, 7], [4, 6, 7, 7], [6, 7], [6, 7, 7], [7,7], [4,7,7]] 说…

【QT】QPainter基本绘图

目录 1 QPainter绘图系统 1.1 QPainter与QPaintDevice 1.2 paintEvent事件和绘图区 1.3 QPainter绘图的主要属性 1.4 创建实例 2 QPen的主要功能 2.1 线条样式 2.2 线条端点样式 2.3 线条连接样式 3 QBrush的主要功能 4 渐变填充 5 QPainter绘制基本图形元件 5.1 基本图形元件 …

burp靶场--身份认证漏洞

burp靶场–身份认证漏洞 https://portswigger.net/web-security/authentication#what-is-authentication 1.身份认证漏洞: ### 身份验证漏洞 从概念上讲,身份验证漏洞很容易理解。然而,由于身份验证和安全性之间的明确关系,它们…

基于Java SSM框架实现学校招生信息网系统项目【项目源码+论文说明】

基于java的SSM框架实现学生招生信息网系统演示 摘要 随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势,学校招生信息网当然也不能排除在外。学校招生信息网是以实际运用为开发背…

【大数据】Flink 中的状态管理

Flink 中的状态管理 1.算子状态2.键值分区状态3.状态后端4.有状态算子的扩缩容4.1 带有键值分区状态的算子4.2 带有算子列表状态的算子4.3 带有算子联合列表状态的算子4.4 带有算子广播状态的算子 在前面的博客中我们指出,大部分的流式应用都是有状态的。很多算子都…

OpenCV 0 - VS2019配置OpenCV

1 配置好环境变量 根据自己的opencv的安装目录配置 2 新建一个空项目 3 打开 视图->工具栏->属性管理器 4 添加新项目属性表 右键项目名(我这是opencvdemo)添加新项目属性表,如果有配置好了的属性表选添加现有属性表 5 双击选中Debug|x64的刚添加的属性表 6 (重点)添…

[LVGL] 可点击的文字label

LVGL8.x 自带的label 是没有点击响应的功能,即使加了lv_obj_add_event_cb 也不起作用,为了解决这个问题,我们使用了按钮控件去模拟纯label的效果;有了这个demo用户就可以实现类似超链接 点击一个文字就跳转到某个页面的功能。 st…

Kotlin 教程(环境搭建)

Kotlin IntelliJ IDEA环境搭建 IntelliJ IDEA 免费的社区版下载地址:Download IntelliJ IDEA – The Leading Java and Kotlin IDE 下载安装后,我们就可以使用该工具来创建项目,创建过程需要选择 SDK, Kotlin 与 JDK 1.6 一起使…

【大数据】详解 Flink 中的 WaterMark

详解 Flink 中的 WaterMark 1.基础概念1.1 流处理1.2 乱序1.3 窗口及其生命周期1.4 Keyed vs Non-Keyed1.5 Flink 中的时间 2.Watermark2.1 案例一2.2 案例二2.3 如何设置最大乱序时间2.4 延迟数据重定向 3.在 DDL 中的定义3.1 事件时间3.2 处理时间 1.基础概念 1.1 流处理 流…

1.【Vue3】前端开发引入、Vue 简介

1. 前端开发引入 1.1 前端开发前置知识 通过之前的学习,已经通过 SpringBoot 和一些三方技术完成了大事件项目的后端开发。接下来开始学习大事件项目的前端开发,前端部分借助两个框架实现: Vue3(一个 JS 框架)基于 …

Vue-Router: 如何使用路由元信息来管理路由?

Vue-Router是Vue.js官方的路由管理器,它可以帮助我们快速构建单页应用程序(SPA)。除了常见的路由功能外,Vue-Router还支持使用路由元信息来管理和控制路由。路由元信息是可以附加到路由上的自定义属性,它可以帮助我们实…

LandrayOA内存调优 / JAVA内存调优 / Tomcat web.xml 超时时间调优实战

目录 一、背景说明 二、LandrayOA / Tomcat 内存调优 2.1 \win64\tomcat\conf\web.xml 文件调优 2.2 \win64\tomcat\bin\catalina64.bat 文件调优 一、背景说明 随着系统的使用时间越来越长,数据量越多,发现系统的有些功能越来越慢&…

在腾讯云上部署幻兽帕鲁,实现游戏自由!

在帕鲁的世界,你可以选择与神奇的生物「帕鲁」一同享受悠闲的生活,也可以投身于与偷猎者进行生死搏斗的冒险。帕鲁可以进行战斗、繁殖、协助你做农活,也可以为你在工厂工作。你也可以将它们进行售卖,或肢解后食用。引用自&#xf…

第17章_反射机制(理解Class类并获取Class实例,类的加载与ClassLoader的理解,反射的基本应用,读取注解信息,体会反射的动态性)

文章目录 第17章_反射机制本章专题与脉络1. 反射(Reflection)的概念1.1 反射的出现背景1.2 反射概述1.3 Java反射机制研究及应用1.4 反射相关的主要API1.5 反射的优缺点 2. 理解Class类并获取Class实例2.1 理解Class2.1.1 理论上2.1.2 内存结构上 2.2 获取Class类的实例(四种方…

Linux系统优化要义

这里不敢说 linux优化奥义,主要是本文比较浅显,适合普通开发相关人员去读 linux作为服务器系统的王者,以稳定性著称,但对于不同的“应用场景”,相关配置还需调整,才能保证业务稳定性。以下是相关总结 IO优…

函数入门.

函数入门 1. 初识函数2. 函数的参数2.1 参数2.2 默认参数2.3 动态参数 3. 函数返回值总结作业 1. 初识函数 函数到底是个什么东西? 函数,可以当做是一大堆功能代码的集合。 def 函数名():函数内编写代码......函数名()例如: # 定义名字叫in…

Linux 驱动开发基础知识—— 具体单板的 LED 驱动程序(五)

个人名片: 🦁作者简介:一名喜欢分享和记录学习的在校大学生 🐯个人主页:妄北y 🐧个人QQ:2061314755 🐻个人邮箱:2061314755qq.com 🦉个人WeChat:V…