Spark SQL 内置函数

news2024/12/22 2:52:13

文章目录

  • 一、Spark SQL内置函数
    • (一)内置函数概述
      • 1、10类内置函数
      • 2、两种使用方式
    • (二)内置函数演示
      • 1、通过编程方式使用内置函数upper()
      • 2、通过SQL语句的方式使用内置函数upper()
      • 3、演示其它内置函数的使用
  • 二、自定义函数
    • (一)自定义函数概述
    • (二)演示自定义函数
      • 1、提出任务:手机号保密
      • 2、编写程序,完成任务
  • 三、自定义聚合函数
    • (一)自定义聚合函数概述
    • (二)演示自定义聚合函数
      • 1、提出任务:实现求员工平均工资功能的UDAF
      • 2、编写程序,完成任务
  • 四、开窗函数
    • (一)开窗函数概述
    • (二)开窗函数使用格式
    • (三)开窗函数案例演示
      • 1、提出任务:统计前3名
      • 2、编写程序,实现功能,完成任务


一、Spark SQL内置函数

(一)内置函数概述

1、10类内置函数

Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。

2、两种使用方式

使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。

(二)内置函数演示

读取HDFS上的people.json,得到数据帧,执行命令:val peopleDF = spark.read.json(“hdfs://master:9000/datasource/input/people.json”)
在这里插入图片描述
显示数据帧内容,执行命令:peopleDF.show()
在这里插入图片描述
导入Spark SQL内置函数,执行命令:import org.apache.spark.sql.functions._
在这里插入图片描述

1、通过编程方式使用内置函数upper()

利用upper()函数将姓名转成大写,执行命令:peopleDF.select(upper(col(“name”)).as(“name”)).show()
在这里插入图片描述
上述代码中,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col(“name”)指定要查询的列,也可以使用 $"name" 代替,但是需要导入import spark.implicits._,执行命令:peopleDF.select(upper($"name").as("name")).show()
在这里插入图片描述
对某列使用了内置函数,如果还要显示其它列,就会报错
在这里插入图片描述

2、通过SQL语句的方式使用内置函数upper()

定义临时视图,执行命令:peopleDF.createTempView("t_people")
在这里插入图片描述
执行命令:spark.sql(“select upper(name) as name from t_people”).show()
在这里插入图片描述
执行命令:spark.sql(“select upper(name) as name, age from t_people”).show()
在这里插入图片描述

3、演示其它内置函数的使用

打印Schema信息,执行命令:peopleDF.printSchema()
在这里插入图片描述
查询name列,执行命令:peopleDF.select(“name”).show()
在这里插入图片描述
可用SQL语句方式来完成同样的任务
在这里插入图片描述
查询name列和age列,其中将age列的值增加1,执行命令:peopleDF.select($"name", $"age" + 1).show()
在这里插入图片描述
可用SQL语句方式来完成同样的任务
在这里插入图片描述
查询年龄大于21的记录,执行命令:peopleDF.filter($"age" > 21).show()
在这里插入图片描述
可用SQL语句方式来完成同样的任务
在这里插入图片描述
根据age进行分组,并求每一组的数量,执行命令:peopleDF.groupBy("age").count().show()
在这里插入图片描述
可用SQL语句方式来完成同样的任务
在这里插入图片描述

二、自定义函数

(一)自定义函数概述

当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据自己的业务编写自定义函数(User Defined Functions,UDF),然后在Spark SQL中调用。

(二)演示自定义函数

1、提出任务:手机号保密

有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位用星号()代替,比如手机号147***9205。这时就可以写一个自定义函数来实现这个需求。

2、编写程序,完成任务

创建SparkSQLUDF单例对象
在这里插入图片描述

package net.army.sql.day01

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * 功能:演示自定义函数
 * 日期:2023年06月14日
 * 作者:梁辰兴
 */
object SparkSQLUDF {
  def main(args: Array[String]): Unit = {
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    // 第一步:创建测试数据(亦可读取文件)
    // 创建电话模拟数据
    val arr = Array("15892925678", "13567892345", "18034561290", "13967678901")
    // 将数组转换成RDD
    val rdd: RDD[String] = spark.sparkContext.makeRDD(arr)
    // 将RDD[String]转为RDD[Row]
    val rowRDD: RDD[Row] = rdd.map(line => Row(line))
    // 定义数据的schema
    val schema = StructType(
      List {
        StructField("phone", StringType, true)
      }
    )
    // 将RDD[Row]转为DataFrame
    val df = spark.createDataFrame(rowRDD, schema)

    // 第二步:创建自定义函数(phoneHide)
    val phoneUDF = (phone: String) => {
      var result = "手机号码错误!"
      if (phone != null && phone.length == 11) {
        val buffer = new StringBuffer()
        buffer.append(phone.substring(0, 3))
        buffer.append("****")
        buffer.append(phone.substring(7))
        result = buffer.toString
      }
      result
    }
    // 注册函数(第一个参数为函数名称,第二个参数为自定义的函数)
    spark.udf.register("phoneHide", phoneUDF)

    // 第三步:调用自定义函数
    // 创建临时视图
    df.createTempView("t_phone")
    // 查询表,调用自定义函数处理phone字段
    spark.sql("select phoneHide(phone) as phone from t_phone").show()
  }
}

上述代码通过spark.udf.register()方法注册一个自定义函数phoneHide,然后使用spark.sql()方法传入SQL语句,在SQL语句中调用自定义函数phoneHide并传入指定的列,该列的每一个值将依次被自定义函数phoneHide处理。

运行程序,查看结果
在这里插入图片描述

三、自定义聚合函数

(一)自定义聚合函数概述

Spark SQL提供了一些常用的聚合函数,如count()、countDistinct()、avg()、max()、min()等。此外,用户也可以根据自己的业务编写自定义聚合函数(User Defined AggregateFunctions,UDAF)。

UDF主要是针对单个输入返回单个输出,而UDAF则可以针对多个输入进行聚合计算返回单个输出,功能更加强大。

(二)演示自定义聚合函数

1、提出任务:实现求员工平均工资功能的UDAF

员工工资数据存储于HDFS上/input目录里的employees.json文件中
在这里插入图片描述

{"name", "Army", "salary": 3500}
{"name", "Brown", "salary": 4500}
{"name", "Alice", "salary": 3200}
{"name", "Jenny", "salary": 5500}
{"name", "Mike", "salary": 6500}

2、编写程序,完成任务

创建MyAverage类,继承UserDefinedAggregateFunction类

在这里插入图片描述

package net.army.sql.day01

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}

