Spark编程-SparkSQL

news2025/1/13 10:10:58

SparkSql能做些啥 

        Spark SQL的核心概念是DataFrame,它是一个分布式的数据集合,类似于关系数据库中的表。支持使用SQL语言直接对DataFrame进行查询,提供了丰富的内置函数和表达式,可以用于数据的转换、过滤和聚合等操作,支持多种数据源,包括Hive、Avro、Parquet、ORC、JSON和JDBC等。它可以读取和写入这些数据源,并且还支持将非结构化数据转换为结构化数据.

        Spark SQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

DataFrame与RDD的区别

        DataFrame的推出,让Spark具备GH了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。

        如上图,DataFrame和RDD的区别,RDD是分布式的 Java对象的集合,比如,RDD[Person]是以Person为类型参数,但是,Person类的内部结构对于RDD而言却是不可知的。DataFrame是一种以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供了详细的结构信息,也就是我们经常说的模式(schema),Spark SQL可以清楚地知道该数据集中包含哪些列、每列的名称和类型。
        和RDD一样,DataFrame的各种变换操作也采用惰性机制,只是记录了各种转换的逻辑转换路线图(是一个DAG图),不会发生真正的计算,这个DAG图相当于一个逻辑查询计划,最终,会被翻译成物理查询计划,生成RDD DAG,按照之前介绍的RDD DAG的执行方式去完成最终的计算得到结果。

SparkSession是什么       

        SparkSession是在Spark 2.0中引入的,作为替代SparkContext的新入口点。SparkSession是一个用于与Spark进行交互的主要入口,它封装了SparkContext、SQLContext和HiveContext的功能,并提供了更简洁、更一致的API。

SparkSession功能

        创建DataFrame和DataSet:SparkSession提供了创建DataFrame和DataSet的方法,这些方法可以从各种数据源(如文件、数据库、Hive表等)中读取数据,并将其转换为分布式数据集合。

        执行SQL查询:SparkSession允许使用Spark SQL模块提供的SQL语法来查询数据。它提供了SQL方法来执行SQL查询,并将结果返回为DataFrame。

        集成Hive:SparkSession内置了对Hive的支持,可以直接执行HiveQL查询和操作Hive表。

        与其他数据源的交互:SparkSession提供了用于读取和写入数据的方法,可以与各种数据源进行交互,如Parquet、Avro、JSON、CSV等。

如何创建DataFrame

例子1-已知Rdd创建DataFrame

代码

package SparkSQL
  //从一个已知rdd创建DataFrame
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object DataFrame_create {
  def main(args:Array[String]):Unit = {
    //创建sparksession
    val spark = SparkSession.builder.appName("dataframe_create").master("local")
      .getOrCreate()
    //创建RDD
    val rdd = spark.sparkContext.parallelize(Seq(("zhugeliang",48),("guanyu",40),("xiangyu",19)))
    //定义schama
    val schema = StructType((Seq(
      StructField("name",StringType,nullable = true),
      StructField("age",IntegerType,nullable = true))
      ))
    //将RDD转化为Row对象
    val rowRDD = rdd.map(row => Row(row._1,row._2))
    //创建dataFrame
    val df = spark.createDataFrame(rowRDD,schema)
    //展示
    df.show()

  }

}

运行结果

例子2-读取外部数据集创建DataFrame

 代码

package Sparksql
//      读取json文件创建
import org.apache.spark.sql.SparkSession

object DataFrame_readFile {
  def main(args:Array[String]):Unit ={
    val spark = SparkSession.builder.appName("readFile")
      .master("local").getOrCreate()
    //使支持RDDs转换为DataFrames及后续sql操作
    import spark.implicits._
    //读取json文件
    val df = spark.read.json("D:\\workspace\\spark\\src\\main\\Data\\package.json")
    //展示结果
    df.show()
  }

}

 运行结果

例子3-编码创建DataFrame

代码

  package Sparksql

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.sql.Row
  object DF_create_list {
    def main(args:Array[String]):Unit ={
       这里设置为local表示在本地运行
      val spark = SparkSession.builder.appName("DF_list").master("local").getOrCreate()
      //创建数据列表
      val data = Seq(("libai",43,"changan"),("jushi",48,"newYork"),("xinge",28,"jinan"))
      //定义schema
      val schema = StructType(Seq(
        StructField("name",StringType,nullable = true),
        StructField("age",IntegerType,nullable = true),
        StructField("city",StringType,nullable = true)
      ))
      //将数据转换为row对象
      val row = spark.sparkContext.parallelize(data).map{
        case
          (name,age,city) => Row(name,age,city)}
      val df = spark.createDataFrame(row,schema)
      df.show()

    }

  }

