Spark17-18-19

news2025/2/24 10:06:49

17. Spark执行流程

 

17.1 创建SparkContext

使用spark-submit脚本,会启动SparkSubmit进程,然后通过反射调用我们通过--class传入类的main方法,在main方法中,就行我们写的业务逻辑了,先创建SparkContext,向Master申请资源,然后Master跟Worker通信,启动Executor,然后所有的Executor向Driver反向注册

17.2 创建RDD并构建DAG

DAG(Directed Acyclic Graph)叫做有向无环图,是的一系列RDD转换关系的描述,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

 

依赖关系划分为两种:窄依赖(Narrow Dependency)和 宽依赖(源码中为Shuffle Dependency)。

窄依赖指的是父 RDD 中的一个分区最多只会被子RDD 中的一个分区使用,意味着父RDD的一个分区内的数据是不能被分割的,子RDD的任务可以跟父RDD在同一个Executor一起执行,不需要经过 Shuffle 阶段去重组数据。

窄依赖包括两种:一对一依赖(OneToOneDependency)和范围依赖(RangeDependency) 

一对一依赖

  

宽依赖指的是父 RDD 中的分区可能会被多个子 RDD 分区使用。因为父 RDD 中一个分区内的数据会被分割,发送给子 RDD 的多个分区,因此宽依赖也意味着父 RDD 与子 RDD 之间存在着 Shuffle 过程。

宽依赖只有一种:Shuffle依赖(ShuffleDependency) 

  

17.3 切分Stage,生成Task和TaskSet

触发Action,会根据最后一个RDD,从后往前推,如果是窄依赖(没有shuffle),继续往前推,如果是宽依赖(有shuffle),那么会递归进去,然后再根据递归进去的最后一个RDD进行向前推,如果一个RDD再也没有父RDD(递归出口),那么递归出来划分Stage(DAGScheduler完成的以上工作)

 

17.4 将Task调度到Executor

划分完Stage后,DAGScheduler将根据Stage的类型,生成Task,然后将同一个Stage的多个计算逻辑相同的Task放入到同一个TaskSet中,然后向DAGScheduler将TaskSet传递给TaskScheduler,TaskScheduler会根据Executor的的资源情况,然后将Task序列化发送给Executor

17.5 在Executor中执行Task

Executor接收到TaskScheduler发送过来的Task后,将其反序列化,然后使用一个实现了Runnable接口的包装类进行包装,最后将包装的Task丢入到线程池,一旦丢入到线程池,run方法会执行,run方法会调用Task对应的迭代器链进行迭代数据

 

  • Job:RDD每一个行动操作都会生成一个或者多个调度阶段 调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage对应一个TaskSet,一个Task中包含多Task,TaskSet的数量与该阶段最后一个RDD的分区数相同。 
  • Task:分发到Executor上的工作任务,是Spark的最小执行单元 
  • DAGScheduler:DAGScheduler是将DAG根据宽依赖将切分Stage,负责划分调度阶段并Stage转成TaskSet提交给TaskScheduler
  • TaskScheduler:TaskScheduler是将Task序列化然后发送到Worker下的Exexcutor进程,在Executor中,将Task反序列化,然后使用实现Runable接口的包装类包装,最后丢入到Executor的线程池的中进行执行 

18. shuffle 过程详解

18.1 spark shuffle 演进的历史

Spark 0.8及以前 Hash Based Shuffle

Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

Spark 0.9 引入ExternalAppendOnlyMap

Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

Spark 1.4 引入Tungsten-Sort Based Shuffle

Spark 1.6 Tungsten-sort并入Sort Based Shuffle

Spark 2.0 Hash Based Shuffle退出历史舞台

18.2 HashShuffleManager(已不再使用)

  • 优化前的

在shuffle write前,应用分区器,根据对应的分区规则,计算出数据partition编号,然后将数据写入bucket内存中,当数据达到一定大小或数据全部处理完后,将数据溢写持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了容错降低数据恢复的代价。

 

 上图有2个Executor,每个Executor有1个coretotal-executor-cores为数为 2,每个 task 的执行结果会被溢写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为spark.shuffle.file.buffer.kb ,默认是 32KB。

其实bucket表示缓冲区,即ShuffleMapTask 调用分区器的后数据要存放的地方。

ShuffleMapTask 的执行过程:先根据 pipeline 的计算逻辑对数据进行运算,然后根据分区器计算出每一个record的分区编号。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.getPartition(record.getKey()))决定。每个 bucket 里面的数据会满足溢写的条件会被溢写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 下游的task会根据分区会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。