/**
 * 功能:自定义聚合函数类,求平均值
 * 日期:2023年06月14日
 * 作者:梁辰兴
 */
class MyAverage extends UserDefinedAggregateFunction {
  // 聚合函数输入参数的类型,运行时会将需要聚合的每一个值输入聚合函数中
  // inputColumn为输入的列名,不做特殊要求,相当于一个列占位符
  override def inputSchema: StructType = StructType (
    List(StructField("inputColumn", LongType))
  )

  // 定义存储聚合运算产生的中间数据的Schema
  // sum和count不作特殊要求,为自定义名称
  override def bufferSchema: StructType = StructType(
    List(
      StructField("sum", LongType), // 参与聚合的数据总和
      StructField("count", LongType) // 参与聚合的数据数量
    )
  )

  // 定义数据类型
  override def dataType: DataType = DoubleType

  // 针对给定的同一组输入,聚合函数是否返回相同的结果,通常为true
  override def deterministic: Boolean = true

  // 初始化聚合运算的中间结果,中间结果存储于buffer中,buffer是一个Row类型
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L // 与bufferSchema中的第一个字段(sum)对应,即sum的初始值
    buffer(1) = 0L // 与bufferSchema中的第二个字段(count)对应,即count的初始值
  }

  // 由于参与聚合的数据会依次输入聚合函数,因此每当向聚合函数输入新的数据时,都会调用该函数更新聚合中间结果
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0) // 更新参与聚合的数据总和
      buffer(1) = buffer.getLong(1) + 1 // 更新参与聚合的数据数量
    }
  }

  // 合并多个分区的buffer中间结果(分布式计算,参与聚合的数据存储于多个分区,每个分区都会产生buffer中间结果
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) // 合并参与聚合的数据总和
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) // 合并参与聚合的数据数量
  }

  // 计算最终结果,数据总和 / 数据数量 = 平均值
  override def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

