Spark SQL 概述

news2024/9/25 21:27:51

Spark SQL 概述

Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 DataFrame 和 DataSet API 进行数据操作。

  • 一、Spark SQL 架构
  • 二、Spark SQL 特点
  • 三、Spark SQL 运行原理
  • 四、Spark SQL API 相关概述
  • 五、Spark SQL 依赖
  • 六、Spark SQL 数据集
    • 1、DataFrame
    • 2、Dataset
    • 3、DataFrame 和 Dataset 的关系
  • 七、Spark Sql 基本用法
    • 1、Scala 创建 SparkSession 对象
    • 2、DataFrame 和 Dataset 的创建方式
    • 3、DataFrame API

一、Spark SQL 架构

Spark SQL 的架构主要由以下几个组件组成:

  1. SparkSession:Spark 应用的统一入口点,用于创建 DataFrame、DataSet 和执行 SQL 查询。
  2. Catalyst 优化器:Spark SQL 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
  3. DataFrame 和 DataSet API:提供面向对象的编程接口,支持丰富的数据操作方法。
  4. 数据源接口:支持多种数据源,如 HDFS、S3、HBase、Cassandra、Hive 等。
  5. 执行引擎:将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。

二、Spark SQL 特点

  • 统一数据访问接口:支持多种数据源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查询接口。
  • DataFrame 和 Dataset API:提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
  • Catalyst 优化器:自动将用户的查询转换为高效的执行计划,提升查询性能。
  • 与 Hive 的集成:无缝集成 Hive,能够直接访问现存的 Hive 数据,并使用 Hive 的 UDF 和 UDAF。
  • 高性能:通过 Catalyst 优化器和 Tungsten 执行引擎,实现高效的查询性能和内存管理。
  • 多种操作方式:支持 SQL 和 API 编程两种操作方式,灵活性高。
  • 外部工具接口:提供 JDBC/ODBC 接口供第三方工具借助 Spark 进行数据处理。
  • 高级接口:提供了更高层级的接口,方便地处理数据。

三、Spark SQL 运行原理

在这里插入图片描述

查询解析(Query Parsing):将 SQL 查询解析成抽象语法树(AST)。

逻辑计划生成(Logical Plan Generation):将 AST 转换为未优化的逻辑计划。

逻辑计划优化(Logical Plan Optimization):使用 Catalyst 优化器对逻辑计划进行一系列规则优化。

