Spark大数据处理讲课笔记4.7 Spark SQL内置函数

news2025/1/20 18:35:46

文章目录

  • 零、本讲学习目标
  • 一、Spark SQL内置函数
    • (一)内置函数概述
      • 1、10类内置函数
      • 2、两种使用方式
    • (二)内置函数演示
      • 1、通过编程方式使用内置函数upper()
      • 2、通过SQL语句的方式使用内置函数upper()
      • 3、演示其它内置函数的使用
  • 二、自定义函数
    • (一)自定义函数概述
    • (二)演示自定义函数
      • 1、提出任务:手机号保密
      • 2、编写程序,完成任务
  • 三、自定义聚合函数
    • (一)自定义聚合函数概述
    • (二)演示自定义聚合函数
      • 1、提出任务:实现求员工平均工资功能的UDAF
      • 2、编写程序,完成任务
  • 四、开窗函数
    • (一)开窗函数概述
    • (二)开窗函数使用格式
    • (三)开窗函数案例演示
      • 1、提出任务:统计前3名
      • 2、编写程序,实现功能,完成任务

零、本讲学习目标

  1. 了解Spark SQL内置函数
  2. 学会使用自定义函数
  3. 学会自定义聚合函数
  4. 学会使用开窗函数

一、Spark SQL内置函数

(一)内置函数概述

1、10类内置函数

  • Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。

2、两种使用方式

  • 使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。

(二)内置函数演示

  • 读取HDFS上的people.json,得到数据帧,执行命令:val peopleDF = spark.read.json("hdfs://master:9000/input/people.json")
    在这里插入图片描述
  • 显示数据帧内容
    在这里插入图片描述
  • 导入Spark SQL内置函数,执行命令:import org.apache.spark.sql.functions._
    在这里插入图片描述

1、通过编程方式使用内置函数upper()

  • 利用upper()函数将姓名转成大写,执行命令:peopleDF.select(upper(col("name")).as("name")).show()
    在这里插入图片描述
  • 上述代码中,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col("name")指定要查询的列,也可以使用$"name"代替,但是需要导入import spark.implicits._,执行命令:peopleDF.select(upper($"name").as("name")).show()
    在这里插入图片描述
  • 对某列使用了内置函数,如果还要显示其它列,就会报错
    在这里插入图片描述

2、通过SQL语句的方式使用内置函数upper()

  • 定义临时视图,执行命令:peopleDF.createTempView("t_people")
    在这里插入图片描述
  • 执行命令:spark.sql("select upper(name) as name from t_people").show()
    在这里插入图片描述
  • 执行命令:spark.sql("select upper(name) as name, age from t_people").show()
    在这里插入图片描述

3、演示其它内置函数的使用

  • 打印Schema信息,执行命令:peopleDF.printSchema()
    在这里插入图片描述
  • 查询name列,执行命令:peopleDF.select("name").show()
    在这里插入图片描述
  • 可用SQL语句方式来完成同样的任务
    在这里插入图片描述
  • 查询name列和age列,其中将age列的值增加1,执行命令:peopleDF.select($"name", $"age" + 1).show()
    在这里插入图片描述
  • 可用SQL语句方式来完成同样的任务
    在这里插入图片描述
  • 查询年龄大于21的记录,执行命令:peopleDF.filter($"age" > 21).show()
    在这里插入图片描述
  • 可用SQL语句方式来完成同样的任务
    在这里插入图片描述
  • 根据age进行分组,并求每一组的数量,执行命令:peopleDF.groupBy("age").count().show()
    在这里插入图片描述
  • 可用SQL语句方式来完成同样的任务
    在这里插入图片描述

二、自定义函数

(一)自定义函数概述

  • 当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据自己的业务编写自定义函数(User Defined Functions,UDF),然后在Spark SQL中调用。

(二)演示自定义函数

1、提出任务:手机号保密

  • 有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位用星号()代替,比如手机号158***1170。这时就可以写一个自定义函数来实现这个需求。

