SparkSQL的编程模型(DataFrame和DataSet)

news2024/12/25 9:02:30

1.2 SparkSQL的编程模型(DataFrame和DataSet)

1.2.1  编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

  • SQL SQL不用多说,就和Hive操作一样,但是需要清楚一点的时候,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。 同时支持,通用SQL和HQL。

  • DataFrame和Dataset DataFrameDataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

1.2.2 RDD\DataSet\DataFrame

RDD

弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法

从字面上就能看出的几个特点:

  • 弹性:

    数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换

    RDD出错后可自动重新计算(通过血缘自动容错)

    可checkpoint(设置检查点,用于容错),可persist或cache(缓存)

    里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整

  • 分布式:

    RDD中的数据可存放在多个节点上

  • 数据集:

    数据的集合,没啥好说的

相对于与DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)

DataFrame

DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)

假设RDD中的两行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

从上面两个表格可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法等,有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。,不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

Dataset

相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束

假设RDD中的两行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

那么在DataSet中数据就变成这样;

Person(id:Int,Name:String,Age:Int)
Person(1,张三,20)
Person(2,李四,21)
Person(3,王五,22)
目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是想骂人,这也是引入Dataset的一个重要原因。
​
使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如下图代码所示.
val df1 = spark.read.json( "/tmp/people.json")
// json文件中没有score字段,但是能编译通过
val df2 = df1.filter("score > 60")df2.show()

而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前

val ds1 = spark.read.json( "/tmp/people.json" ).as[ People]
// 使用dataset这样写,在IDE中就能发现错误
val ds2 = ds1.filter(_.score < 60)
val ds3 = ds1.filter(_.age < 18)
// 打印
ds3.show( )

总体来说DS这种方式更加合理,并且更加人性化,比较适合程序员的开发及使用,而且Spark在2.X版本以后也在推行开发者开发中使用DS进行开发。

1.2.3 SparkSQL的编程入口
在SparkSQL中的编程模型,不在是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。
1.2.4 SparkSQL基本编程

创建SparkSQL的模块

创建工程省略,直接在原有工程引入Pom即可

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.12</artifactId>
     <version>${spark.version}</version>
</dependency>
1.2.5 SparkSQL编程初体验
  • SparkSession的构建

val spark = SparkSession.builder()
    .appName("SparkSQLOps")
    .master("local[*]")
//.enableHiveSupport()//支持hive的相关操作
    .getOrCreate()
  • 基本编程

object _01SparkSQLOps {
    def main(args: Array[String]): Unit = {
​
        val spark = SparkSession.builder()
                .appName("SparkSQLOps")
                .master("local[*]")
//                .enableHiveSupport()//支持hive的相关操作
                .getOrCreate()
        //加载数据
        val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")
        //二维表结构
        pdf.printSchema()
        //数据内容 select * from tbl
        pdf.show()
        //具体的查询 select name, age from tbl
        pdf.select("name", "age").show()
        import spark.implicits._//导入sparksession中的隐式转换操作,增强sql的功能
        pdf.select($"name",$"age").show()
        //列的运算,给每个人的年龄+10 select name, age+10,height-1 from tbl
        pdf.select($"name",$"height" - 1, new Column("age").+(10)).show()
        //起别名  select name, age+10 as age,height-1  as height from tbl
        pdf.select($"name",($"height" - 1).as("height"), new Column("age").+(10).as("age")).show()
        //做聚合统计 统计不同年龄的人数 select age, count(1) counts from tbl group by age
        pdf.select($"age").groupBy($"age").count().show()
        //条件查询 获取年龄超过18的用户  select * from tbl where age > 18
        // pdf.select("name", "age", "height").where($"age".>(18)).show()
        pdf.select("name", "age", "height").where("age > 18").show()
        //sql
        //pdf.registerTempTable()
        //在spark2.0之后处于维护状态,使用createOrReplaceTempView
        /*
            从使用范围上说,分为global和非global
                global是当前SparkApplication中可用,非global只在当前SparkSession中可用
            从创建的角度上说,分为createOrReplace和不Replace
                createOrReplace会覆盖之前的数据
                create不Replace,如果视图存在,会报错
         */
        pdf.createOrReplaceTempView("people")
        // 使用SQL语法进行处理
        spark.sql(
            """
              |select
              | age,
              | count(1) as countz
              |from people
              |group by age
            """.stripMargin).show
        // 打印输出
        spark.stop()
    }
}
1.2.6 SparkSQL编程模型的操作

