Day10—Spark SQL基础

news2024/11/22 20:20:56

在这里插入图片描述

Spark SQL介绍

​ Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如JSON、Parquet、Avro、CSV格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

Spark SQL的主要特点:

  • 将SQL查询与Spark应用程序无缝组合

​ Spark SQL允许使用SQL或熟悉的API在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce的;而Spark SQL底层使用的是Spark RDD。

  • 可以连接到多种数据源

​ Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。

  • 在现有的数据仓库上运行SQL或HiveQL查询

​ Spark SQL支持HiveQL语法以及Hive SerDes和UDF (用户自定义函数) ,允许访问现有的Hive仓库。

DataFrame和DataSet

  • DataFrame的结构

​ DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。

​ DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息) ,因此看起来更像是一张数据库表。例如,在一个RDD中有3行数据,将该RDD转成DataFrame后,其中的数据可能如图所示:
在这里插入图片描述

  • DataSet的结构
    Dataset是一个分布式数据集,是Spark 1.6中添加的一个新的API。相比于RDD, Dataset提供了强类型支持,在RDD的每行数据加了类型约束。
    在这里插入图片描述
    在Spark中,一个DataFrame代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。

Spark SQL的基本使用

​ Spark Shell启动时除了默认创建一个名为sc的SparkContext的实例外,还创建了一个名为spark的SparkSession实例,该spark变量可以在Spark Shell中直接使用。

​ SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。

Spark SQL函数

内置函数

​ Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions

中。其中大部分函数与Hive中的相同。

​ 使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL

语句中使用。

  • 以编程的方式使用lower()函数将用户姓名转为小写/大写,代码如下:
df.select(lower(col("name")).as("greet")).show()
df.select(upper(col("name")).as("greet")).show()

​ 上述代码中,df指的是DataFrame对象,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col(“name”)指定要查询的列,也可以使用$"name"代替,代码如下:

df.select(lower($"name").as("greet")).show()
  • 以SQL语句的方式使用lower()函数,代码如下:
df.createTempView("temp")
spark.sql("select upper(name) as greet from temp").show()

​ 除了可以使用select()方法查询指定的列外,还可以直接使用filter()、groupBy()等方法对DataFrame数据进行过滤和分组,例如以下代码:

df.printSchema()  # 打印Schema信息
df.select("name").show()  # 查询name列
# 查询name列和age列,其中将age列的值增加1
df.select($"name",$"age"+1).show()
df.filter($"age">25).show() # 查询age>25的所有数据
# 根据age进行分组,并求每一组的数量
df.groupBy("age").count().show() 
自定义函数

​ 当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据需求编写自定义函数(User Defined Functions, UDF),然后在Spark SQL中调用。

​ 例如有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位数字用星号()代替,比如手机号180***2688。这时就可以编写一个自定义函数来实现这个需求,实现代码如下:

package spark.demo.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * 用户自定义函数,隐藏手机号中间4位
 */
object SparkSQLUDF {
  def main(args: Array[String]): Unit = {
    //创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLUDF")
      .master("local[*]")
      .getOrCreate()

    //第一步:创建测试数据(或直接从文件中读取)
    //模拟数据
    val arr=Array("18001292080","13578698076","13890890876")
    //将数组数据转为RDD
    val rdd: RDD[String] = spark.sparkContext.parallelize(arr)
    //将RDD[String]转为RDD[Row]
    val rowRDD: RDD[Row] = rdd.map(line=>Row(line))
    //定义数据的schema
    val schema=StructType(
      List{
        StructField("phone",StringType,true)
      }
    )
    //将RDD[Row]转为DataFrame
    val df = spark.createDataFrame(rowRDD, schema)

    //第二步:创建自定义函数(phoneHide)
    val phoneUDF=(phone:String)=>{
      var result = "手机号码错误!"
      if (phone != null && (phone.length==11)) {
        val sb = new StringBuffer
        sb.append(phone.substring(0, 3))
        sb.append("****")
        sb.append(phone.substring(7))
        result = sb.toString
      }
      result
    }
    //注册函数(第一个参数为函数名称,第二个参数为自定义的函数)
    spark.udf.register("phoneHide",phoneUDF)

    //第三步:调用自定义函数
    df.createTempView("t_phone")		//创建临时视图
    spark.sql("select phoneHide(phone) as phone from t_phone").show()
    // +-----------+
    // |      phone|
    // +-----------+
    // |180****2080|
    // |135****8076|
    // |138****0876|
    // +-----------+
  }
}
窗口(开窗)函数

​ 开窗函数是为了既显示聚合前的数据,又显示聚合后的数据,即在每一行的最后一列添加聚合函数的结果。开窗口函数有以下功能:

  • 同时具有分组和排序的功能
  • 不减少原表的行数
  • 开窗函数语法:

聚合类型开窗函数

sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]]) 

排序类型开窗函数

ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])
  • 以row_number()开窗函数为例:

​ 开窗函数row_number()是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排列的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TopN)。row_number()函数的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行号列别名

上述格式说明如下:

partition by:按照某一列进行分组;

order by:分组后按照某一列进行组内排序;

desc:降序,默认升序。