2、编写程序,完成任务

  • 创建SparkSQLUDF单例对象
    在这里插入图片描述
package net.hw.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * 功能:演示自定义函数
 * 作者:华卫
 * 日期:2022年05月13日
 */
object SparkSQLUDF {
  def main(args: Array[String]): Unit = {
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    // 第一步:创建测试数据(亦可读取文件)
    // 创建电话模拟数据
    val arr = Array("15892925678", "13567892345", "18034561290", "13967678901")
    // 将数组转换成RDD
    val rdd: RDD[String] = spark.sparkContext.makeRDD(arr)
    // 将RDD[String]转为RDD[Row]
    val rowRDD: RDD[Row] = rdd.map(line => Row(line))
    // 定义数据的schema
    val schema = StructType(
      List {
        StructField("phone", StringType, true)
      }
    )
    // 将RDD[Row]转为DataFrame
    val df = spark.createDataFrame(rowRDD, schema)

    // 第二步:创建自定义函数(phoneHide)
    val phoneUDF = (phone: String) => {
      var result = "手机号码错误!"
      if (phone != null && phone.length == 11) {
        val buffer = new StringBuffer()
        buffer.append(phone.substring(0, 3))
        buffer.append("****")
        buffer.append(phone.substring(7))
        result = buffer.toString
      }
      result
    }
    // 注册函数(第一个参数为函数名称,第二个参数为自定义的函数)
    spark.udf.register("phoneHide", phoneUDF)

    // 第三步:调用自定义函数
    // 创建临时视图
    df.createTempView("t_phone")
    // 查询表,调用自定义函数处理phone字段
    spark.sql("select phoneHide(phone) as phone from t_phone").show()
  }
}
  • 上述代码通过spark.udf.register()方法注册一个自定义函数phoneHide,然后使用spark.sql()方法传入SQL语句,在SQL语句中调用自定义函数phoneHide并传入指定的列,该列的每一个值将依次被自定义函数phoneHide处理。
  • 运行程序,查看结果
    在这里插入图片描述

三、自定义聚合函数

(一)自定义聚合函数概述

  • Spark SQL提供了一些常用的聚合函数,如count()、countDistinct()、avg()、max()、min()等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined AggregateFunctions,UDAF)。
  • UDF主要是针对单个输入返回单个输出,而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。

(二)演示自定义聚合函数

1、提出任务:实现求员工平均工资功能的UDAF

  • 员工工资数据存储于HDFS上/input目录里的employees.json文件中
    在这里插入图片描述

2、编写程序,完成任务

  • 创建MyAverage类,继承UserDefinedAggregateFunction
    在这里插入图片描述
package net.hw.sparksql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}

/**
 * 功能:自定义聚合函数类,求平均值
 * 作者:华卫
 * 日期:2022年05月13日
 */
class MyAverage extends UserDefinedAggregateFunction {
  // 聚合函数输入参数的类型,运行时会将需要聚合的每一个值输入聚合函数中
  // inputColumn为输入的列名,不做特殊要求,相当于一个列占位符
  override def inputSchema: StructType = StructType (
    List(StructField("inputColumn", LongType))
  )

  // 定义存储聚合运算产生的中间数据的Schema
  // sum和count不作特殊要求,为自定义名称
  override def bufferSchema: StructType = StructType(
    List(
      StructField("sum", LongType), // 参与聚合的数据总和
      StructField("count", LongType) // 参与聚合的数据数量
    )
  )

  // 定义数据类型
  override def dataType: DataType = DoubleType

  // 针对给定的同一组输入,聚合函数是否返回相同的结果,通常为true
  override def deterministic: Boolean = true