DataFrame的构建方式

构建方式有两,一种通过Javabean+反射的方式来进行构建;还有一种的话通过动态编码的方式来构建。

  • JavaBean+反射

import org.apache.spark.sql.{DataFrame, SparkSession}
​
/**
 * 创建DataFrame方式
 * 反射方式
 */
object _02SparkSQLCreateDFOps {
  def main(args: Array[String]): Unit = {
    // 创建执行入口
    val spark = SparkSession.builder()
      .appName("createDF")
      .master("local")
      .getOrCreate()
    // 创建集合数据,并将数据封装到样例类中
    val list = List(
      student(1,"王凯",0,23),
      student(2,"赵凯",0,32),
      student(3,"姜华劲",1,24)
    )
    // 导入隐式转换
    import spark.implicits._
    // 创建DF,创建DF的同时可以进行字段名重命名
    val df: DataFrame = list.toDF("ids","names","genders","ages")
    // 打印输出
    df.printSchema()
    df.show()
    // 关闭
    spark.stop()
  }
}
// 构建反射方式的样例类
case class student(id:Int,name:String,gender:Int,age:Int)
  • 动态编程

object _02SparkSQLDataFrameOps {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
                    .master("local[*]")
                    .appName("SparkSQLDataFrame")
                    .getOrCreate()
/*
            使用动态编程的方式构建DataFrame
            Row-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象
         */
val row:RDD[Row] = spark.sparkContext.parallelize(List(
    Row(1, "李伟", 1, 180.0),
    Row(2, "汪松伟", 2, 179.0),
    Row(3, "常洪浩", 1, 183.0),
    Row(4, "麻宁娜", 0, 168.0)
))
//表对应的元数据信息
val schema = StructType(List(
    StructField("id", DataTypes.IntegerType, false),
    StructField("name", DataTypes.StringType, false),
    StructField("gender", DataTypes.IntegerType, false),
    StructField("height", DataTypes.DoubleType, false)
))
​
val df = spark.createDataFrame(row, schema)
df.printSchema()
df.show()
    }
}

说明:这里学习三个新的类:

Row:代表的是二维表中的一行记录,或者就是一个Java对象

StructType:是该二维表的元数据信息,是StructField的集合

StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)

  • 总结: 这两种方式,都是非常常用,但是动态编程更加的灵活,因为javabean的方式的话,提前要确定好数据格式类型,后期无法做改动。

Dataset的构建方式

Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。

//dataset的构建
object _03SparkSQLDatasetOps {
    def main(args: Array[String]): Unit = {
​
        val spark = SparkSession.builder()
                    .appName("SparkSQLDataset")
                    .master("local[*]")
                    .getOrCreate()
​
        //dataset的构建
        val list = List(
            new Student(1, "王盛芃", 1, 19),
            new Student(2, "李金宝", 1, 49),
            new Student(3, "张海波", 1, 39),
            new Student(4, "张文悦", 0, 29)
        )
        import spark.implicits._
        val ds = spark.createDataset[Student](list)
        ds.printSchema()
        ds.show()
        spark.stop()
    }
}
case class Student(id:Int, name:String, gender:Int, age:Int)

注意:出现如下错误

在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过

RDD和DataFrame以及DataSet的互相转换

RDD--->DataFrame

    def beanRDD2DataFrame(spark:SparkSession): Unit = {
        val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(
            new Student(1, "王盛芃", 1, 19),
            new Student(2, "李金宝", 1, 49),
            new Student(3, "张海波", 1, 39),
            new Student(4, "张文悦", 0, 29)
        ))
        val sdf =spark.createDataFrame(stuRDD, classOf[Student])
        sdf.printSchema()
        sdf.show()
    }

RDD--->Dataset

def  rdd2Dataset(spark:SparkSession): Unit = {
    val stuRDD = spark.sparkContext.parallelize(List(
        Student(1, "王盛芃", 1, 19),
        Student(2, "李金宝", 1, 49),
        Student(3, "张海波", 1, 39),
        Student(4, "张文悦", 0, 29)
    ))
    import spark.implicits._
    val ds:Dataset[Student] = spark.createDataset(stuRDD)
​
    ds.show()
}   
case class Student(id:Int, name:String, gender:Int, age:Int)

