【Spark】SparkCore

news2025/1/19 23:27:23

文章目录

  • RDD
    • 特点:
      • 弹性
      • 分布式
      • 数据集
      • 数据抽象
      • 不可变
      • 可分区、并行计算
        • 分区列表
        • 分区计算函数
        • RDD 之间的依赖关系
        • 分区器(可选)
        • 首选位置(可选)
    • 执行原理
      • 启动 Yarn 集群环境
      • Spark 通过申请资源创建调度节点和计算节点
      • Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
      • 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
    • 创建
      • 从集合(内存)中创建
        • parallelize
        • makeRDD
      • 从外部存储(文件)创建
      • 从其他RDD创建
      • 直接创建RDD(new)
    • 并行度与分区
    • 转换因子
    • 行动因子
      • reduce
      • collect
      • count
      • first
      • take
      • takeOrdered
      • aggregate
      • fold
      • countByKey
      • save相关算子
      • foreach
    • RDD序列化
      • 闭包检查
      • 序列化方法和属性
      • Kryo 序列化框架
    • RDD依赖关系
      • RDD血缘关系
      • RDD依赖关系
      • RDD任务划分
    • RDD持久化
      • RDD Cache缓存
        • 存储级别
      • RDD CheckPoint检查点
      • 缓存和检查点区别
    • RDD分区器
      • Hash分区(默认)
      • Range分区
      • 用户自定义分区
    • RDD读写文件
      • text文件
      • sequence文件
      • object对象文件
  • 累加器
    • 系统累加器
    • 自定义累加器
  • 广播变量
    • 来源

三大数据结构分别是:

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型

代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

特点:

弹性

存储的弹性:内存与磁盘的自动切换;

容错的弹性:数据丢失可以自动恢复;

计算的弹性:计算出错重试机制;

分片的弹性:可根据需要重新分片。

分布式

数据存储在大数据集群不同节点上

数据集

RDD 封装了计算逻辑,并不保存数据

数据抽象

RDD 是一个抽象类,需要子类具体实现

不可变

RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑

可分区、并行计算

分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

RDD 之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。

执行时,需要将计算资源和计算模型进行协调和整合。

Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

RDD 是 Spark 框架中用于数据处理的核心模型,在 Yarn 环境中,RDD的工作原理:

启动 Yarn 集群环境

yarn集群

Spark 通过申请资源创建调度节点和计算节点

spark与yarn

Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

计算节点

调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

在这里插入图片描述

创建

从集合(内存)中创建

parallelize

val rdd1 = sparkContext.parallelize(
 List(1,2,3,4)
)

makeRDD

val rdd2 = sparkContext.makeRDD(
 List(1,2,3,4)
)

从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

从外部存储(文件)创建

本地的文件系统

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")

所有 Hadoop 支持的数据集

从其他RDD创建

直接创建RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

并行度与分区

并行度

Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量

val dataRDD: RDD[Int] =
 sparkContext.makeRDD(
 List(1,2,3,4),
 4)
val fileRDD: RDD[String] =
 sparkContext.textFile(
 "input",
 2)

转换因子

转换因子

行动因子

reduce

def reduce(f: (T, T) => T): T

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

collect

在驱动程序中,以数组 Array 的形式返回数据集的所有元素

count

返回 RDD 中元素的个数

first

返回 RDD 中的第一个元素

take

返回一个由 RDD 的前 n 个元素组成的数组

takeOrdered

返回该 RDD 排序后的前 n 个元素组成的数组

aggregate

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
val result: Int = rdd.aggregate(10)(_ + _, _ + _)

fold

折叠操作,aggregate 的简化版操作

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_+_)

countByKey

统计每种 key 的个数

save相关算子

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")

(4条消息) 一篇文章搞懂 SequenceFile 到底是什么以及该怎么用_Shockang的博客-CSDN博客

foreach

分布式遍历 RDD 中的每一个元素,调用指定函数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集后打印
rdd.map(num=>num).collect().foreach(println)
println("****************")
// 分布式打印
rdd.foreach(println)

RDD序列化

闭包检查

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor

端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就

形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor

端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列

化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

序列化方法和属性

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行

Kryo 序列化框架

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也

比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度

是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型

已经在 Spark 内部使用 Kryo 来序列化。

RDD依赖关系

RDD血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage

(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转

换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的

只记录计算过程而不记录中间数据

RDD依赖关系

两个相邻 RDD 之间的关系

RDD 窄依赖

RDD 宽依赖

RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,

不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

RDD任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

Application:初始化一个 SparkContext 即生成一个 Application;

Job:一个 Action 算子就会生成一个 Job;

Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;

Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kXqSIYoR-1688483545022)(C:/Users/19801/AppData/Roaming/Typora/typora-user-images/image-20230501185825933.png)]

RDD持久化

RDD Cache缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别

存储级别

丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,

并不需要重算全部 Partition。

Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样