/**
 * 测试自定义聚合函数
 */
object MyAverage {
  def main(args: Array[String]): Unit = {
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    // 注册自定义聚合函数
    spark.udf.register("myAverage", new MyAverage)
    // 读取员工JSON数据
    val df = spark.read.json("hdfs://master:9000/input/employees.json")
    // 显示数据帧内容
    df.show()
    // 创建临时视图
    df.createOrReplaceTempView("employees")
    // 调用聚合函数进行查询
    val result = spark.sql("select myAverage(salary) as average_salary from employees")
    // 显示查询结果
    result.show()

    // 停止会话
    spark.toString
  }
}

四、开窗函数

(一)开窗函数概述

row_number()开窗函数是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排序的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TOPN)。

(二)开窗函数使用格式

row_number() over (partition by 列名 order by 列名 desc) 行号列别名
partition by:按照某一列进行分组

order by:分组后按照某一列进行组内排序

desc:降序,默认升序

(三)开窗函数案例演示

1、提出任务:统计前3名

统计每一个产品类别的销售额前3名(相当于分组求TOPN)

2、编写程序,实现功能,完成任务

创建SparkSQLWindowFunctionDemo单例对象
在这里插入图片描述

package net.army.sql.day01

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * 功能:统计每一个产品类别的销售额前3名
 * 日期:2023年06月14日
 * 作者:梁辰兴
 */
object SparkSQLWindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    // 第一步:创建测试数据(字段:日期、产品类别、销售额)
    val arr = Array(
      "2022-05-10,A,710",
      "2022-05-10,B,530",
      "2022-05-10,C,670",
      "2022-05-11,A,520",
      "2022-05-11,B,730",
      "2022-05-11,C,610",
      "2022-05-12,A,500",
      "2022-05-12,B,700",
      "2022-05-12,C,650",
      "2022-05-13,A,620",
      "2022-05-13,B,690",
      "2022-05-13,C,700",
      "2022-05-14,A,720",
      "2022-05-14,B,680",
      "2022-05-14,C,590"
    )
    // 转为RDD[Row]
    val rowRDD = spark.sparkContext
      .makeRDD(arr)
      .map(line => Row(
        line.split(",")(0),
        line.split(",")(1),
        line.split(",")(2).toInt
      ))
    // 构建数据帧元数据
    val structType = StructType(
      List(
        StructField("date", StringType, true),
        StructField("type", StringType, true),
        StructField("money", IntegerType, true)
      ))
    // 将RDD[Row]转成数据帧
    val df = spark.createDataFrame(rowRDD, structType)

    // 第二步:使用开窗函数取每个类别的金额前3名
    // 创建临时视图
    df.createTempView("t_sales")
    // 执行SQL查询,显示每个类别排名
    spark.sql(
      """
        |select date, type, money,
        |        row_number() over (partition by type order by money desc) rank
        |        from t_sales
        |""".stripMargin
    ).show()

    // 执行SQL查询,取每个类别前3名
    spark.sql(
      """
        |select date, type, money, rank from
        |  (
        |     select date, type, money,
        |        row_number() over (partition by type order by money desc) rank
        |        from t_sales
        |  ) sale
        |where sale.rank <= 3
        |""".stripMargin
    ).show()
  }
}

运行程序,查看结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/653265.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

离散数学题目收集整理练习(期末过关进度30%)