老版本的HashShuffleManager存在的问题:

1.产成的 FileSegment 过多。每个 ShuffleMapTask 产生 R(下游Task的数量)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。

2.缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores  R 个,占用的内存空间也就达到了cores * R * 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。

  • 优化后的:

 

可以明显看出,在一个core上连续执行的ShuffleMapTasks可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成ShuffleBlock i,后执行的 ShuffleMapTask可以将输出数据直接追加到ShuffleBlock i后面,形成ShuffleBlock’,每个ShuffleBlock被称为FileSegment。下一个stage的reducer只需要fetch整个 ShuffleFile就行了。这样每个Executor持有的文件数降为cores*R。

FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。

18.3 SortShuffleManager

 

  • BypassMergeSortShuffleWriter

使用这种ShuffleWriter的条件是:

(1) 没有map端的聚合操作

(2) 分区数小于参数:spark.shuffle.sort.bypassMergeThreshold,默认是200

BypassMergeSortShuffleWriter 算法适用于没有聚合,数据量不大的场景。 给每个分区分配一个临时文件,对每个 record 的 key 使用分区器(模式是hash,如果用户自定义就使用自定义的分区器)找到对应分区的输出文件并写入文件对应的文件。

因为写入磁盘文件是通过 Java的 BufferedOutputStream 实现的,BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。所以图中会有内存缓冲的概念。

 

 

 

  • UnsafeShuffleWriter

使用这种ShuffleWriter的条件是:

  • Serializer 支持 relocation。Serializer 支持 relocation 是指,Serializer 可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。支持 relocation 的 Serializer 是 KryoSerializer,Spark 默认使用 JavaSerializer,通过参数 spark.serializer 设置;
  • 没有指定 aggregation 或者 key 排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序。
  • partition 数量不能大于指定的阈值(2^24),因为 partition number 使用24bit 表示的。即不能大于PackedRecordPointer.MAXIMUM_PARTITION_ID + 1

UnsafeShuffleWriter 将 record 序列化后插入sorter,然后对已经序列化的 record 进行排序,并在排序完成后写入磁盘文件作为 spill file,再将多个 spill file 合并成一个输出文件。在合并时会基于 spill file 的数量和 IO compression codec 选择最合适的合并策

  • SortShuffleWriter

若以上两种ShuffleWriter都不能选择,则使用SortShuffleWriter类,SortShuffleWriter也是相对比较常用的一种ShuffleWriter。

1.SortShuffleWriter会先把数据先写入到内存中,并会尝试扩展内存大小,若内存不足,则把数据持久化到磁盘上。

2.SortShuffleWriter在把数据写入磁盘时,会按分区ID进行合并,并对key进行排序,然后写入到该分区的临时文件中。

3.SortShuffleWriter最后会把前面写的分区临时文件进行合并,合并成一个文件,也就是说,会在map操作结束时把各个分区文件合并成一个文件。这样做可以有效的减少文件个数,和为了维护这些文件而产生的资源消耗。

 

在进行 shuffle 之前,map 端会先将数据进行排序。排序的规则,根据不同的场景,会分为两种。首先会根据 Key 将元素分成不同的 partition。第一种只需要保证元素的 partitionId 排序,但不会保证同一个 partitionId 的内部排序。第二种是既保证元素的 partitionId 排序,也会保证同一个 partitionId 的内部排序。

接着,往内存写入数据,每隔一段时间,当向 MemoryManager 申请不到足够的内存时,或者数据量超过 spark.shuffle.spill.numElementsForceSpillThreshold 这个阈值时 (默认是 Long 的最大值,不起作用),就会进行 Spill 内存数据到文件,然后清空内存数据结构。假设可以源源不断的申请到内存,那么 Write 阶段的所有数据将一直保存在内存中,由此可见,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比较吃内存的。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件也是通过 Java 的 BufferedOutputStream 实现的。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。在将最终排序结果写入到数据文件之前,需要将内存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已经 spill 到磁盘的 SpillFiles 进行合并。

此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

19. Spark思考题

  • 1.SparkContext哪一端生成的?

Driver端即SparkContext(Driver是一个统称,里面还DAGScheduler、TaskScheduler、ShuffleManager、BroadcastManager等)

  • 2.DAG是在哪一端被构建的?

Drvier端

    

  • 3.RDD是在哪一端创建的?