物理计划生成(Physical Plan Generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。

执行(Execution):将物理计划转换为 RDD,并在集群上并行执行。

四、Spark SQL API 相关概述

SparkContext:SparkContext 是 Spark 应用程序的主入口点,负责连接到 Spark 集群,管理资源和任务调度。在 Spark 2.0 之后,推荐使用 SparkSession 取代 SparkContext。

SQLContext:SQLContext 是 Spark SQL 的编程入口点,允许用户通过 SQL 查询或 DataFrame API 进行数据处理。它提供了基本的 Spark SQL 功能。

HiveContext:HiveContext 是 SQLContext 的子集,增加了对 Hive 的集成支持,可以直接访问 Hive 中的数据和元数据,使用 Hive 的 UDF 和 UDAF。

SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了统一的编程接口。SparkSession 是 Spark SQL 的建议入口点,支持使用 DataFrame 和 Dataset API 进行数据处理。

创建 SparkContext 和 SparkSession 的注意事项:如果同时需要创建 SparkContext 和 SparkSession,必须先创建 SparkContext,再创建 SparkSession。如果先创建 SparkSession,再创建 SparkContext,会导致异常,因为在同一个 JVM 中只能运行一个 SparkContext。

五、Spark SQL 依赖

<properties>
    <spark.version>3.1.2</spark.version>
    <spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${spark.scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

六、Spark SQL 数据集

在 Spark SQL 中,数据集主要分为以下几种类型:DataFrame 和 Dataset。它们是处理和操作结构化和半结构化数据的核心抽象。

1、DataFrame

Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

  • 类似于二维表格:DataFrame 类似于传统的关系数据库中的二维表格。
  • Schema(数据结构信息):在 RDD 的基础上加入了 Schema,描述数据结构的信息。
  • 支持嵌套数据类型:DataFrame 的 Schema 支持嵌套的数据类型,如 structmaparray
  • 丰富的 SQL 操作 API:提供更多类似 SQL 操作的 API,便于进行数据查询和操作。

2、Dataset

Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

  • 强类型:Spark 1.6中引入的一个更通用的数据集合,Dataset 是强类型的,提供类型安全的操作。
  • RDD + Schema:可以认为 Dataset 是 RDD 和 Schema 的结合,既有 RDD 的分布式计算能力,又有 Schema 描述数据结构的信息。
  • 适用于特定领域对象:可以存储和操作特定领域对象的强类型集合。
  • 并行操作:可以使用函数或者相关操作并行地进行转换和操作。

3、DataFrame 和 Dataset 的关系

  • DataFrame 是特殊的 Dataset:DataFrame 是 Dataset 的一个特例,即 DataFrame = Dataset[Row]
  • 数据抽象和操作方式的统一:DataFrame 和 Dataset 统一了 Spark SQL 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。

七、Spark Sql 基本用法

1、Scala 创建 SparkSession 对象

import org.apache.spark.sql.SparkSession
object SparkSqlContext {

  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象,设置应用程序的配置
    val conf: SparkConf = new SparkConf()
      .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
      .setAppName("spark sql") // 设置应用程序名称为 "spark sql"

    // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
    val spark: SparkSession = SparkSession.builder()
      .config(conf) // 将 SparkConf 配置应用于 SparkSession
      .getOrCreate() // 获取现有的 SparkSession,或者新建一个
	
    // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
    val sc: SparkContext = spark.sparkContext

    // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
    import spark.implicits._

    // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...

    // 停止 SparkSession,释放资源
    spark.stop()
  }
}

2、DataFrame 和 Dataset 的创建方式

1、从集合创建

case class Person(name: String, age: Int)				// 下同

val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同

val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")

1、从文件系统读取

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]

val dfCsv: DataFrame = spark.read
	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
  .schema(schema)		  
   // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
  .option("header", "true")
  .csv("/path/to/csv/file")

3、从关系型数据库读取

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")

val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]

val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)

4、从非结构化数据源读取

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]

val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")

5、手动创建 Dataset

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))

val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]

val dfManual: DataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(data), schema
)

3、DataFrame API

语法示例一

模拟数据(1000条):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...

需求:哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._

// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType),
))

// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window
  .partitionBy($"gender")
  .orderBy($"avg_age".desc)

// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read
   // .schema(schema)	// 应用我们定义的schema 
  .option("header", "true") 							// 使用CSV的header作为列名
  .csv("D:\\projects\\sparkSql\\people.csv")			// DataFrame
  .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
  .groupBy($"city", $"gender") 							// 按城市和性别分组
  .agg(			// 多重聚合
    count($"id").as("count"),   		// 计算每个组的ID数量
    round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
  )
  .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
  .orderBy($"avg_age".desc)     // 按平均年龄降序排序

  .select($"city", $"gender", $"avg_age",
    dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
  .show() // 显示结果

结果:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+

语法示例二:视图,sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read
    .option("header", "true") // 使用CSV的header作为列名
    .csv("D:\\projects\\sparkSql\\people.csv")

// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql(
      """
        |select * from people_view
        |where gender = '男'
        |""".stripMargin
    ).show()

语法示例三:join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)

val frmStu = spark.createDataFrame(
  Seq(
    Student("张三", 1),
    Student("李四", 1),
    Student("王五", 2),
    Student("赵六", 2),
    Student("李明", 2),
    Student("王刚", 4),
    Student("王朋", 5),
  )
)

val frmClass = spark.createDataFrame(
  Seq(
    Class(1, "name1"),
    Class(2, "name2"),
    Class(3, "name3"),
    Class(4, "name4")
  )
)

