SparkSQL-初识

news2025/1/16 18:39:35

一、概览

Spark SQL and DataFrames - Spark 3.5.2 Documentation

我们先看下官网的描述:

SparkSQL是用于结构化数据处理的Spark模块,与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部,Spark SQL使用这些额外信息来执行额外的优化。

Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有的Hive库表中读取数据。在另一种编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。也可以使用命令行或通过JDBC/ODBC与执行SQL。

二、什么是Dataset

Dataset是Spark 1.6中添加的一个新接口,是数据的分布式集合。它兼容RDD和SparkSQL的优点:

        1、RDD:强类型、使用强大lambda函数的能力,可以使用map()、flatMap()、filter()等转换算子

        2、Spark SQL:优化执行引擎,可以使用select()、where()、groupBy()等DSL语法

Dataset是惰性的,只有在调用action算子时才会触发计算。在内部,Dataset表示一个逻辑计划,描述了生成数据所需的计算。当调用一个操作时,Spark的查询优化器会优化逻辑计划,并生成一个物理计划,以并行和分布式的方式高效执行。

从源码中我们可以看到,需要给定一个特定于域的类型“T”映射到Spark的内部类型系统。

class Dataset[T] private[sql](
    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
  extends Serializable {

  //...........

}

如果时基本类型可以通过导入spark.implicits来支持,如果是对象类型,需要自己定义,比如

case class Person(name: String, age: Long)

encoder会用于告诉Spark在运行时生成代码,将“Person”对象序列化为二进制结构。这种二进制结构通常具有更低的内存占用,并且针对数据处理的效率进行了优化(例如,以列格式)。

三、什么是DataFrame

通常调用spark.sql("select * from xxxxx") 或者 spark.read.json("xxx/xxx.json")时会返回DataFrame

下面我们看下DataFrame的类型是什么

package org.apache.spark
package object sql {

  type DataFrame = Dataset[Row]

}

从源码中我们可以看到 DataFrame只是Dataset[Row]的一个类型别名

DataFrame是一个组织成命名列的数据集。它在概念上相当于关系数据库中的表或R/Python中的DataFrame,但底层有更丰富的优化。DataFrames可以从各种来源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有的RDD。DataFrame API在Scala、Java、Python和R中可用。

四、SparkSession

它是使用Dataset和DataFrame API对Spark编程的入口点

创建一个新SparkSession

SparkSession.builder
   .master("local")
   .appName("Word Count")
   .config("spark.some.config.option", "some-value")
   .getOrCreate()

我们来看下它的部分源码:

class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val parentSessionState: Option[SessionState],
    @transient private[sql] val extensions: SparkSessionExtensions,
    @transient private[sql] val initialSessionOptions: Map[String, String])
  extends Serializable with Closeable with Logging { self =>


  //此会话的封装版本为[[SQLContext]]形式,以实现向后兼容性
  val sqlContext: SQLContext = new SQLContext(this)

  //向 QueryExecutionListener 注册来监听查询度量
  def listenerManager: ExecutionListenerManager = sessionState.listenerManager

  //用于注册用户定义函数(UDF)
  //以下示例将Scala闭包注册为UDF:
  //sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
  def udf: UDFRegistration = sessionState.udfRegistration


  def streams: StreamingQueryManager = sessionState.streamingQueryManager

  //以各种方式创建DataFrame
  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive {
    //........
  }
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive {
    //........
  }
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive {
    //........
  }
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive {
    //........
  }
  def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive {
    //........
  }

  //以各种方式创建Dataset
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
    //........
  }

  //用户可以通过该界面创建、删除、更改或查询底层数据库、表、函数等
  @transient lazy val catalog: Catalog = new CatalogImpl(self)

  def table(tableName: String): DataFrame = {
    read.table(tableName)
  }

  //使用Spark执行SQL查询,将结果作为“DataFrame”返回。这个API急切地运行DDL/DML命令,但不用于SELECT查询。
  def sql(sqlText: String): DataFrame = withActive {
    //........
  }

  //在外部执行引擎而不是Spark中执行任意字符串命令。
  //当用户想在Spark外执行某些命令时,这可能很有用。
  //例如,为JDBC执行自定义DDL/DML命令,为ElasticSearch创建索引,为Solr创建内核等等。
  //调用此方法后,命令将被热切执行,返回的DataFrame将包含命令的输出(如果有的话)。
  def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame = {
     //........
  }

  //返回一个DataFrameReader,可用于将非流数据作为“DataFrame”读取。
  //示例:
  //    sparkSession.read.parquet("/path/to/file.parquet")
  //    sparkSession.read.schema(schema).json("/path/to/file.json")
  def read: DataFrameReader = new DataFrameReader(self)

  //禁用样式检查器,以便“隐含”对象可以以小写i开头
  //(特定于Scala)Scala中提供的隐式方法,用于将常见的Scala对象转换为`DataFrame`。
  object implicits extends SQLImplicits with Serializable {
    protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext
  }


}

