大数据-94 Spark 集群 SQL DataFrame DataSet RDD 创建与相互转换 SparkSQL

news2024/11/24 15:55:30

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • SparkSQL介绍
  • SparkSQL特点
  • SparkSQL数据抽象
  • SparkSQL数据类型
    在这里插入图片描述

SparkSession

在 Spark2.0 之前

  • SQLContext 是创建 DataFrame 和 执行SQL的入口
  • HiveContext 通过HiveSQL语句操作Hive数据,兼Hive操作,HiveContext继承自SQLContext
    在这里插入图片描述

在 Spark2.0 后

  • 这些入口点统一到了SparkSession,SparkSession封装了SQLContext及HiveContext
  • 实现了SQLContext即HiveContext所有功能
  • 通过SparkSession可以获取到SparkContext

RDD(Resilient Distributed Dataset,弹性分布式数据集)

RDD 是 Spark 的基础抽象,它表示一个不可变的、分布式的数据集。

特点:

  • 不可变性:RDD 是不可变的,一旦创建就不能修改。任何对 RDD 的操作都会生成一个新的 RDD。
  • 弹性:RDD 可以自动从节点失败中恢复数据,通过将计算逻辑重新应用到原始数据来重建丢失的数据。
  • 分布式:RDD 可以分布在多个节点上执行操作,充分利用集群的计算能力。
  • 延迟计算:RDD 的操作是延迟执行的(lazy evaluation),即只有在触发行动操作(如 count()、collect())时,Spark 才会实际执行计算。
  • 类型安全:RDD 是类型化的,但它的 API 是松散类型(loosely typed)的,这意味着编译器不会在编译时检查数据的类型,而是在运行时才会发现类型错误。

DataFrame

DataFrame 是一种基于 RDD 的分布式数据集,它具有命名的列。

特点:

  • 结构化数据:DataFrame 是一个二维表格,具有命名的列和行,类似于关系数据库中的表或 Pandas 的 DataFrame。
  • 优化引擎:DataFrame 受益于 Spark SQL 引擎的优化,如 Catalyst 优化器,可以自动优化查询并生成高效的执行计划。
  • 丰富的 API:DataFrame 提供了一个高层次的 API,支持复杂的查询、过滤、聚合和连接操作。
  • 类型不安全:与 RDD 不同,DataFrame 是动态类型(dynamic typing)的,数据类型检查是在运行时进行的,因此它在编译时不进行类型检查。

DataSet

DataSet 是 Spark 1.6 引入的一个新的数据抽象,它结合了 RDD 的强类型优势和 DataFrame 的优化能力。

特点:

  • 类型安全:DataSet 是强类型的,它利用编译时类型检查,确保在编译时检测类型错误。
  • 优化和性能:DataSet 受益于 Catalyst 优化器和 Tungsten 执行引擎,提供与 DataFrame 相同的优化能力,同时保留了类型安全性。
  • 更丰富的 API:DataSet 提供了 RDD 的大部分 API,如 map、filter 等,同时也支持 SQL 查询。
  • 统一 API:DataSet API 统一了 RDD 和 DataFrame,提供了一种更具表现力和安全性的编程模型。

DataFrame & Dataset 创建

不要刻意区分: DF & DS,DF是一种特殊的DS:ds.transformation => ds

由 Range 生成 Dataset

在 spark-shell 中进行测试

val numDS = spark.range(5, 100, 5)
// orderBy 转换操作 
numDS.orderBy(desc("id")).show(5)
// 统计信息
numDS.describe().show
// 显示 Schema 信息
numDS.printSchema
// 使用RDD执行同样的操作
numDS.rdd.map(_.toInt).stats
// 检查分区数
numDS.rdd.getNumPartitions

运行测试的过程如下图所示:
在这里插入图片描述

有集合生成Dataset

Dataset = RDD[case class],在 spark-shell 中进行测试

case class Person(name: String, age: Int, height: Int)

// 注意 Seq 中元素的类型
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))

val ds1 = spark.createDataset(seq1)
ds1.printSchema
ds1.show

执行的结果:
在这里插入图片描述
再来一个测试:

val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val ds2 = spark.createDataset(seq2)
ds2.printSchema
ds2.show

执行的结果:
在这里插入图片描述

由集合生成DataFrame

DataFrame = RDD[Row] + Schema
继续进行测试:

val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = spark.createDataFrame(lst).withColumnRenamed("_1", "name1").withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1")
df1.orderBy("age1").show(10)

执行的结果如下图所示:
在这里插入图片描述

RDD转成DataFrame

DataFrame = RDD[Row] + Schema

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))

val schema = StructType(
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("height", IntegerType, false) ::
  Nil
)

val schema1 = (new StructType).add("name", "string", false).add("age", "int", false).add("height", "int", false)
val rddToDF = spark.createDataFrame(rdd1, schema)
rddToDF.orderBy(desc("name")).show(false)

执行的结果如下图:
在这里插入图片描述

RDD转Dataset