在RDD转换为DataFrame和Dataset的时候可以有更加简单的方式

import spark.implicits._
rdd.toDF()
rdd.toDS()

DataFrame--->RDD

val rdd:RDD[Row] = df.rdd
rdd.foreach(row => {
    //            println(row)
    val id = row.getInt(0)
    val name = row.getString(1)
    val gender = row.getInt(2)
    val height = row.getAs[Double]("height")
    println(s"id=${id},name=$name,gender=$gender,height=$height")
})

DataFrame--->Dataset

无法直接将DataFrame转化为Dataset

Dataset --->RDD

val stuDS: Dataset[Student] = list2Dataset(spark)
        //dataset --> rdd
val stuRDD:RDD[Student] = stuDS.rdd
stuRDD.foreach(println)

Dataset--->DataFrame

val stuDS: Dataset[Student] = list2Dataset(spark)      
//dataset --->dataframe
val df:DataFrame = stuDS.toDF()
df.show()

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1326094.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

助老理发,寒冬送暖从头开始

为进一步弘扬尊老、敬老、爱老、助老的中华民族传统美德&#xff0c;解决老年人年龄大、冬季出行不便的问题&#xff0c;2023年12月20日&#xff0c;绿萝志愿服务队在翠堤社区开展了“助老理发”志愿活动。 大雪过后天气格外寒冷&#xff0c;但志愿者们依旧早早的来现场做…

Ethernet/IP 之IO 连接简要记录

IO连接 EIP的IO连接提供了在一个生产者和多个消费者之间的特定的通信路径&#xff0c;以达到IO数据在IO连接下传输。 生产者对象产生IO数据通过生产者IO连接管理者对象将连接ID和数据组帧发送给消费者IO连接管理者对象然后将IO数据发送给消费者对象。 显示消息连接 显式消息传…

Seata中AT模式的实现原理03-二阶段提交

全局事务提交 TM提交全局事务 当业务正常处理完毕后 本地事务全部提交完成&#xff0c;TM会将xid提交给TC&#xff0c;TC会返回当前事务状态&#xff0c;status由TC决定&#xff0c;TM最后会将xid从RootContext中解绑&#xff0c;全局事务结束。 TransactionalTemplate priva…

序列化类的高级用法

1.3.3 模型类序列化器 如果我们想要使用序列化器对应的是Django的模型类&#xff0c;DRF为我们提供了ModelSerializer模型类序列化器来帮助我们快速创建一个Serializer类。 ModelSerializer与常规的Serializer相同&#xff0c;但提供了&#xff1a; 基于模型类自动生成一系列…

ansible的playbook

1、playbook的组成部分 &#xff08;1&#xff09;task任务&#xff1a;在目标主机上执行的操作&#xff0c;使用模块定义这些操作&#xff0c;每个任务都是一个模块的调用 &#xff08;2&#xff09;variables变量&#xff1a;存储和传递数据&#xff08;变量可以自定义&…

使用Python将OSS文件免费下载到本地:项目分析和准备工作

大家好&#xff0c;我是水滴~~ 本文将介绍如何使用Python编程语言将OSS&#xff08;对象存储服务&#xff09;中的文件免费下载到本地计算机。我们先进行项目分析和准备工作&#xff0c;为后续的编码及实施提供基础。 《Python入门核心技术》专栏总目录・点这里 系列文章 使用…

RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

Broker接收消息的处理流程&#xff1f; 既然要分析 Broker 接收消息&#xff0c;那么如何找到 Broker 接收消息并进行处理的程序入口呢&#xff1f; 那么消息既然是从生产者开始发送&#xff0c;消息是有单条消息和批量消息之分的&#xff0c;那么消息肯定是有一个标识&#…

java中常用的加密算法总结

目前在工作中常用到加密的一些场景&#xff0c;比如密码加密&#xff0c;数据加密&#xff0c;接口参数加密等&#xff0c;故通过本文总结以下常见的加密算法。 1. 对称加密算法 对称加密算法使用相同的密钥进行加密和解密。在Java中&#xff0c;常见的对称加密算法包括&…

活动回顾丨迁飞之路主题艺术墙绘落地大坪大融城