例如,统计每一个产品类别的销售额前3名,代码如下:

package spark.demo.sql

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

/**
 * 统计每一个产品类别的销售额前3名(相当于分组求TOPN)
 */
object SparkSQLWindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    //创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLWindowFunctionDemo")
      .master("local[*]")
      .getOrCreate()

    //第一步:创建测试数据(字段:日期、产品类别、销售额)
    val arr=Array(
      "2019-06-01,A,500",
      "2019-06-01,B,600",
      "2019-06-01,C,550",
      "2019-06-02,A,700",
      "2019-06-02,B,800",
      "2019-06-02,C,880",
      "2019-06-03,A,790",
      "2019-06-03,B,700",
      "2019-06-03,C,980",
      "2019-06-04,A,920",
      "2019-06-04,B,990",
      "2019-06-04,C,680"
    )
    //转为RDD[Row]
    val rowRDD=spark.sparkContext
      .makeRDD(arr)
      .map(line=>Row(
        line.split(",")(0),
        line.split(",")(1),
        line.split(",")(2).toInt
      ))
    //构建DataFrame元数据
    val structType=StructType(Array(
      StructField("date",StringType,true),
      StructField("type",StringType,true),
      StructField("money",IntegerType,true)
    ))
    //将RDD[Row]转为DataFrame
    val df=spark.createDataFrame(rowRDD,structType)

    //第二步:使用开窗函数取每一个类别的金额前3名
    df.createTempView("t_sales")		//创建临时视图
    //执行SQL查询
    spark.sql(
      "select date,type,money,rank from " +
        "(select date,type,money," +
        "row_number() over (partition by type order by money desc) rank "+
        "from t_sales) t " +
        "where t.rank<=3"
    ).show()
  }
}

在这里插入图片描述

结果展示

在这里插入图片描述

小结

本次学习了Spark SQL基础,学习Spark SQL基础是掌握大数据处理的关键一步。Spark SQL是Apache Spark的一个模块,它提供了对结构化和半结构化数据的高效处理能力。通过学习Spark SQL,你将能够使用SQL查询和DataFrame API来分析数据集。Spark SQL的核心优势在于其能够处理大规模数据集,同时保持高性能。它支持多种数据源,包括HDFS、S3、Parquet等,使得数据的读写变得简单。此外,Spark SQL还提供了丰富的数据类型和复杂的数据操作功能,如过滤、分组、排序和聚合。学习过程中,你将了解如何创建DataFrame,执行转换和操作,以及如何使用SQL语句进行查询。你还将学习到如何优化Spark SQL查询,包括使用分区、索引和缓存技术来提高性能。

掌握Spark SQL基础对于数据工程师和分析师来说非常重要,因为它不仅可以提高数据处理的效率,还可以帮助你更好地理解和分析大规模数据集。随着你的学习深入,你将能够更有效地利用Spark的强大功能来解决实际问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1841273.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

星戈瑞FITC-Cytochrome C:荧光标记细胞色素C的研究与应用

细胞色素C&#xff08;Cytochrome C&#xff09;是一种位于线粒体内膜上的蛋白质。为了深入地研究细胞色素C在细胞生物学和病理学中的功能&#xff0c;科学家们常常采用荧光标记技术对其进行追踪和观察。其中&#xff0c;异硫氰酸荧光素&#xff08;FITC&#xff09;作为一种常…

iOS原生APP开发的技术难点

iOS原生APP开发的技术难点主要体现在以下几个方面&#xff0c;总而言之&#xff0c;iOS原生APP开发是一项技术难度较高的工作&#xff0c;需要开发者具备扎实的编程基础、丰富的开发经验和良好的学习能力。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xf…

再谈量化策略失效的问题

数量技术宅团队在CSDN学院推出了量化投资系列课程 欢迎有兴趣系统学习量化投资的同学&#xff0c;点击下方链接报名&#xff1a; 量化投资速成营&#xff08;入门课程&#xff09; Python股票量化投资 Python期货量化投资 Python数字货币量化投资 C语言CTP期货交易系统开…