  // 初始化聚合运算的中间结果,中间结果存储于buffer中,buffer是一个Row类型
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L // 与bufferSchema中的第一个字段(sum)对应,即sum的初始值
    buffer(1) = 0L // 与bufferSchema中的第二个字段(count)对应,即count的初始值
  }

  // 由于参与聚合的数据会依次输入聚合函数,因此每当向聚合函数输入新的数据时,都会调用该函数更新聚合中间结果
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0) // 更新参与聚合的数据总和
      buffer(1) = buffer.getLong(1) + 1 // 更新参与聚合的数据数量
    }
  }

  // 合并多个分区的buffer中间结果(分布式计算,参与聚合的数据存储于多个分区,每个分区都会产生buffer中间结果
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) // 合并参与聚合的数据总和
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) // 合并参与聚合的数据数量
  }

  // 计算最终结果,数据总和 / 数据数量 = 平均值
  override def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

/**
 * 测试自定义聚合函数
 */
object MyAverage {
  def main(args: Array[String]): Unit = {
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    // 注册自定义聚合函数
    spark.udf.register("myAverage", new MyAverage)
    // 读取员工JSON数据
    val df = spark.read.json("hdfs://master:9000/input/employees.json")
    // 显示数据帧内容
    df.show()
    // 创建临时视图
    df.createOrReplaceTempView("employees")
    // 调用聚合函数进行查询
    val result = spark.sql("select myAverage(salary) as average_salary from employees")
    // 显示查询结果
    result.show()

    // 停止会话
    spark.toString
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

四、开窗函数

(一)开窗函数概述

  • row_number()开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TOPN)。

(二)开窗函数使用格式

row_number() over (partition by 列名 order by 列名 desc) 行号列别名
  • partition by:按照某一列进行分组
  • order by:分组后按照某一列进行组内排序
  • desc:降序,默认升序

(三)开窗函数案例演示

1、提出任务:统计前3名

  • 统计每一个产品类别的销售额前3名(相当于分组求TOPN)

2、编写程序,实现功能,完成任务

  • 创建SparkSQLWindowFunctionDemo单例对象
    在这里插入图片描述
package net.hw.sparksql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * 功能:
 * 作者:华卫
 * 日期:2022年05月14日
 */
object SparkSQLWindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    // 第一步:创建测试数据(字段:日期、产品类别、销售额)
    val arr = Array(
      "2022-05-10,A,710",
      "2022-05-10,B,530",
      "2022-05-10,C,670",
      "2022-05-11,A,520",
      "2022-05-11,B,730",
      "2022-05-11,C,610",
      "2022-05-12,A,500",
      "2022-05-12,B,700",
      "2022-05-12,C,650",
      "2022-05-13,A,620",
      "2022-05-13,B,690",
      "2022-05-13,C,700",
      "2022-05-14,A,720",
      "2022-05-14,B,680",
      "2022-05-14,C,590"
    )
    // 转为RDD[Row]
    val rowRDD = spark.sparkContext
      .makeRDD(arr)
      .map(line => Row(
        line.split(",")(0),
        line.split(",")(1),
        line.split(",")(2).toInt
      ))
    // 构建数据帧元数据
    val structType = StructType(
      List(
        StructField("date", StringType, true),
        StructField("type", StringType, true),
        StructField("money", IntegerType, true)
      ))
    // 将RDD[Row]转成数据帧
    val df = spark.createDataFrame(rowRDD, structType)

    // 第二步:使用开窗函数取每个类别的金额前3名
    // 创建临时视图
    df.createTempView("t_sales")
    // 执行SQL查询,显示每个类别排名
    spark.sql(
      """
        |select date, type, money,
        |        row_number() over (partition by type order by money desc) rank
        |        from t_sales
        |""".stripMargin
    ).show()

    // 执行SQL查询,取每个类别前3名
    spark.sql(
      """
        |select date, type, money, rank from
        |  (
        |     select date, type, money,
        |        row_number() over (partition by type order by money desc) rank
        |        from t_sales
        |  ) sale
        |where sale.rank <= 3
        |""".stripMargin
    ).show()
  }
}
  • 运行程序,查看结果
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/527598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL的安装和卸载-Linux版