运行结果

 注:DataFrame的模式(Schema)

        代码定义了一个Spark SQL中DataFrame的模式(Schema),用于描述DataFrame中各列的名称和数据类型。
        StructType(Seq(...))表示创建一个结构类型(StructType)对象,用于存储DataFrame的模式信息。

        Seq(...)是一个包含多个元素的序列,每个元素都代表DataFrame的一个列。我们定义了三个列,分别是name、age和city。每个列都由StructField对象来表示,StructField的构造函数接受三个参数:列名、数据类型和是否可为空。
        StructField("name", StringType, nullable = true):表示一个名为name的列,数据类型为StringType,可以为空。

        StructField("age", IntegerType, nullable = true):表示一个名为age的列,数据类型为IntegerType,可以为空。

        StructField("city", StringType, nullable = true):表示一个名为city的列,数据类型为StringType,可以为空。
        即定义了一个包含三个列的模式,可以用于创建DataFrame。使用模式创建DataFrame时,可确保DataFrame的列具有正确的名称和数据类型。

常用DataFrame操作

      我使用了例子3的DataFrame,在原基础上进行操作。

       这里总结了日常使用中使用的DataFrame操作,主要有选择出多列进行打印,条件过滤,分组聚合,单列排序,多列排序,对列名进行重命名。

      //打印df      
      df.show
+-----+---+-------+
| name|age|   city|
+-----+---+-------+
|libai| 43|changan|
|jushi| 48|newYork|
|xinge| 28|  jinan|
+-----+---+-------+
      //打印模式信息
      df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)


      //选择多列
      df.select(df("name"),df("age")+1).show()

+-----+---------+
| name|(age + 1)|
+-----+---------+
|libai|       44|
|jushi|       49|
|xinge|       29|
+-----+---------+

      //条件过滤
      df.filter(df("age") > 30).show()
+-----+---+-------+
| name|age|   city|
+-----+---+-------+
|libai| 43|changan|
|jushi| 48|newYork|
+-----+---+-------+

      //分组聚合
      df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 28|    1|
| 48|    1|
| 43|    1|
+---+-----+

      //排序
      df.sort(df("age").desc).show()

+-----+---+-------+
| name|age|   city|
+-----+---+-------+
|jushi| 48|newYork|
|libai| 43|changan|
|xinge| 28|  jinan|
+-----+---+-------+

      //多列排序
      df.sort(df("age").asc,df("city").asc).show()

+-----+---+-------+
| name|age|   city|
+-----+---+-------+
|xinge| 28|  jinan|
|libai| 43|changan|
|jushi| 48|newYork|
+-----+---+-------+

      //对某列进行重名名
      df.select(df("city").as("area"),df("age")).show()

+-------+---+
|   area|age|
+-------+---+
|changan| 43|
|newYork| 48|
|  jinan| 28|
+-------+---+

DataSet

概念

Spark SQL and DataFrames - Spark 2.2.3 Documentation,看官网

DataSet、DataFrame、RDD之间的关系

参考文章:

Spark2.1.0入门:DataFrame的创建_厦大数据库实验室博客,林子雨教授

Spark RDD(Resilient Distributed Datasets)论文 - 【布客】Spark 中文翻译

Spark SQL and DataFrames - Spark 2.2.3 Documentation   Spark官网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/797883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【SpringCloud Alibaba】(四)使用 Feign 实现服务调用的负载均衡

在上一文中,我们实现了服务的自动注册与发现功能。但是还存在一个很明显的问题:如果用户微服务和商品微服务在服务器上部署多份的话,之前的程序无法实现服务调用的负载均衡功能。 本文就带着大家一起实现服务调用的负载均衡功能 1. 负载均衡…

Vue2基础五、工程化开发

零、文章目录 Vue2基础五、工程化开发 1、工程化开发和脚手架 (1)开发 Vue 的两种方式 核心包传统开发模式:基于 html / css / js 文件,直接引入核心包,开发 Vue。工程化开发模式:基于构建工具&#xf…

让你 React 组件水平暴增的 5 个技巧

目录 透传 className、style 通过 forwardRef 暴露一些方法 useCallback、useMemo 用 Context 来跨组件传递值 React.Children、React.cloneElement 总结 最近看了一些 Ant Design 的组件源码,学到一些很实用的技巧,这篇文章来分享一下。 首先&am…

LeetCode111. 二叉树的最小深度