重庆作为鹰飞之城&#xff0c;不仅是数十万猛禽迁飞的必经之路&#xff0c;也是其他珍稀鸟类的家园。守护飞羽精灵&#xff0c;领略迁飞之美&#xff0c;2023年12月19日&#xff0c;传益千里携手重庆工商大学艺术学院党员服务站的志愿者们一起走进大坪大融城开展迁飞之路生态艺…

软件测试工程师,“我“从月10k到月30k进阶自动化测试之路...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 作为手工测试&…

Likeshop单商户高级版商城的二次开发之路

一、产品介绍 likeshop单商户高级版是一款适用于B2C、单商户、自营商城场景的商城系统。它完美契合私域流量变现闭环交易使用&#xff0c;拥有丰富的营销玩法、强大的分销能力&#xff0c;支持DIY多模板&#xff0c;前后端分离。无论您是想要进行商城运营还是二次开发&#xf…

聚观早报 |xPad2 Pro系列学习机发布;华为Mate X5典藏版实力过硬

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 12月21日消息 xPad2 Pro系列学习机发布 华为Mate X5典藏版实力过硬 iQOO Neo9系列标配芯片Q1 亚马逊云科技自研芯…

CentOS 宣布停更3年后,服务器操作系统何去何从?

“CentOS 要停止更新了&#xff1f;” 盯着电脑&#xff0c;某大型企业数字化部门的负责人彭素素看到这个消息&#xff0c;不仅在心里发出了一声惊呼。 2020年&#xff0c;CentOS 停止更新的消息&#xff0c;不仅彭素素所在的企业&#xff0c;对于不少正在使用 CentOS 的厂商…

搞懂这6 个持续集成工具,领先80%测试人!

开发人员喜欢把写的代码当成自己的孩子&#xff0c;他们会被当成艺术品一样呵护。作为家长&#xff0c;总是会认为自己的孩子是最好的&#xff0c;也会尽全力给自己的孩子最好的&#xff0c;就算有时候会超出自己的能力范围。 最终&#xff0c;孩子会走出去&#xff0c;和其他…

【笔试强化】Day 6

文章目录 一、单选1.2.3.4.5.6.7. 二、不定项选择1.2.3. 三、编程1. 把字符串转换成整数解法&#xff1a;代码&#xff1a; 2. 不要二解法&#xff1a;代码&#xff1a; 一、单选 1. 正确答案&#xff1a;D2. 正确答案&#xff1a;B3. 正确答案&#xff1a;D4. 正确答案&#…

Python编程技巧 – 使用正则表达式

Python编程技巧 – 使用正则表达式 Python Programming Skills – Using Regular Expression By JacksonML Python以其强大的功能高居全球编程软件的榜首。它易于学习和使用&#xff0c;使其成为初学者绝佳语言。此外&#xff0c;Python还用于各种应用程序&#xff0c;包括We…

Java Swing学生成绩管理系统期末大作业

1.且看界面 &#xff08;1&#xff09;登录页&#xff08;可记住账号密码&#xff09; &#xff08;2&#xff09;注册弹窗页 &#xff08;3&#xff09;登录弹窗 &#xff08;4&#xff09;还有账号密码错误3次需等待30秒 &#xff08;5&#xff09;成绩展示页面&#xff08;…

【Spring】15 ApplicationContextAware 接口

文章目录 1. 简介2. 作用3. 使用3.1 创建并实现接口3.2 配置 Bean 信息3.3 创建启动类3.4 启动 4. 应用场景总结 Spring 框架提供了许多回调接口&#xff0c;用于在 Bean 的生命周期中执行特定的操作。ApplicationContextAware 接口是其中之一&#xff0c;它允许 Bean 获取对 A…

无代码API集成助力电商平台,提升味分享营销系统效率

无代码开发的革命 在数字化转型的浪潮中&#xff0c;无代码开发正在成为企业提升效率和灵活性的重要工具。特别是在电商领域&#xff0c;高效的客户关系管理&#xff08;CRM&#xff09;系统和客户服务系统对于保持竞争力至关重要。无代码API集成方案如何实现电商系统的优化和…

存在重复元素

题目链接 存在重复元素 题目描述 注意点 无 解答思路 根据Set无法存储相同元素的特点判断nums中是否存在重复元素 代码 class Solution {public boolean containsDuplicate(int[] nums) {Set<Integer> set new HashSet<Integer>();for (int x : nums) {if …