MySQL8.0.26-Linux版安装 1. 准备一台Linux服务器 云服务器或者虚拟机都可以,Linux的版本为 CentOS7; 2. 下载Linux版MySQL安装包 https://download.csdn.net/download/weixin_44373940/87784825 3. 创建目录,并解压到对应目录中 mkdir mysql 解压到mysql目录中 tar -xvf…

【Arduino疑难杂症】:报错:上传失败:上传错误:exit status Oxffffffff

项目场景&#xff1a; 制作arduino宠物监控系统项目的过程中&#xff0c;摄像头方面使用到了ESP32Cam&#xff0c;制作过程中遇到了如下问题。 问题描述 [ERRORJ: (annot configure port&#xff0c; something wert wrong. 0riginal message: WindowsError(31,"xclxacxb…

Spring笔记-教程-快速回忆

title: Spring笔记 date: 2023-05-12 00:12:55 categories: 后端Java tags:JavaSpring Spring官网https://spring.io 框架图&#xff1a; 为什么要使用Spring 原先代码中存在的问题如下&#xff1a; 业务层&#xff1a; public class BookServiceImpl implements BookServi…

了解Swarm 集群管理

Swarm 集群管理 简介 Docker Swarm 是 Docker 的集群管理工具。它将 Docker 主机池转变为单个虚拟 Docker 主机。 Docker Swarm 提供了标准的 Docker API&#xff0c;所有任何已经与 Docker 守护程序通信的工具都可以使用 Swarm 轻松地扩展到多个主机。 支持的工具包括但不限…

SpringCache缓存常见问题

SpringCache 解决缓存常见问题 1 缓存穿透2 缓存雪崩3 缓存击穿 1 缓存穿透 缓存穿透是指缓存和数据库中都没有数据&#xff0c;而用户不断发起请求则这些请求会穿过缓存直接访问数据库&#xff0c;如发起为id为“-1”的数据或id为特别大不存在的数据。假如有恶意攻击&#xf…

TCP 和 UDP 协议详解

文章目录 1 概述2 TCP 协议2.1 报文格式2.2 三次握手&#xff0c;建立连接2.3 四次挥手&#xff0c;断开连接2.4 窗口机制 3 UDP 协议3.1 传输头格式 4 扩展4.1 常用端口号4.2 TCP 与 UDP 区别 1 概述 #mermaid-svg-aC8G8xwQRSdze7eM {font-family:"trebuchet ms",ve…

Mysql MHA高可用集群及故障切换

文章目录 一、MHA概述1.MHA的特点2. MHA的工作原理3.故障切换时MHA会做什么 二、实验搭建MySQLMHA1.配置主从分离2.安装MHA软件 总结 一、MHA概述 MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的mysql高可用环境下故障切换和主从复制的软件。 MHA解决了mysq…

铁路铁鞋UWB定位系统

在铁路运输过程中&#xff0c;当列车到达车站时&#xff0c;需要用专用铁鞋将列车固定在前轮和后轮上&#xff0c;以防止列车打滑和前进。所以&#xff0c;实时掌握铁鞋的位置信息十分重要&#xff0c;如果工人忘记撤回铁鞋子&#xff0c;则可能导致车辆停车和跳轨等事故频发。…

淘宝关键词搜索分析商品价格走势(商品列表接口,销量接口,价格接口,分类ID精准商品数据接口)接口代码对接

淘宝 OpenAPI&#xff08;Open application programming interface&#xff09;是一套 REST 方式的开放应用程序编程接口。淘宝网根据自己提供的电子商务基础服务&#xff0c;抽象并做成一系列的 API 接口。通过这些接口&#xff0c;可以让外部用户能够通过程序的方式访问淘宝网…

Vue 实现轮播图功能

Vue 是一款流行的前端框架&#xff0c;它提供了一系列的工具和组件&#xff0c;使得开发者可以更加便捷地创建交互式的 Web 应用程序。轮播图是 Web 应用程序中常见的一种交互式组件&#xff0c;可以用来展示图片、新闻、广告等内容。在 Vue 中&#xff0c;我们可以使用第三方组…