数据结构之B树的理解与示例(C#)

文章目录 B树的基本概念与特点B树在C#数据结构中的应用创建一个B树节点的具体代码示例插入、删除和查找操作的示例B树在文件存储与处理中的具体应用示例总结 B树是一种自平衡的树数据结构&#xff0c;它维持数据的有序性&#xff0c;并且允许搜索、顺序访问、插入和删除的操作都…

前后端完整案例-简单模仿点点开黑抽奖

数据库 后台 源码&#xff1a;https://gitee.com/qfp17393120407/game 前台 源码&#xff1a; https://gitee.com/qfp17393120407/game-weeb vue项目打包 注意&#xff1a;打包时将IP改为自己公网IP npm run build公网页面 地址&#xff1a;点点模拟抽奖 进入页面抽奖…

电路笔记 :LM3481MM/NOPB升压模块,升压电路原理

LM3481MM/NOPB LM3481MM/NOPB 是德州仪器&#xff08;Texas Instruments&#xff09;的一款广泛应用的DC-DC控制器&#xff0c;常用于电源管理应用&#xff0c;特别是在需要升压&#xff08;boost&#xff09;、反激&#xff08;flyback&#xff09;、SEPIC或反向配置的场合。…

企业UDP文件传输工具测速的方式(下)

在前一篇文章中&#xff0c;我们深入讨论了UDP传输的基本概念和镭速UDP文件传输工具如何使用命令行快速进行速度测试。现在&#xff0c;让我们进一步探索更为高级和灵活的方法&#xff0c;即通过整合镭速UDP的动态或静态库来实现网络速度的测量&#xff0c;以及如何利用这一过程…

Java零基础之多线程篇:线程的多种创建方式

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

支持向量机 (SVM) 算法详解

支持向量机 (SVM) 算法详解 支持向量机&#xff08;Support Vector Machine, SVM&#xff09;是一种监督学习模型&#xff0c;广泛应用于分类和回归分析。SVM 特别适合高维数据&#xff0c;并且在处理复杂非线性数据时表现出色。本文将详细讲解 SVM 的原理、数学公式、应用场景…

[Qt的学习日常]--窗口

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、窗口的分…

最新!!单目深度估计方向文献综述--Monocular Depth Estimation: A Thorough Review

论文链接&#xff1a;https://ieeexplore.ieee.org/abstract/document/10313067 Abstract 一个是考虑人类深度感知的机制&#xff0c;另一个是包括各种深度学习方法。 这篇论文是关于单目深度估计&#xff08;Monocular Depth Estimation&#xff09;的全面综述&#xff0c;由…

Flutter第十三弹 路由和导航

目标&#xff1a; 1.Flutter怎么创建路由&#xff1f; 2.怎么实现路由跳转&#xff1f;页面返回&#xff1f; 一、路由 1.1 什么是路由&#xff1f; 路由(Route)在移动开发中通常指页面&#xff08;Page&#xff09;&#xff0c;在Android中通常指一个Activity。所谓路由管…

wsl报错在BIOS中启用虚拟化

解决&#xff1a; Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Hyper-V -All 以高级管理员身份运行powershell&#xff0c;执行如上命令。

GoldWave软件下载 GoldWave 音频处理软件 v6.80.0 官方版下载附加详细安装步骤

准确来讲GoldWave软件防止GoldWave *原生音频插件被禁用。根据大数据结果显示回声效果&#xff1a;回声&#xff0c;顾名思义是指声音发出后经过一定的时间再返回被我们听到&#xff0c;就象在旷野上面对高山呼喊一样&#xff0c;在很多视频剪辑、配音中广泛采用&#xff0c; G…

最新版本IntelliJ IDEA安装与“坤活”使用

最新版本IntelliJ IDEA安装与“科学”使用 IntelliJ IDEA安装与坤活下载安装坤活idea1.将下面两个压缩文件解压到安装位置&#xff0c;注意路径不要包含中文空格等特殊符号2.双击 install-all-users.vbs &#xff0c;然后点击确定&#xff0c;等到出现 Done的弹窗3. 打开idea复…

【ajax基础04】form-serialize插件

目录 一&#xff1a;form-serialize插件 作用&#xff1a; 语法格式&#xff1a; 一&#xff1a;form-serialize插件 作用&#xff1a; 快速且大量的收集表单元素的值 例如上图对于多表单元素的情形&#xff0c;单靠通过”选择器获取节点.value”值的形式&#xff0c;获取…

Blazor 组件:创建、生命周期、嵌套和 UI 集成

在本文中&#xff0c;您将获得以下问题的答案。 什么是 Blazor 组件&#xff1f;如何使用组件&#xff1f;Blazor 组件的生命周期是什么&#xff1f;我们可以从一个组件调用另一个组件吗&#xff1f;如何创建 Blazor 组件&#xff1f;在组件中哪里写 C# 代码&#xff1f; 什么…

百度文心智能体平台(想象即现实):轻松上手,开启智能新时代!创建属于自己的智能体应用。

目录 1.1、文心智能体平台 1.2、创建智能体 1.3、智能体报名入口 1.4、古诗词小助手 1.5、访问我的智能体 在这个全新的时代里&#xff0c;人工智能技术正以前所未有的速度发展&#xff0c;渗透到我们生活的方方面面。无论是智能家居、自动驾驶&#xff0c;还是医疗诊断、…

漏洞挖掘 | 记一次src挖掘-小程序敏感信息泄露

权当是一次漏洞挖掘的思路分享 闲言 就现在的一个web漏洞挖掘强度还是非常高的&#xff0c;所以我们不妨把我们的眼光投向一个之前可能未曾涉及到的区域———小程序 是的微信小程序&#xff0c;这玩意的防范能力和过滤能力其实对比web方向是要弱小很多的 进入正题 以下就是…

如何设置Excel单元格下拉列表

如何设置Excel单元格下拉列表 在Excel中设置单元格下拉列表可以提高数据输入的准确性和效率。以下是创建下拉列表的步骤&#xff1a; 使用数据验证设置下拉列表&#xff1a; 1. 选择单元格&#xff1a; 选择你想要设置下拉列表的单元格或单元格区域。 2. 打开数据验证&…