做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时

候,如果想重用数据,仍然建议调用 persist 或 cache。

RDD CheckPoint检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点

之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
 word => {
 (word, System.currentTimeMillis())
 }
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

缓存和检查点区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。

2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存

储在 HDFS 等容错、高可用的文件系统,可靠性高。

3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存

中读取数据即可,否则需要再从头计算一次 RDD。

RDD分区器

分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

Hash分区(默认)

对于给定的 key,计算其 hashCode,并除以分区个数取余

Range分区

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

用户自定义分区

RDD读写文件

文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;

文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。

text文件

// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")

sequence文件

// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)

object对象文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。

// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)

累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。

在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

系统累加器

val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
 num => {
 // 使用累加器
 sum.add(num)
 }
)
// 获取累加器的值
println("sum = " + sum.value)

自定义累加器

// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, 
Long]]{
var map : mutable.Map[String, Long] = mutable.Map()
// 累加器是否为初始状态
override def isZero: Boolean = {
 map.isEmpty
}
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
 new WordCountAccumulator
}
// 重置累加器
override def reset(): Unit = {
 map.clear()
}
// 向累加器中增加数据 (In)
override def add(word: String): Unit = {
 // 查询 map 中是否存在相同的单词
 // 如果有相同的单词,那么单词的数量加 1
 // 如果没有相同的单词,那么在 map 中增加这个单词
 map(word) = map.getOrElse(word, 0L) + 1L
}
// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): 
Unit = {
 val map1 = map
 val map2 = other.value
 // 两个 Map 的合并
 map = map1.foldLeft(map2)(
 ( innerMap, kv ) => {
 innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
 innerMap
 }
 )
}
// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
}

广播变量

广播变量用来高效分发较大的对象。

向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。

// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
 case (key, num) => {
 var num2 = 0
 // 使用广播变量
 for ((k, v) <- broadcast.value) {
 if (k == key) {
 num2 = v
 }
 }
 (key, (num, num2))
 }
}

来源

尚硅谷
https://blog.csdn.net/Shockang/article/details/117376761

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/719130.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ue4:Dota总结_BP_CameraPawn篇

设计wasd移动&#xff1a; 鼠标拖动视口&#xff1a; 鼠标滚轮调整远近&#xff1a; Beginplay&#xff1a; qe按键旋转&#xff1a; 变量&#xff1a;

Python练手项目-学生成绩管理系统

之前在最初的学习中&#xff0c;我就写过一个Python的学生管理系统&#xff0c;但是那个很粗糙&#xff0c;很简陋 今天在学习过程中&#xff0c;再次拿出来&#xff0c;重新优化书写 希望对初学者以及需要的朋友有帮助。 文章目录 系统开发环境&#xff1a;1.实现功能2.系统设…

css3动画应用

按钮边框 按钮 首先实现正常边框按钮&#xff0c; 其次按钮相对定位&#xff0c;因为旋转的内容需要绝对定位&#xff0c; 旋转内容做伪元素处理&#xff0c;z-index设置负数是为了把按钮文字显示出来&#xff0c; 定位一半一半是把长方块中心点放按钮正中&#xff0c; 变形原…

照片jpg大小kb如何修改?图片在线压缩大小怎么处理?

最近需要在各种报名平台上传照片的小伙伴比较多&#xff0c;难免会遇到需要压缩jpg图片的情况&#xff0c;那么怎么才能将jpg图片压缩&#xff08;https://www.yasuotu.com/jpg&#xff09;呢&#xff1f;今天介绍一个图片在线压缩大小的方法&#xff0c;不用下载任何软件就可以…

内嵌 iframe 实现PDF预览

效果图如下&#xff1a; 代码如下&#xff1a; <template><div><!-- 控制浮层显示隐藏 --><el-button type"primary" size"small" class"btn" click"dialogVisible true">PDF 预览 (内嵌 iframe)</el-but…

pwn(7.4小学期)

Ret2libc 先查一下壳 32位&#xff0c;放入IDA中看看 查看一下 vulnerable_function();这个函数 Read函数&#xff0c;很明显的栈溢出 但是观察半天&#xff0c;发现这里并没有system函数 字符搜索我们看到了libc.so.6 可以想到libc函数库以及 PLT表中的代码会根据函数在…

[已成功破解] 阿里 taobao 滑条验证码 x5sec解密 slidedata参数

[已破解] 阿里 taobao 滑条验证码 x5sec解密 slidedata参数 今天在爬tb数据的时候 发现老是会触发一个滑块验证 只要过了这个滑块将滑块返回的x5secdata 的cookie 带到请求参数里面去 就能完美避开了 然后去抓了下滑块的包 过了这个滑块拿到cookie就能 访问阿里一直获取数据…

FPGA纯verilog实现UDP协议栈 AXIS用户接口,可替代Tri Mode Ethernet MAC,提供三套工程源码和技术支持

