17. Spark执行流程
17.1 创建SparkContext
使用spark-submit脚本,会启动SparkSubmit进程,然后通过反射调用我们通过--class传入类的main方法,在main方法中,就行我们写的业务逻辑了,先创建SparkContext,向Master申请资源,然后Master跟Worker通信,启动Executor,然后所有的Executor向Driver反向注册
17.2 创建RDD并构建DAG
DAG(Directed Acyclic Graph)叫做有向无环图,是的一系列RDD转换关系的描述,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
依赖关系划分为两种:窄依赖(Narrow Dependency)和 宽依赖(源码中为Shuffle Dependency)。
窄依赖指的是父 RDD 中的一个分区最多只会被子RDD 中的一个分区使用,意味着父RDD的一个分区内的数据是不能被分割的,子RDD的任务可以跟父RDD在同一个Executor一起执行,不需要经过 Shuffle 阶段去重组数据。
窄依赖包括两种:一对一依赖(OneToOneDependency)和范围依赖(RangeDependency)
一对一依赖
宽依赖指的是父 RDD 中的分区可能会被多个子 RDD 分区使用。因为父 RDD 中一个分区内的数据会被分割,发送给子 RDD 的多个分区,因此宽依赖也意味着父 RDD 与子 RDD 之间存在着 Shuffle 过程。
宽依赖只有一种:Shuffle依赖(ShuffleDependency)
17.3 切分Stage,生成Task和TaskSet
触发Action,会根据最后一个RDD,从后往前推,如果是窄依赖(没有shuffle),继续往前推,如果是宽依赖(有shuffle),那么会递归进去,然后再根据递归进去的最后一个RDD进行向前推,如果一个RDD再也没有父RDD(递归出口),那么递归出来划分Stage(DAGScheduler完成的以上工作)
17.4 将Task调度到Executor
划分完Stage后,DAGScheduler将根据Stage的类型,生成Task,然后将同一个Stage的多个计算逻辑相同的Task放入到同一个TaskSet中,然后向DAGScheduler将TaskSet传递给TaskScheduler,TaskScheduler会根据Executor的的资源情况,然后将Task序列化发送给Executor
17.5 在Executor中执行Task
Executor接收到TaskScheduler发送过来的Task后,将其反序列化,然后使用一个实现了Runnable接口的包装类进行包装,最后将包装的Task丢入到线程池,一旦丢入到线程池,run方法会执行,run方法会调用Task对应的迭代器链进行迭代数据
- Job:RDD每一个行动操作都会生成一个或者多个调度阶段 调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage对应一个TaskSet,一个Task中包含多Task,TaskSet的数量与该阶段最后一个RDD的分区数相同。
- Task:分发到Executor上的工作任务,是Spark的最小执行单元
- DAGScheduler:DAGScheduler是将DAG根据宽依赖将切分Stage,负责划分调度阶段并Stage转成TaskSet提交给TaskScheduler
- TaskScheduler:TaskScheduler是将Task序列化然后发送到Worker下的Exexcutor进程,在Executor中,将Task反序列化,然后使用实现Runable接口的包装类包装,最后丢入到Executor的线程池的中进行执行
18. shuffle 过程详解
18.1 spark shuffle 演进的历史
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台
18.2 HashShuffleManager(已不再使用)
- 优化前的
在shuffle write前,应用分区器,根据对应的分区规则,计算出数据partition编号,然后将数据写入bucket内存中,当数据达到一定大小或数据全部处理完后,将数据溢写持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了容错降低数据恢复的代价。
上图有2个Executor,每个Executor有1个core,total-executor-cores为数为 2,每个 task 的执行结果会被溢写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为spark.shuffle.file.buffer.kb ,默认是 32KB。
其实bucket表示缓冲区,即ShuffleMapTask 调用分区器的后数据要存放的地方。
ShuffleMapTask 的执行过程:先根据 pipeline 的计算逻辑对数据进行运算,然后根据分区器计算出每一个record的分区编号。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.getPartition(record.getKey()))决定。每个 bucket 里面的数据会满足溢写的条件会被溢写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 下游的task会根据分区会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。
老版本的HashShuffleManager存在的问题:
1.产成的 FileSegment 过多。每个 ShuffleMapTask 产生 R(下游Task的数量)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。
2.缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个,占用的内存空间也就达到了cores * R * 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。
- 优化后的:
可以明显看出,在一个core上连续执行的ShuffleMapTasks可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成ShuffleBlock i,后执行的 ShuffleMapTask可以将输出数据直接追加到ShuffleBlock i后面,形成ShuffleBlock’,每个ShuffleBlock被称为FileSegment。下一个stage的reducer只需要fetch整个 ShuffleFile就行了。这样每个Executor持有的文件数降为cores*R。
FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。
18.3 SortShuffleManager
- BypassMergeSortShuffleWriter
使用这种ShuffleWriter的条件是:
(1) 没有map端的聚合操作
(2) 分区数小于参数:spark.shuffle.sort.bypassMergeThreshold,默认是200
BypassMergeSortShuffleWriter 算法适用于没有聚合,数据量不大的场景。 给每个分区分配一个临时文件,对每个 record 的 key 使用分区器(模式是hash,如果用户自定义就使用自定义的分区器)找到对应分区的输出文件并写入文件对应的文件。
因为写入磁盘文件是通过 Java的 BufferedOutputStream 实现的,BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。所以图中会有内存缓冲的概念。
- UnsafeShuffleWriter
使用这种ShuffleWriter的条件是:
- Serializer 支持 relocation。Serializer 支持 relocation 是指,Serializer 可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。支持 relocation 的 Serializer 是 KryoSerializer,Spark 默认使用 JavaSerializer,通过参数 spark.serializer 设置;
- 没有指定 aggregation 或者 key 排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序。
- partition 数量不能大于指定的阈值(2^24),因为 partition number 使用24bit 表示的。即不能大于PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
UnsafeShuffleWriter 将 record 序列化后插入sorter,然后对已经序列化的 record 进行排序,并在排序完成后写入磁盘文件作为 spill file,再将多个 spill file 合并成一个输出文件。在合并时会基于 spill file 的数量和 IO compression codec 选择最合适的合并策
- SortShuffleWriter
若以上两种ShuffleWriter都不能选择,则使用SortShuffleWriter类,SortShuffleWriter也是相对比较常用的一种ShuffleWriter。
1.SortShuffleWriter会先把数据先写入到内存中,并会尝试扩展内存大小,若内存不足,则把数据持久化到磁盘上。
2.SortShuffleWriter在把数据写入磁盘时,会按分区ID进行合并,并对key进行排序,然后写入到该分区的临时文件中。
3.SortShuffleWriter最后会把前面写的分区临时文件进行合并,合并成一个文件,也就是说,会在map操作结束时把各个分区文件合并成一个文件。这样做可以有效的减少文件个数,和为了维护这些文件而产生的资源消耗。
在进行 shuffle 之前,map 端会先将数据进行排序。排序的规则,根据不同的场景,会分为两种。首先会根据 Key 将元素分成不同的 partition。第一种只需要保证元素的 partitionId 排序,但不会保证同一个 partitionId 的内部排序。第二种是既保证元素的 partitionId 排序,也会保证同一个 partitionId 的内部排序。
接着,往内存写入数据,每隔一段时间,当向 MemoryManager 申请不到足够的内存时,或者数据量超过 spark.shuffle.spill.numElementsForceSpillThreshold 这个阈值时 (默认是 Long 的最大值,不起作用),就会进行 Spill 内存数据到文件,然后清空内存数据结构。假设可以源源不断的申请到内存,那么 Write 阶段的所有数据将一直保存在内存中,由此可见,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比较吃内存的。
在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件也是通过 Java 的 BufferedOutputStream 实现的。
一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。在将最终排序结果写入到数据文件之前,需要将内存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已经 spill 到磁盘的 SpillFiles 进行合并。
此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。
19. Spark思考题
- 1.SparkContext哪一端生成的?
Driver端即SparkContext(Driver是一个统称,里面还DAGScheduler、TaskScheduler、ShuffleManager、BroadcastManager等)
- 2.DAG是在哪一端被构建的?
Drvier端
- 3.RDD是在哪一端创建的?
Driver端,RDD不装真正要计算的数据,而是记录了数据的描述信息(以后从哪里读数据,怎么计算)
- 6.调用RDD的算子(Transformation和Action)是在哪一端调用的
Driver端
- 7.RDD在调用Transformation和Action时需要传入一个函数,函数是在哪一端声明【定义】和传入的?
Driver端
- 6.RDD在调用Transformation和Action时需要传入函数,请问传入的函数是在哪一端执行了函数的业务逻辑?
Executor中的Task指定的
- 9.Task是在哪一端生成的呢?
Driver端,Task分为ShuffleMapTask和ResultTask
- 10.DAG是在哪一端构建好的并被切分成一到多个Stage的
Driver
- 11.DAG是哪个类完成的切分Stage的功能?
DAGScheduler
- 12.DAGScheduler将切分好的Task以什么样的形式给TaskScheduler
TaskSet
- 13.分区器这个类是在哪一端实例化的?
Driver端
- 14.分区器中的getParitition方法在哪一端调用的呢?
Executror中的Task
- 15.广播变量是在哪一端调用的方法进行广播的?
Driver端
- 16.要广播的数据应该在哪一端先创建好再广播呢?
Driver端
- 17.广播变量以后能修改吗?
不能修改
- 18.广播变量广播到Executor后,一个Executor进程中有几份广播变量的数据
一份全部广播的数据
19.广播变量如何释放
调用广播变量返回到Driver端的引用的unpersist()方法进行释放