object SparkSession extends Logging {

  

  class Builder extends Logging {
    //为应用程序设置一个名称,该名称将显示在Spark web UI中。
    //如果没有设置应用程序名称,将使用随机生成的名称。
    def appName(name: String): Builder = config("spark.app.name", name)

    //设置配置选项。使用此方法设置的选项会自动传播到“SparkConf”和SparkSession自己的配置中。
    def config(key: String, value: String): Builder = synchronized {
      options += key -> value
      this
    }
    def config(conf: SparkConf): Builder = synchronized {
      conf.getAll.foreach { case (k, v) => options += k -> v }
      this
    }

    //设置要连接的Spark主URL,
    //例如“local”在本地运行,“local[4]”在4核本地运行,
    //或“spark://master:7077“在Spark独立集群上运行。
    def master(master: String): Builder = config("spark.master", master)

    //启用Hive支持,包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。
    def enableHiveSupport(): Builder = synchronized {
      if (hiveClassesArePresent) {
        config(CATALOG_IMPLEMENTATION.key, "hive")
      } else {
        抛异常 : 无法使用Hive支持实例化SparkSession,因为找不到Hive相关类
      }
    }

    //将扩展注入[[SparkSession]]。这允许用户添加分析器规则、优化器规则、
    //计划策略或自定义解析器。
    def withExtensions(f: SparkSessionExtensions => Unit): Builder = synchronized {
      f(extensions)
      this
    }

    //获取一个现有的 SparkSession,如果没有现有的,则创建一个新的。
    def getOrCreate(): SparkSession = synchronized {
      //......省略.......
    }


  }

  //创建一个SparkSession.Builder来构造一个SparkSession
  def builder(): Builder = new Builder

}

五、使用示例

我们以Spark源码中的examples为例来看下SparkSQL是如何使用的

examples/src/main/resources/people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

1、创建DataFrame

val df = spark.read.json("examples/src/main/resources/people.json")

//将DataFrame的内容显示到stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

//使用$符号需要此导入
import spark.implicits._
//以树格式打印schema
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
//只选择name列进行打印
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// 选择所有的列,但是对age列分别进行加1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 选择年龄大于21的人
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 按年龄统计人数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+


//将DataFrame注册为SQL临时视图
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

2、创建Dataset

import spark.implicits._
// 创建编码器
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// 大多数常见类型的编码器是通过导入spark.implicits自动提供的_
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// 通过提供类,DataFrames可以转换为Dataset。映射将按名称完成
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

3、用户自定义函数

// 定义并注册零参数非确定性UDF
// 默认情况下,UDF是确定性的,即对相同的输入产生相同的结果。
val random = udf(() => Math.random())
spark.udf.register("random", random.asNondeterministic())
spark.sql("SELECT random()").show()
// +-------+
// |UDF()  |
// +-------+
// |xxxxxxx|
// +-------+

// 定义并注册一个单参数UDF
val plusOne = udf((x: Int) => x + 1)
spark.udf.register("plusOne", plusOne)
spark.sql("SELECT plusOne(5)").show()
// +------+
// |UDF(5)|
// +------+
// |     6|
// +------+

// 定义一个双参数UDF,并在一个步骤中向Spark注册
spark.udf.register("strLenScala", (_: String).length + (_: Int))
spark.sql("SELECT strLenScala('test', 1)").show()
// +--------------------+
// |strLenScala(test, 1)|
// +--------------------+
// |                   5|
// +--------------------+