Dataset = RDD[case class]
DataFrame = RDD[Row] + Schema

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr)
val ds2 = spark.createDataset(rdd1)
ds2.show(10)

执行的结果如下图:
在这里插入图片描述

从文件创建DataFrame

CSV文件

我们生成了一个CSV文件,大致内容如下:
在这里插入图片描述

运行测试

val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
df1.printSchema()
df1.show()

运行结果如下图所示:
在这里插入图片描述

三者转换

在这里插入图片描述
Spark SQL 提供了一个领域特定语言(DSL)以方便操作结构化数据,核心思想还是SQL,仅仅是一个语法问题。

RDD 与 DataFrame 之间的转换

RDD 转换为 DataFrame

将 RDD 转换为 DataFrame 需要提供数据的模式信息。通常你会使用 toDF() 方法将 RDD 转换为 DataFrame。
这里有两种主要方法:

  • 使用隐式转换:需要导入 spark.implicits._,这允许你在不显式提供模式的情况下将常见的 RDD(如元组)转换为 DataFrame。
  • 使用 StructType 定义模式:如果 RDD 的数据结构比较复杂,或者你需要精确控制 DataFrame 的模式,可以使用 StructType 和 Row。

DataFrame 转换为 RDD:

  • 将 DataFrame 转换为 RDD 非常简单,只需调用 rdd 方法即可

DataFrame 与 DataSet 之间的转换

DataFrame 转换为 DataSet

  • DataFrame 是无类型的,而 DataSet 是类型化的。为了将 DataFrame 转换为 DataSet,你需要定义一个对应的数据类型(通常是一个 case class)并使用 as[T] 方法

DataSet 转换为 DataFrame

  • 将 DataSet 转换为 DataFrame 非常简单,只需调用 toDF() 方法即可

RDD 与 DataSet 之间的转换

RDD 转换为 DataSet

  • 将 RDD 转换为 DataSet 需要将 RDD 的元素类型与 DataSet 的类型一致。与将 RDD 转换为 DataFrame 类似,通常使用隐式转换或显式提供模式信息

DataSet 转换为 RDD

  • DataSet 本质上是类型化的 RDD,因此转换为 RDD 非常直接,只需调用 rdd 方法

最终汇总

  • RDD 转换为 DataFrame:使用 toDF(),或使用 createDataFrame() 提供模式。
  • DataFrame 转换为 RDD:使用 rdd 方法,转换后元素类型为 Row。
  • DataFrame 转换为 DataSet:使用 as[T] 方法,需提供对应的 case class。
  • DataSet 转换为 DataFrame:使用 toDF() 方法。
  • RDD 转换为 DataSet:使用 toDS(),需提供对应的 case class。
  • DataSet 转换为 RDD:使用 rdd 方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2058412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据资产入表,全流程实施指南!

数据成为生产要素已是社会共识,但不是所有数据都有资产价值。数据资源当中被重复使用的那部分才会资产化,具有流通中的定价,有些数据资产被专业开发变成数据产品,具有商品价值。从数据原始资源到数据产品,再到数据资产…

华为LTC流程体系的内涵(附PPT分享)