Driver端,RDD不装真正要计算的数据,而是记录了数据的描述信息(以后从哪里读数据,怎么计算)

  • 6.调用RDD的算子(Transformation和Action)是在哪一端调用的

Driver端

        

  • 7.RDD在调用Transformation和Action时需要传入一个函数,函数是在哪一端声明【定义】和传入的?

Driver端

  • 6.RDD在调用Transformation和Action时需要传入函数,请问传入的函数是在哪一端执行了函数的业务逻辑?

Executor中的Task指定的

  

  • 9.Task是在哪一端生成的呢?

Driver端,Task分为ShuffleMapTask和ResultTask

  • 10.DAG是在哪一端构建好的并被切分成一到多个Stage的

Driver

  • 11.DAG是哪个类完成的切分Stage的功能?

DAGScheduler

         

  • 12.DAGScheduler将切分好的Task以什么样的形式给TaskScheduler

TaskSet

  • 13.分区器这个类是在哪一端实例化的?

Driver端

  • 14.分区器中的getParitition方法在哪一端调用的呢?

Executror中的Task

  • 15.广播变量是在哪一端调用的方法进行广播的?

Driver端

  • 16.要广播的数据应该在哪一端先创建好再广播呢?

Driver端

  • 17.广播变量以后能修改吗?

不能修改

  • 18.广播变量广播到Executor后,一个Executor进程中有几份广播变量的数据

一份全部广播的数据

19.广播变量如何释放

调用广播变量返回到Driver端的引用的unpersist()方法进行释放

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/698036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YOLOv5-7.0添加BottleNet transformer

YOLOv5主干特征提取网络为CNN网络,CNN具有平移不变性和局部性,缺乏全局建模长距离建模的能力,引入自然语言领域的Transformer可以形成CNNTransFormer架构,充分结合两者的优点,提高目标检测效果。 1. BoTNet 论文地址…

BeautifulSoup爬取豆瓣电影数据

BeautifulSoup爬取豆瓣TOP250 豆瓣爬取地址 https://movie.douban.com/top250?formattext BeautifulSoup官网地址 https://www.rddoc.com/doc/BeautifulSoup/4.5.3/zh/quick-start/ 安装所需函数库 pip install beautifulsoup4pip install lxmlpip install requests导入…

IIS安装localhost显示下载,urlrewrite设置

1.取消ftp服务勾选 2. ping localhost ping 127.0.0.1 如果显示 ::1 则需要禁用ipv6 在注册表 找到并单击下面的注册表子项: HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip6\Parameters\ 双击“DisabledComponents”以修…

Git2023最新版下载与安装教程(Windows版)

Windows版Git下载与安装教程 1. 下载Git2. 安装Git3. 配置Git 1. 下载Git 打开Git官网下载地址:https://git-scm.com/downloads 点击Download for Windows 选择git版本进行下载 2. 安装Git 双击安装包 点击Next 选择Git的安装路径,点击Next 选择…

【Html】js+css实现平滑滚动