//WHERE子句中的UDF
spark.udf.register("oneArgFilter", (n: Int) => { n > 5 })
spark.range(1, 10).createOrReplaceTempView("test")
spark.sql("SELECT * FROM test WHERE oneArgFilter(id)").show()
// +---+
// | id|
// +---+
// |  6|
// |  7|
// |  8|
// |  9|
// +---+

4、基于hive使用

使用Hive时,必须使用Hive支持实例化“SparkSession”,即enableHiveSupport()。包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。没有现有Hive部署的用户仍然可以启用Hive支持。当未由hive-site.xml配置时,上下文会自动在当前目录中创建“metastore_db”,并创建一个由“spark.sql.house.dir”配置的目录,该目录默认为启动spark应用程序的当前目录中的“spark warehouse”目录。

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 普通查询
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// 聚合查询
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

     

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2165899.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++中vector类的使用

目录 1.vector类常用接口说明 1.1默认成员函数 1.1.1构造函数(constructor) 1.1.2 赋值运算符重载(operator()) 2. vector对象的访问及遍历操作(Iterators and Element access) 3.vector类对象的容量操作(Capacity) 4. vector类对象的修改及相关操作(Modifiers and Stri…

【Java数据结构】 ---对象的比较

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 &#xff0c;Java 欢迎大家访问~ 创作不易&#xff0c;大佬们点赞鼓励下吧~ 前言 上图中&#xff0c;线性表、堆…

[Redis][主从复制][上]详细讲解

目录 0.前言1.配置1.建立复制2.断开复制3.安全性4.只读5.传输延迟 2.拓扑1.一主一从结构2.一主多从结构2.树形主从结构 0.前言 说明&#xff1a;该章节相关操作不需要记忆&#xff0c;理解流程和原理即可&#xff0c;用的时候能自主查到即可主从复制&#xff1f; 分布式系统中…

PyTorch自定义学习率调度器实现指南

在深度学习训练过程中&#xff0c;学习率调度器扮演着至关重要的角色。这主要是因为在训练的不同阶段&#xff0c;模型的学习动态会发生显著变化。 在训练初期&#xff0c;损失函数通常呈现剧烈波动&#xff0c;梯度值较大且不稳定。此阶段的主要目标是在优化空间中快速接近某…

ResNet残差网络:深度学习的里程碑

引言 在深度学习领域&#xff0c;卷积神经网络&#xff08;CNN&#xff09;的发展一直推动着图像识别、目标检测等任务的进步。然而&#xff0c;随着网络层数的增加&#xff0c;传统的CNN面临着梯度消失和梯度爆炸等难题&#xff0c;限制了深层网络的训练效果。为了克服这些挑…

oracle direct path read处理过程

文章目录 缘起处理过程1.AWR Report 分析2.调查direct path read发生的table3.获取sql text4.解释sql并输出执行计划&#xff1a; 结论&#xff1a;补充direct path read等待事件说明 缘起 记录direct path read处理过程 处理过程 1.AWR Report 分析 问题发生时间段awr如下…

FortiGate OSPF动态路由协议配置

1.目的 本文档针对 FortiGate 的 OSPF 动态路由协议说明。OSPF 路由协议是一种 典型的链路状态(Link-state)的路由协议,一般用于同一个路由域内。在这里,路由 域是指一个自治系统,即 AS,它是指一组通过统一的路由政策或路由协议互相交 换路由信息的网络。在这个 AS 中,所有的 …

基于JSP+Servlet+Layui实现的博客系统

> 这是一个使用 Java 和 JSP 开发的博客系统&#xff0c;并使用 Layui 作为前端框架。 > 它包含多种功能&#xff0c;比如文章发布、评论管理、用户管理等。 > 它非常适合作为 Java 初学者的练习项目。 一、项目演示 - 博客首页 - 加载动画 - 右侧搜索框可以输入…

开源服务器管理软件Nexterm

什么是 Nexterm &#xff1f; Nexterm 是一款用于 SSH、VNC 和 RDP 的开源服务器管理软件。 安装 在群晖上以 Docker 方式安装。 在注册表中搜索 nexterm &#xff0c;选择第一个 germannewsmaker/nexterm&#xff0c;版本选择 latest。 本文写作时&#xff0c; latest 版本对…

【STM32】RTT-Studio中HAL库开发教程七:IIC通信--EEPROM存储器FM24C04

文章目录 一、简介二、模拟IIC时序三、读写流程四、完整代码五、测试验证 一、简介 FM24C04D&#xff0c;4K串行EEPROM&#xff1a;内部32页&#xff0c;每个16字节&#xff0c;4K需要一个11位的数据字地址进行随机字寻址。FM24C04D提供4096位串行电可擦除和可编程只读存储器&a…

Excel 设置自动换行

背景 版本&#xff1a;office 专业版 11.0 表格内输入长信息&#xff0c;发现默认状态时未自动换行的&#xff0c;找了很久设置按钮&#xff0c;遂总结成经验帖。 操作 1&#xff09;选中需设置的单元格/区域/行/列。 2&#xff09;点击【开始】下【对齐方式】中的【自动换…

HAproxy,nginx实现七层负载均衡

环境准备&#xff1a; 192.168.88.25 &#xff08;client&#xff09; 192.168.88.26 &#xff08;HAproxy&#xff09; 192.168.88.27 &#xff08;web1&#xff09; 192.168.88.28 (web2) 192.168.88.29 &#xff08;php1&#xff09; 192.168.88.30…

基于微信小程序的教学质量评价系统ssm(lw+演示+源码+运行)

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了基于微信小程序的教学质量评价系统的开发全过程。通过分析基于微信小程序的教学质量评价系统管理的不足&#xff0c;创建了一个计算机管理基于微信小程序的教学…

【Anti-UAV410】论文阅读

摘要 无人机在红外视频中的感知&#xff0c;对于有效反无人机是很重要的。现有的跟踪数据集存在目标大小和环境问题&#xff0c;不能完全表示复杂的逼真场景。因此作者就提出了Anti-UAV410数据集&#xff0c;该数据集总共410个视频和超过438K个标注框。为了应对复杂环境无人机跟…

丹摩智算(damodel)部署stable diffusion实验

名词解释&#xff1a; 丹摩智算&#xff08;damodel&#xff09;&#xff1a;是一款带有RTX4090&#xff0c;Tesla-P40等显卡的公有云服务器。 stable diffusion&#xff1a;是一个大模型&#xff0c;可支持文生图&#xff0c;图生图&#xff0c;文生视频等功能 一.实验目标 …

Linux-TCP重传

问题描述&#xff1a; 应用系统进行切换&#xff0c;包含业务流量切换&#xff08;即TongWeb主备切换&#xff09;和MYSQL数据库主备切换。首先进行流量切换&#xff0c;然后进行数据库主备切换。切换后发现备机TongWeb上有两批次慢请求&#xff0c;第一批慢请求响应时间在133…

【HarmonyOS】应用引用media中的字符串资源如何拼接字符串

【HarmonyOS】应用引用media中的字符串资源如何拼接字符串 一、问题背景&#xff1a; 鸿蒙应用中使用字符串资源加载&#xff0c;一般文本放置在resoutces-base-element-string.json字符串配置文件中。便于国际化的处理。当然小项目一般直接引用字符串&#xff0c;不需要加载s…

计算机毕业设计 基于Python国潮男装微博评论数据分析系统的设计与实现 Django+Vue 前后端分离 附源码 讲解 文档

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

LeetCode 149. 直线上最多的点数

LeetCode 149. 直线上最多的点数 给你一个数组 points &#xff0c;其中 points[i] [xi, yi] 表示 X-Y 平面上的一个点。求最多有多少个点在同一条直线上。 示例 1&#xff1a; 输入&#xff1a;points [[1,1],[2,2],[3,3]] 输出&#xff1a;3 示例 2&#xff1a; 输入&…

【数据结构之线性表】有序表的合并(链表篇)

链表有序表的合并 思路图 将链表L1和L2按照顺序合并到L3中&#xff08;注&#xff1a;三个链表都是带头结点的&#xff09; A、要实现有序合并&#xff0c;必须先比较L1,L2两表中结点的大小&#xff0c;这里我们暂时先不讨论&#xff0c;直接根据图中来进行思路整理&#xff…