往期回顾: 企业4A架构:数字化转型的底层方法论(附TOGAF资料下载) PPT分享:数据治理的方法论、设计思路与方案(干货) 浅谈数字化转型方法论 110页PPT:xx业务流程优化(BPR&#xff…

Linux压缩和解压

目录 压缩和解压类 gzip/gunzip指令 zip/unzip指令 tar指令 压缩和解压类 gzip/gunzip指令 gzip用于压缩文件,gunzip用于解压缩文件。 解压后去掉了gz的后缀。 zip/unzip指令 ​​​​​​​ 将文件压缩后发给别人,别人再解压。 将整个文件压…

Python | Leetcode Python题解之第354题俄罗斯套娃信封问题

题目: 题解: class Solution:def maxEnvelopes(self, envelopes: List[List[int]]) -> int:if not envelopes:return 0n len(envelopes)envelopes.sort(keylambda x: (x[0], -x[1]))f [1] * nfor i in range(n):for j in range(i):if envelopes[j]…

利用srs进行视频流转发

框图如下 docker-compose.yaml如下 rtmp2rtc.conf的配置如下 就增加了 #配置如下 forward {enabled on;#开启转发backend http://192.168.0.131:6789/api/v1/forward; #有视频流数据后会调用这个接口} #回调的参数如下 Received payload: {actionon_forward, server_idvid-k2…

字节微前端框架Garfish

Garfish 是字节跳动开源的微前端框架,旨在应对现代 Web 应用在前端生态繁荣与应用日益复杂化背景下的挑战。本文将介绍如何使用 Garfish,提供代码示例,并与另一流行的微前端框架 Qiankun 进行对比分析。 安装 Garfish 首先,安装…

深度学习11--GAN进阶与变种

基础 GAN 存在的问题 在开始讲解变种之前,首先讲一下GAN 存在的问题。第一个问题就是判别器D太强了,损失都是0。假设判别器D能力强,G vl生成的图片与真实图片相差巨大,G v2生成的图片与真实图片相差不多,但是判别器都能…

01. 真正实现一键自动化生成pdf报告

easypdf使用手册 1. 项目介绍1.1 关于1.2 easypdf 有什么优势1.2 easypdf 可以用来做什么1.3 我们该做哪些准备?如何获取easypdf?1.4 项目演示 文章头部展示的附件pdf文档easypdf是项目执行生成的pdf文档 1. 项目介绍 1.1 关于 \qquad easypdf 是我在基…

Floyd算法(最短路问题)

文章目录 Floyd算法介绍Floyd算法思路代码及讲解 Floyd算法介绍 Floyd算法是一种用于找出加权图中所有顶点间最短路径的动态规划方法。它通过逐步考虑每个顶点作为中转点,检查是否有更短路径。算法首先初始化一个权值矩阵,然后通过三层循环更新矩阵&…

linux操作系统命令-文件系统-用户系统-网络-磁盘-进程-常用特殊字符-重定向-

Shell命令格式 命令提示符: 通常显示为 用户名主机名:目录名 $ 或 用户名主机名:目录名 #(对于root用户)。~ 表示当前用户的家目录。 命令格式: bash $ 命令 [-选项] [参数1] [参数2] ... $ 或 # 是命令提示符,$…

Go操作Redis基础方法小全

前言 在前一篇文章中,我们聊了Go操作Redis安装和使用,接下来这篇内容,就简单说一下,Go中操作Redis基础方法,通常是通过第三方库来实现的,如go-redis/redis(现在可能已经是v9或更高版本&#xf…

宋红康JVM调优思维导图

文章目录 1. 概述2. JVM监控及诊断命令-命令行篇3. JVM监控及诊断工具-GUI篇4. JVM运行时参数5. 分析GC日志 课程地址 1. 概述 2. JVM监控及诊断命令-命令行篇 3. JVM监控及诊断工具-GUI篇 4. JVM运行时参数 5. 分析GC日志

ubuntu安装minio

# 下载MinIO的可执行文件 curl -O https://dl.min.io/server/minio/release/linux-amd64/minio # 添加执行权限 chmod x minio # 运行MinIO (需要先创建存储数据和存储存储桶的目录) nohup ./minio server /home/lighthouse/minioDir/data /home/lighthouse/minioDir/bucke…

[JAVA]创建多线程的三种方式与区别

继承Thread类创建线程实现Runnable接口创建线程Callable接口创建线程 要学习创建线程,我们要通过代码来演示,这里我们可以通过实现以下参赛者跑步的场景来展开。 模拟以下场景 模拟10秒短跑程序 假设,这里有三名参赛者,十秒钟时…

使用对比!SLS 数据加工 SPL 与旧版 DSL 场景对照

作者:灵圣 概述 如前一篇《SLS 数据加工全面升级,集成 SPL 语法》所述,SLS 数据加工集成了 SLS 数据处理语法 SPL。与旧版本数据加工 DSL 相比,SPL 在处理非结构化数据的场景中,其语法简洁度上有很多提升&#xff0c…

监控台操作台空间设计如何提升工作环境

在现代化办公环境中,监控台与操作台不仅是技术监控与操作的核心区域,更是工作效率与团队协作的关键所在。优化其空间设计,不仅能显著提升工作环境的质量,还能激发员工的工作热情与创造力。 首先,人体工程学设计是提升监…

C++ | Leetcode C++题解之第352题将数据流变为多个不想交区间

题目&#xff1a; 题解&#xff1a; class SummaryRanges { private:map<int, int> intervals;public:SummaryRanges() {}void addNum(int val) {// 找到 l1 最小的且满足 l1 > val 的区间 interval1 [l1, r1]// 如果不存在这样的区间&#xff0c;interval1 为尾迭代…

Ubuntu 中GCC交叉编译工具链安装

​ Ubuntu 自带的 gcc 编译器是针对 X86 架构的&#xff0c;如果要编译的是 ARM 架构的代码&#xff0c;就需要一个在 X86 架构的 PC 上运行&#xff0c;可以编译 ARM 架 构代码的 GCC 编译器&#xff0c;这个编译器就叫做交叉编译器&#xff0c;总结一下交叉编译器就是&#x…

基于springboot的宠物领养与丢失寻找信息平台-计算机毕设 附源码 14772

基于springboot的宠物领养与丢失寻找信息平台 摘 要 本文介绍了一个基于Spring Boot框架的宠物领养与丢失寻找信息平台的设计与实现。该平台旨在解决宠物领养和丢失寻找过程中的信息不对称和效率低下问题&#xff0c;为宠物主人、领养者提供一个便捷、高效的交互平台。 平台采用…

C++观察者模式Observer

组件协作 –(都是晚绑定的&#xff09; ----观察者模式 为某些对象建立一种通知依赖的关系&#xff0c; 只要这个对象状态发生改变&#xff0c;观察者对象都能得到通知。 但是依赖关系要松耦合&#xff0c;不要太依赖。 eg&#xff1a;做一个文件分割器&#xff0c;需要一个…