left 左连接,rignt 右连接, full 全外连接,anti左差集,semi左交集

// 别名 + inner 内连接
frmStu.as("S")
	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
	.show()

// 使用左外连接将df和frmClass根据classId合并
frmStu
  .join(frmClass, Seq("classId"), "left")	
  .show()

// 左差集
frmStu
  .join(frmClass, Seq("classId"), "anti")	
  .show()

// 左交集
frmStu
  .join(frmClass, Seq("classId"), "semi")	
  .show()

结果

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+

使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+

左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+

左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1920358.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

i18n、L10n、G11N 和 T9N 的含义

注&#xff1a;机翻&#xff0c;未校对。 Looking into localization for the first time can be terrifying, if only due to all of the abbreviations. But the meaning of i18n, L10n, G11N, and T9N, are all very easy to understand. 第一次研究本地化可能会很可怕&…

Git 删除包含敏感数据的历史记录及敏感文件

环境 Windows 10 Git 2.41.0 首先备份你需要删除的文件&#xff08;如果还需要的话&#xff09;&#xff0c;因为命令会将本地也删除将项目中修改的内容撤回或直接提交到仓库中&#xff08;有修改内容无法提交&#xff09; 会提示Cannot rewrite branches: You have unstaged …

给后台写了一个优雅的自定义风格的数据日志上报页面

highlight: atelier-cave-dark 查看后台数据日志是非常常见的场景,经常看到后台的小伙伴从服务器日志复制一段json数据字符串,然后找一个JSON工具网页打开,在线JSON格式化校验。有的时候,一些业务需要展示mqtt或者socket的实时信息展示,如果不做任何修改直接展示一串字符…

音频语言学习领域数据集现状、分类及评估

Audio Language Learning (Audio-Text Learning) 是一个新兴的研究领域&#xff0c;专注于处理、理解和描述声音。它的发展动力是机器学习技术的进步以及越来越多地将声音与其相应的文本描述相结合的数据集的可用性。 Audio Language Models (ALMs) 是这个领域的关键技术&#…

零基础学python(二)

1. 字典 字典的创建 最常见的创建方式&#xff1a; d {"k1": "v1", "k2": "v2"} 再向字典中添加一对键值&#xff1a; d["k3"] "v3" 字典还可以用dict()创建&#xff0c;这种方式中&#xff0c;键是不加引…

嵌入式工程师从0开始,到底该学什么,怎么学?

作为嵌入式工程师&#xff0c;从零开始学习需要掌握以下几个关键方面。我收集归类了一份嵌入式学习包&#xff0c;对于新手而言简直不要太棒&#xff0c;里面包括了新手各个时期的学习方向编程教学、问题视频讲解、毕设800套和语言类教学&#xff0c;敲个22就可以免费获得。 基…

WPS打开PDF文件的目录

WPS打开PDF文件的目录 其实WPS中PDF文件并没有像Word那样标准的目录&#xff0c;但是倒是有书签&#xff0c;和目录一个效果 点击左上角书签选项&#xff0c;或者使用Alt Shift 1快捷键即可

java解决实例问题--拿硬币堆

题目&#x1f38a; 编程梦想家&#xff08;大学生版&#xff09;-CSDN博客 桌上有 n 堆力扣币&#xff0c;每堆的数量保存在数组 coins 中。我们每次可以选择任意一堆&#xff0c;拿走其中的一枚或者两枚&#xff0c;求拿完所有力扣币的最少次数。 ❤ 这个问题实际上是一个贪…

【简历】西安某211大学研究生:Java简历面试通过率低

注&#xff1a;为保证用户信息安全&#xff0c;姓名和学校等信息已经进行同层次变更&#xff0c;内容部分细节也进行了部分隐藏 简历说明 这个同学是211研究生的一份Java简历,这个简历版面没有问题,但是因为主项目重复度过大,所以导致这个简历的简历通过率会大大降低,面试通过…

《Windows API每日一练》9.2.1 菜单