✨博主&#xff1a;命运之光 &#x1f984;专栏&#xff1a;离散数学考前复习&#xff08;知识点题&#xff09; &#x1f353;专栏&#xff1a;概率论期末速成&#xff08;一套卷&#xff09; &#x1f433;专栏&#xff1a;数字电路考前复习 ✨博主的其他文章&#xff1a;点击…

软件 安全,处理威胁STRIDE模型

微软威胁分析工具&#xff1a; https://www.microsoft.com/en-us/securityengineering/sdl/threatmodeling?azure-portaltrue STRIDE 是微软定义的6中威胁 身份假冒&#xff08;Spoofing&#xff09; 身份假冒&#xff0c;即伪装成某对象或某人。例如&#xff0c;我们通过伪…

校园旧物商城系统

一、项目说明 校园旧物回收商城&#xff0c;使用SpringbootVue2.x开发,使用了JWT、MybatisPlus、JWT、ElementUI 项目已经开源在https://github.com/astudent2020/Campus_waste_recycling 文章目录 一、项目说明一、说明书1、用户主页&#xff1a;2、登录注册页面3、后台页面…

在Maya、ZBrush和UE中制作龙香炉

大家好&#xff0c;今天云渲染小编给大家带来的分享是来自印尼的CG艺术家Stephen Herman“龙香炉”道具分解幕后花絮。 介绍 大家好&#xff01;我叫 Stephen Herman&#xff0c;是来自印度尼西亚雅加达的 3D 艺术家。 目前&#xff0c;我在 Bandai Namco Studios Malaysia …

数字电路基础---时序逻辑

时序逻辑 通过前面的组合逻辑的学习&#xff0c;我们知道了组合逻辑电路是没有记忆功能的&#xff0c;也就是说在任何时刻产生的输出信号都仅仅取决于该时刻电路的输入信号&#xff0c;而与它以前的输入信号是无关的。下来我们来学习下什么是时序逻辑。 1、简介 时序电路是有…

C语言实现顺序表

绪论 从本章开始就是开始数据结构的开端&#xff0c;本章将会写出数据结构中的顺序表的代码实现&#xff0c;多会以注释的方法来描述一些细节&#xff08;注释是我们程序员必须常用的工具&#xff09;。 话不多说安全带系好&#xff0c;发车啦&#xff08;建议电脑观看&#xf…

分析:如何多线程运行测试用例

这是时常被问到的问题&#xff0c;尤其是UI自动化的运行&#xff0c;过程非常耗时&#xff0c;所以&#xff0c;所以多线程不失为一种首先想到的解决方案。 多线程是针对的测试用例&#xff0c;所以和selenium没有直接关系&#xff0c;我们要关心的是单元测试框架。 unittest …

生态伙伴 | 华秋硬创联合湾加速,共同加速企业发展

01 大赛介绍 中国硬件创新创客大赛始于2015年&#xff0c;由深圳华秋电子有限公司主办&#xff0c;至今已经成功举办八届&#xff0c;赛事范围覆盖华南、华东、华北三大地区&#xff0c;超10个省市区域。 大赛影响了超过45万工程师群体&#xff0c;吸引了35000多名硬创先锋报…

云可观测性技术的应用领域及价值有哪些?

随着云计算的迅速发展&#xff0c;云可观测性技术成为了越来越重要的一项技术。它可以帮助企业实时监测、分析和优化其在云环境中运行的应用程序和系统&#xff0c;那云可观测性技术的应用领域及价值有哪些&#xff1f; 一、应用性能监测与优化 云可观测性技术使得企业能够实时…

(字符串) 844. 比较含退格的字符串——【Leetcode每日一题】

❓844. 比较含退格的字符串 难度&#xff1a;简单 给定 s 和 t 两个字符串&#xff0c;当它们分别被输入到空白的文本编辑器后&#xff0c;如果两者相等&#xff0c;返回 true 。# 代表退格字符。 注意&#xff1a;如果对空文本输入退格字符&#xff0c;文本继续为空。 示例…

【SpringBoot】整合Elasticsearch 快速入门操作索引