RVMedia VCL 8.0 for Delphi Crack

RVMedia VCL 8.0 for Delphi Crack FGX Native Network Frame是制造跨平台和现代移动设备的强大工具。FG开发团队的主要目标是简化移动应用程序的开发&#xff0c;使大多数人都能以各种技能开发应用程序。此外&#xff0c;这种形式的网络最重要的功能可以在100%的用户界面中设计…

分库分表的 21 条法则,hold 住!

大家好&#xff0c;我是小富&#xff5e; &#xff08;一&#xff09;好好的系统&#xff0c;为什么要分库分表&#xff1f; 本文是《分库分表ShardingSphere5.x原理与实战》系列的第二篇文章&#xff0c;距离上一篇文章已经过去好久了&#xff0c;惭愧惭愧&#xff5e; 还是…

GIS应用技巧之Landsat、Sport等遥感影像去除黑边的多种方法

一、前言 当我们利用GIS对遥感影像处理过程中&#xff0c;有时候需要将几张小的影像图镶嵌为一张大的&#xff0c;但是却被黑边所阻挡&#xff0c;这时候就需要找到一种方法来将黑边去掉了。由于遥感影像一般都太大了&#xff0c;不好获取也不好处理&#xff0c;这里我们选用的…

【C#】继承和序列化

前言 在之前的一篇文章中&#xff1a; 【C#】复杂Json的反序列 任意Json获取_code bean的博客-CSDN博客其中result这个key对应的内容是可能发生变化的&#xff0c;所以这里可以用到泛型。如何将一个复杂类型的JSON进行反序列化。那就是如何把json拆解成一个个子类的过程。htt…

NLP 中语言表示 (向量化) 的基本原理和历史演变综述

目录 1 前言2 语言表示2.1 离散表示2.1.1 独热编码2.1.2 词袋模型2.1.3 TF-IDF 模型2.1.4 N-gram 模型2.1.5 基于聚类的表示 2.2 连续表示2.2.1 分布式表示2.2.2 Word Embedding2.2.2.1 Word2Vec2.2.2.2 GloVe2.2.2.3 FastText 2.2.3 基础神经网络模型2.2.3.1 神经词袋模型2.2.…

Excel 设置只能输入指定的字符

目录 1. 创建你要用的表格 2. 确定你要限定输入的行/ 列 3. 创建另一个sheet&#xff0c;用来保存限制输入的配置信息 4. 选中【是否外包】列&#xff0c;并选择数据验证 5. 设置数据限制 6. 确认结果 7. 不想设置配置sheet怎么办&#xff1f; 在工作中&#xff0c;你们…

基于Java+SpringBoot+vue+node.js的图书购物商城系统详细设计和实现

基于JavaSpringBootvuenode.js的图书购物商城系统详细设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码…

70.建立一个轮播图组件第一部分

本次我们的目标是实现如图所示的 初始代码如下&#xff1a; ● 现在我们把图片、文本、按钮等元素添加进去 <div class"carousel"><img src"maria.jpg" alt"Maria de Almeida" /><blockquote class"testimonial">&…

一、Go基础知识入门

1、go语言介绍 2、go开发环境搭建 2.1、go的安装 go下载地址&#xff1a;All releases - The Go Programming Language&#xff0c;windows选择下载go1.20.2.windows-amd64.msi文件。 双击go1.20.2.windows-amd64.msi&#xff0c;点击"Next"&#xff0c;然后勾选同…

开启自主创新基础设施领先之路,超融合缘何能担当大任?

数字基础设施领域最重要的趋势都有哪些&#xff1f; 毫无疑问&#xff0c;超融合便是其中之一。Gartner《2022年中国ICT技术成熟度曲线报告》&#xff08;Hype Cycle for ICT in China, 2022&#xff09;预测&#xff0c;超融合技术未来2年内将到达“生产力成熟期”&#xff0…