目录 1、前言2、我这里已有的UDP方案3、该UDP协议栈性能4、详细设计方案网络PHYRGMII转GMII模块AXIS FIFOUDP协议栈 5、vivado工程1-->B50610 工程6、vivado工程1-->RTL8211 工程7、vivado工程1-->88E1518 工程8、上板调试验证并演示准备工作查看ARPUDP数据回环测试 9…

1.5 基于MyBatis数据库逆向生成工具,并创建serverice和controller控制层

步骤1&#xff1a;在mybatis-generator中添加要生成的数据库表名 在genratorConfig.xml内容: <!-- 数据库表 --> <table tableName"stu"></table>步骤2&#xff1a;StuMapper.xml和StuMapper拷贝到对应的mapper模块下 步骤3&#xff1a;pojo对应…

计算机视觉颜色校正方法

计算机视觉颜色校正方法 调色和色彩矫正之间的区别直方图均衡化原理实现代码 CCM颜色校正矩阵原理 深度学习Deep_White_Balance什么是sRGB图像问题描述&#xff1a;方法概述&#xff1a;模型架构&#xff1a;训练损失函数实现快速开始 调色和色彩矫正之间的区别 调色是指通过调…

Vue3使用element-plus实现弹窗效果-demo

使用 <ShareDialog v-model"isShow" onChangeDialog"onChangeDialog" /> import ShareDialog from ./ShareDialog.vue; const isShow ref(false); const onShowDialog (show) > {isShow.value show; }; const onChangeDialog (val) > {co…

使用AI人工智能给线稿上色,给漫画上色

【深度学习】AIGC &#xff0c;ControlNet 论文中的图。 一些酷炫的可以做纵深领域的图&#xff0c;这里再放一下。 下图是由手绘草稿给出图&#xff1a; 由线稿得到图&#xff0c;给漫画上色&#xff1a;

简单的PWN学习-ret2shellcode

最近笔者开始钻研pwn的一些知识&#xff0c;发现栈溢出真的非常的有意思&#xff0c;于是经过一个多礼拜的学习&#xff0c;终于是把2016年的一道CTF题给看明白了了&#xff0c;首先我们学习一下前置技能 0x01 shellcode​ 首先简单看一下shellcode是怎么生成的&#xff0c;首…

Linux和Windows两种平台安装Maven

Linux和Windows两种平台安装Maven 0. 写在前面 Linux版本&#xff1a;Centos-7.5Windows版本&#xff1a;Windows10Maven版本&#xff1a;Maven-3.5.4JDK版本&#xff1a;jdk1.8 1. Linux平台安装 1.1 查看Linux内核版本命令 查看Linux内核版本命令 [whybigdatanode02 ~]$ …

AI医疗。

随着技术的发展&#xff0c;人工智能&#xff08;AI&#xff09;已经渗透到了我们生活的许多领域&#xff0c;包括凭其强大的预测和分析能力已经走入了医疗卫生领域。特别是在使用OpenAI的GPT-4技术的chatbot&#xff0c;如chatGPT和GPT-4等&#xff0c;已经成为了给医疗行业注…

驾驶舱数据指标体系设计指南

大数据时代下&#xff0c;各行各业面对众多的顾客和复杂多变的市场需求&#xff0c;要想及时适应市场变化&#xff0c;掌握市场动态&#xff0c;就需要对各个环节的数据进行分析&#xff0c;得到科学有效的结论来指导决策&#xff0c;这就离不开领导驾驶舱。 一、领导驾驶舱是什…

记一次 .NET 某工控视觉系统 卡死分析

一&#xff1a;背景 1. 讲故事 前段时间有位朋友找到我&#xff0c;说他们的工业视觉软件僵死了&#xff0c;让我帮忙看下到底是什么情况&#xff0c;哈哈&#xff0c;其实卡死的问题相对好定位&#xff0c;无非就是看主线程栈嘛&#xff0c;然后就是具体问题具体分析&#x…

为生信写的Python简明教程 | 视频7

开源生信 Python教程 生信专用简明 Python 文字和视频教程 源码在&#xff1a;https://github.com/Tong-Chen/Bioinfo_course_python 目录 背景介绍 编程开篇为什么学习Python如何安装Python如何运行Python命令和脚本使用什么编辑器写Python脚本Python程序事例Python基本语法 数…

BurpSutie拓展插件推荐-辅助测试插件

为方便您的阅读&#xff0c;可点击下方蓝色字体&#xff0c;进行跳转↓↓↓ 01 chunked-coding-converter-0.4.0&#xff08;1&#xff09;工具介绍&#xff08;2&#xff09;下载地址&#xff08;3&#xff09;使用说明 02 captcha-killer&#xff08;1&#xff09;工具介绍&a…

HOT43-验证二叉搜索树

leetcode原题链接&#xff1a;验证二叉搜索树 题目描述 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右…