效果 示例 <!DOCTYPE html> <html><head><title> Document </title><style>button{bottom: 0;position: fixed;z-index: 999;left: 0;background: rgb(94, 171, 255);border: 1px red;color: white;font-size: large;font-family: ;}img{…

C++、Redis读取base64格式的图像记录

C、Redis读取base64格式的图像记录 一、案例需求 1.另一台电脑利用C#和Redis将图像数据按照base64格式&#xff0c;存储在某一个key中 2.本机需要使用C和Redis将图像数据获取到&#xff0c;并写入本地。 环境&#xff1a;Ubuntu20、Redis、QT 二、Qt中的Pro文件配置 QT中的…

深度学习记录1(线性回归的实现)

1、整体思路 根据线性回归的定义&#xff0c; &#xff0c;建立线性回归模型&#xff0c;在损失函数的计算上&#xff0c;采用L2 Loss&#xff08;均方误差&#xff09;。同时&#xff0c;对于模型的优化采用随机梯度下降。 2、详细代码分析 import random import torch from…

Day7——Web安全基础下

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 回顾前言一、owasp top 10漏洞&#xff08;了解&#xff09;&#xff08;四年一更&#xff09;1.访问控制崩溃2.敏感数据暴露3.sql注入4.不安全的设计5.安全配置不当…

【单片机】STM32单片机的各个定时器的定时中断程序,标准库

文章目录 定时器1_定时中断定时器2_定时中断定时器3_定时中断定时器4_定时中断定时器5_定时中断 高级定时器和普通定时器的区别&#xff08;https://zhuanlan.zhihu.com/p/557896041&#xff09;&#xff1a; 定时器1_定时中断 TIM1是高级定时器&#xff0c;使用的时钟总线是R…

使用Megascans,Blender和Substance 3D画家创建渔人旅馆(p2)

今天云渲染小编接着Polina Tarakanova分享的Fishermans Inn项目上篇分享&#xff0c;下篇主要是纹理和材料、组装场景、照明等方面的分享。 纹理和材料 随着酒馆的模块化建设完成&#xff0c;是时候进入贴图阶段了。我使用Substance 3D Painter进行了所有的贴图工作。在我的场…

【网站创建】网络杂谈(6)之web网站的创建

涉及知识点 如何创建web网站&#xff0c;web网站创建的步骤&#xff0c;手把手教你如何搭建web网站&#xff0c;web网站创建的过程&#xff0c;深入了解web网站创建。 原创于&#xff1a;CSDN博主-《拄杖盲学轻声码》&#xff0c;更多内容可去其主页关注下哈&#xff0c;不胜感…

基于Java+SpringBoot+Vue的计算机类考研交流平台设计与实现

博主介绍&#xff1a;擅长Java、微信小程序、Python、Android等&#xff0c;专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb; 不然下次找不到哟 Java项目精品实战案例…

React-View-UI组件库封装Loading加载中源码

目录 组件介绍Loading API能力组件源码组件测试源码组件库线上地址 组件介绍 Loading组件是日常开发用的很多的组件&#xff0c;这次封装主要包含两种状态的Loading&#xff0c;旋转、省略号&#xff0c;话不多说先看一下组件的文档页面吧&#xff1a; 正在上传…重新上传取…

掌握imgproc组件:opencv-图像变换

图像变换 1. 基于OpenCV的边缘检测1.1 边缘检测的一般步骤1.2 canny算子1.2.1 Canny边缘检测步骤&#xff1a;1.2.2 Canny边缘检测&#xff1a;Canny()函数1.2.3 Canny边缘检测案例 1.3 sobel算子1.3.1 sobel算子的计算过程1.3.2 使用Sobel算子&#xff1a;Sobel()函数1.3.3 示…

模拟高并发下RabbitMQ的削峰作用

在并发量很高的时候&#xff0c;服务端处理不过来客户端发的请求&#xff0c;这个时候可以使用消息队列&#xff0c;实现削峰。原理就是请求先打到队列上&#xff0c;服务端从队列里取出消息进行处理&#xff0c;处理不过来的消息就堆积在消息队列里等待。 可以模拟一下这个过…

生态+公链:中创面向未来的区块链建设!

未来的区块链市场&#xff0c;一定属于能够将区块链技术与应用完美结合在一起的产品。从互联网的发展历程来看&#xff0c;最后的竞争往往会集中到生态与兼容性。 如何将区块链的落地和应用更加有机地结合在一起&#xff0c;从而让区块链的功能和作用得到最大程度的发挥&#…

机器学习8:特征组合-Feature Crosses

特征组合也称特征交叉&#xff08;Feature Crosses&#xff09;&#xff0c;即不同类型或者不同维度特征之间的交叉组合&#xff0c;其主要目的是提高对复杂关系的拟合能力。在特征工程中&#xff0c;通常会把一阶离散特征两两组合&#xff0c;构成高阶组合特征。可以进行组合的…

css:去除input和textarea默认边框样式并美化

input input默认样式和focus样式 参考element-ui的css&#xff0c;可以实现如下效果 实现代码 <style>/* 去除默认样式 */input {border: none;outline: none;padding: 0;margin: 0;-webkit-appearance: none;-moz-appearance: none;appearance: none;background-im…

ElasticSearch 8.0+ 版本Windows系统启动

下载地址&#xff1a;https://www.elastic.co/cn/downloads/past-releases/winlogbeat-8-8-1 解压\elasticsearch\elasticsearch-8.5.1 进入bin目录&#xff0c;启动elasticsearch.bat 问题1&#xff1a; warning: ignoring JAVA_HOMED:\jdk1.8.0_271; using bundled JDK J…

使用凌鲨连接SSH服务器

SSH&#xff08;Secure Shell&#xff09;是一种加密的网络协议&#xff0c;用于安全地连接远程服务器。它提供了一种安全的通信方式&#xff0c;使得用户可以在不受干扰的情况下远程访问服务器。SSH协议的加密技术可以保护用户的登录信息和数据传输过程中的安全性。 SSH对于服…