■和菜单有关的概念 窗口的菜单栏紧挨着标题栏下面显示。这个菜单栏有时叫作程序的“主菜单”或“顶级菜单“&#xff08;top-level menu&#xff09;。顶级菜单中的菜单项通常会激活下拉菜单&#xff08;drop-downmenu&#xff09;&#xff0c;也 叫“弹出菜单”&#xff08;…

头歌资源库(25)地图着色

一、 问题描述 任何平面区域图都可以用四种颜色着色&#xff0c;使相邻区域颜色互异。这就是四色定理。要求给定区域图&#xff0c;排出全部可能的着色方案。例如&#xff0c;区域图如下图所示&#xff1a; 要求用四种颜色着色。 则输入&#xff1a; 10 4 &#xff08;分别表示…

什么是敏捷本地化

快速、敏捷的多语言产品和服务交付正逐渐成为众多行业的常态。在这种情况下&#xff0c;重点从传统的期望&#xff08;即在合理的时间框架内翻译大量内容&#xff09;转变为翻译工作量非常大的小片段&#xff0c;通常在2-3到12-24小时之间&#xff0c;通常在周末或假期。 Logr…

如何做好漏洞扫描工作提高网络安全

在数字化浪潮席卷全球的今天&#xff0c;企业数字化转型已成为提升竞争力、实现可持续发展的关键路径。然而&#xff0c;这一转型过程并非坦途&#xff0c;其中网络安全问题如同暗礁般潜伏&#xff0c;稍有不慎便可能引发数据泄露、服务中断乃至品牌信誉受损等严重后果。因此&a…

usbserver工程师手记(三)手工开通 OTP功能

1、设定密钥&#xff0c;用户自行选择一个密钥&#xff0c;以下以密钥为 EAZAYOKNGETBOPC5 为例说明 2、usb server 配置otp 密钥&#xff0c;目前还没有UI 界面开通&#xff0c;后续版本会支持从管理界面开通 curl -X POST -H Content-Type: application/json -H Accept: app…

mysql高可用解决方案:MHA原理及实现

MHA&#xff1a;Master High Availability。对主节点进行监控&#xff0c;可实现自动故障转移至其它从节点&#xff1b;通过提升某一从节点为新的主节点&#xff0c;基于主从复制实现&#xff0c;还需要客户端配合实现&#xff0c;目前MHA主要支持一主多从的架构&#xff0c;要…

应力平衡方程的推导

应力平衡方程的推导 对于一点&#xff0c;已知其应力状态有&#xff1a; σ x , τ x y , τ x z \sigma_x,\tau_{xy},\tau_{xz} σx​,τxy​,τxz​ 则其附近点的应力状态为&#xff1a; σ x ∂ σ x ∂ x d x , τ x y ∂ τ x y ∂ x d x , τ x z ∂ τ x z ∂ x d …

【JavaScript 报错】未捕获的范围错误:Uncaught RangeError

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 一、错误原因分析1. 递归调用次数过多2. 数组长度超出限制3. 数值超出允许范围 二、解决方案1. 限制递归深度2. 控制数组长度3. 检查数值范围 三、实例讲解四、总结 Uncaught RangeError 是JavaScript中常见的一种错误&…

2024年06月CCF-GESP编程能力等级认证C++编程三级真题解析

本文收录于专栏《C等级认证CCF-GESP真题解析》&#xff0c;专栏总目录&#xff1a;点这里。订阅后可阅读专栏内所有文章。 一、单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09; 第 1 题 小杨父母带他到某培训机构给他报名参加CCF组织的GESP认证考试的第1级&…

IO模型理论学习

1、什么是IO 计算机视角下的io AIO

Redis命令详解以及存储原理

Redis是什么 远程字典服务 分布式场景重的一个单独的节点。请求回应的模式&#xff1a;发起请求&#xff0c;处理之后得到回应的结果。字典的形式存储&索引数据。 内存数据库 数据在内存中&#xff0c;不可以出现需要的内存不在内存中而在磁盘中速度快&#xff0c;内存100…