111. 二叉树的最小深度 文章目录 [111. 二叉树的最小深度](https://leetcode.cn/problems/minimum-depth-of-binary-tree/)一、题目二、题解方法一:迭代方法二:递归 一、题目 给定一个二叉树,找出其最小深度。 最小深度是从根节点到最近叶子…

理光310/320/325系列激光打印机加粉后不换芯片清零方法

设置步骤: 依次按停止107开始键进入维修模式, 按下键两次选择Engine Maintenance,点OK键进入, 按上键选择Refill mode项后点OK键, 按下键选择到Pure refill mode后点Ok键(默认是Auto refill mode), 然后按两次后退…

GB/T 25000.51解读——软件产品的功能性怎么测?

前面的文章中,我们为大家整体介绍了GB/T 25000.51-2016《软件产品质量要求和测试细则》国家标准的结构和所涵盖的内容,从本文开始,我们将针对标准中规定的软件产品的八大质量特性进行详细解读。本文为大家解读软件产品的功能性测试。 软件产…

微服务契约测试框架-Pact

契约测试 契约测试的思想就是将原本的 Consumer 与 Provider 间同步的集成测试,通过契约进行解耦,变成 Consumer 与 Provider 端两个各自独立的、异步的单元测试。 契约测试的优点: 契约测试与单元测试以及其它测试之间没有重复&#xff0c…

java商城系统和php商城系统有什么差异?如何选择?

java商城系统和php商城系统是两种常见的电子商务平台,它们都具有一定的优势和劣势。那么,java商城系统和php商城系统又有哪些差异呢? 一、开发难度 Java商城系统和PHP商城系统在开发难度方面存在一定的差异。Java商城系统需要使用Java语言进…

小红书课程发光社群知识库,点亮哥专为超级个体设计解决方案

小红书课程点亮哥知识库 开创了学习小红书教育培训先河 针对超级个体轻创业的学习需求场景 创新推出了“知识库全新学习方式”。 一个人如何做好小红书? 超级个体轻创业,如何做好小红书? 通过打造个人IP、或者塑造老板个人品牌,来实现互联网变现,如何做好小红书? 就像挑…

系统架构设计师-软件架构设计(5)

目录 一、构件与中间件技术 1、软件复用 2、构件与中间件技术的概念 3、构件的复用 3.1 检索与提取构件 3.2 理解与评价构件 3.3 修改构件 3.4 组装构件 4、中间件 4.1 采用中间件技术的优点: 4.2 中间件的分类: 5、构件标准 5.1 CORBA(公共…

Android 电子称定标流程

1、首先确保电子称正确安装,底部悬空,托盘悬空。 2、去皮,把去皮数据保存到本地 3、定标、例如拿100克的砝码放入托盘, 获取值-去皮值及得到定标值 4、通过定标值计算出需要设置的满量程,或者计算对应的重量&#x…

vue-element-admin中实现自适应功能

npm install postcss-px-to-viewport --save-dev 项目根目录下建一个名字为 .postcssrc.js 的js文件(前边的.别忘了),在该文件里写以下代码 //添加如下配置: module.exports {plugins: {autoprefixer: {}, // 用来给不同的浏览器自动添加相应前缀&a…

小学期笔记——天天酷跑4

效果: 点击登录: ------------------------ 效果: 静态的一张图 ------------------------ 完善一下会变成那张静态的图从左往右移动,但是这一张图到后面会拉丝 -------------------- 再完善一下: (再…

洗地机有没有必要买?好用的洗地机推荐

随着科技的发展,越来越多的家用电器出现。就比如在清洁家电方面,相继出现了吸尘器、扫地机、洗地机!其中洗地机更是近年来爆火的一个智能清洁家电!而如果你们和小编一样是个上班族,然后每天下班回家面对脏乱的地板&…

汽车UDS诊断深度学习专栏

1.英文术语 英文术语翻译Diagnostic诊断Onboard Diagnostic 在线诊断 Offboard Diagnostic离线诊断Unified diagnostic service简称 UDS 2.缩写表 缩写解释ISO国际标准化组织UDSUnified diagnostic service,统一的诊断服务ECU电控单元DTC 诊断故障码 ISO14229UD…

Modbus TCP/IP之异常响应

文章目录 一、异常响应二、异常码分析2.1 异常码0x012.2 异常码0x022.3 异常码0x032.4 异常码0x04、0x05等 一、异常响应 对于查询报文,存在以下四种处理反馈: 正常接收,正常处理,返回正常响应报文;因为通信错误等原因…

我对牟长青分享的各个私董会数据分析

我是卢松松,点点上面的头像,欢迎关注我哦! 其实之前,我也想写一个关于各个草根社群的数据分析,但这样的文章容易得罪人,因为我一直喜欢直言不讳,所以一直没有动笔。例如,我在6月份写…

OpenGl中的VAO、VBO与EBO

文章目录 VBO(顶点缓冲区对象)VBO的使用 EBO(索引缓冲对象)EBO的使用 VAO(顶点数组对象)VAO的使用 三者的区别someting。。。 哎,很离谱,上个月学learnopengl学到一半跑去看庄懂老师的视频,结果该还的东西迟早得还,再打开之前的工…

NineData支持最受欢迎数据库PostgreSQL

根据在 Stack Overflow 发布的 2023 开发者调研报告中显示,PostgreSQL 以 45% vs 41% 的受欢迎比率战胜 MySQL,成为新的最受欢迎的数据库。NineData 也在近期支持了 PostgreSQL,用户可以在 NineData 平台上进行创建数据库/Schema、管理用户与…

BTTES,2101505-88-6,是各种化学生物实验中生物偶联的理想选择

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ 规格单位:g |货期:按照具体的库存进行提供 | 纯度:95% PART1----​试剂描述: BTTES是铜(I)催化的叠氮化物-炔烃环加成(CuAAC&#x…