官网操作文档&#xff1a;Elasticsearch Clients | Elastic 踩坑太多了。。。这里表明一下Spring Boot2.4以上版本可能会出现问题&#xff0c;所以我降到了2.2.1.RELEASE。对于现在2023年6月而言&#xff0c;Es版本已经到了8.8&#xff0c;而SpringBoot版本已经到了3.x版…

【实战】Python爬虫之代理使用详解

在Python爬虫中&#xff0c;代理的使用非常常见。代理的主要作用是隐藏客户端的真实IP地址&#xff0c;从而实现更高的网络访问速度和更好的访问隐私保护。下面我们将通过Python爬虫的实例&#xff0c;带你详细了解Python爬虫中代理的使用方法。 目录 ## 1. 代理原理和作用 …

多维度员工信息整合查询——红海云员工信息数字化管理实用指南(中)

红海云员工全生命周期数字化管理平台从信息源头开始管控员工数据质量&#xff0c;在员工数据的采集、更新、审核环节采用多种方式保障员工信息的准确性、完整性、时效性和一致性&#xff0c;为企业搭建坚实可靠的人力资源管理数字化基座。但在有了准确可靠的员工数据基础后&…

APP测试应该从哪些方面入手?其实就这几点

前言 还在苦恼怎么去测APP吗&#xff1f; 一定要记住这几个方向&#xff0c;然后流程化的去执行&#xff0c;一来严谨规范&#xff0c;二来不会有遗漏。 1、需求检查&#xff1a; 在需求评审的时候展现你的业务能力啦&#xff01;不过还是得口下留情哟。&#xff08;PM心里瑟…

GitOps指南

GitOps基于CICD和IaC&#xff0c;以一致的方式管理代码和部署&#xff0c;是DevOps最佳实践之一。本文完整介绍了GitOps的理念和实践&#xff0c;并介绍了Weave Cloud的GitOps模型和工具&#xff0c;从整体上提供了实践GitOps的路径和方案。原文&#xff1a;Guide To GitOps[1]…

C++中的一些小技巧,numeric_limits、static_cast、reinterpret_cast方法内存验证

1、获取指定类型的最大值和最小值 在准备求一堆double数据中的最大值最小值的时候&#xff0c;常规做法是预估这堆数据的最大最小值&#xff0c;然后进行比较求&#xff0c;在重构别人代码的时候发现&#xff0c;可以准确知道double类型最大值或者最小值&#xff0c;获取方法如…

Apikit 自学日记:分享 API 文档

开启/关闭在线分享 您可以在线分享项目给团队以外的人&#xff0c;其他人可以通过分享链接在线查看API文档并且进行API测试。通过这种方式查看API文档不需要注册账号&#xff0c;用户可方便查看接口文档和测试接口。 在项目内&#xff0c;点击进入项目管理菜单&#xff0c;选择…

银河麒麟部署达梦8数据库开发者版本详细教程

我的系统信息如下&#xff1a; 系统架构&#xff1a;X86架构 系统信息&#xff1a;银河麒麟&#xff08;V10&#xff09; CPU&#xff1a;interl E5 官方安装文档&#xff1a;安装及卸载 | 达梦技术文档 (dameng.com) 数据库下载&#xff1a; 下载地址&#xff1a;产品下载…

【深度学习】2-5 神经网络-批处理

批处理&#xff08;Batch Processing&#xff09;是指在深度学习中每次迭代更新模型参数时同时处理多个样本的方式。 批处理时要注意对应维度的元素个数要一致 关于之前手写数字识别的例子&#xff1a; 用图表示&#xff0c;可以发现&#xff0c;多维数组的对应维度的元素个数…

体验DIY物联网浏览器(谷歌内核兼容性好支持H264视频播放)

一、功能及快捷键说明(说明32位兼容64位,版本往下看) 功能及快捷键图说明,不可多得的浏览器,支持右键自定义菜单... 二、下载安装包 2.1 版本 100.0.230 (支持H264版本)介绍 cefsharp物联网浏览器-支持H264(100.0.230)_cefsharp h264_久爱物联网的博客-CSDN博客 …