Spark Core内核调度机制详解(第5天)

news2024/11/23 13:43:19

系列文章目录

  1. 如何构建DAG执行流程图 (掌握)
  2. 如何划分Stage阶段 (掌握)
  3. Driver底层是如何运转 (掌握)
  4. 确定需要构建多少分区(线程) (掌握)

文章目录

  • 系列文章目录
  • 引言
  • 一、Spark内核调度(掌握)
      • 1.1、内容概述
      • 1.2、RDD的依赖
      • 1.3、DAG和Stage
      • 1.4、Spark Shuffle
      • 1.5、Job调度流程
      • 1.6、Spark RDD并行度
  • 二. 常见面试题
      • 1.简单介绍下RDD宽依赖和窄依赖的区别
      • 2.简单介绍下DAG有向无环图如何划分Stage阶段的
      • 3.请描述下rdd中job的整个调度流程

引言

本文主要介绍了
1.RDD的依赖
2.DAG和Stage
3.Spark Shuffle
4.job调度流程(掌握)
5.Spark RDD并行度
帮助读者更好地理解其工作原理和优化方法。


一、Spark内核调度(掌握)

1.1、内容概述

Spark内核调度的任务:

  • 如何构建DAG执行流程图
  • 如何划分Stage阶段
  • Driver底层是如何运转
  • 确定需要构建多少分区(线程)

Spark内核调度的目的:尽可能用最少的资源高效地完成任务计算。

1.2、RDD的依赖

RDD依赖:一个RDD的形成可能是由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系。

在Spark中,RDD之间的依赖关系,主要有二种类型:

  • 窄依赖
  1. 作用: 能够让Spark程序并行计算。也就是一个分区数据计算出现问题以后,其他的分区计算不受到任何影响。
  2. 特点: 父RDD的分区和子RDD的分区关系是一对一的关系。也就是父RDD分区的数据会整个被下游子RDD的分区接收。

在这里插入图片描述

  • 宽依赖
  1. 作用: 划分Stage的重要依据。宽依赖也叫做Shuffle依赖。
  2. 特点: 父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被分成多份给到下游子RDD的多个分区所接收。
  3. 注意:为了避免数据不完整,如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行。

在这里插入图片描述

  • 说明
    1. 在实际使用中,不需要纠结哪些算子会存在shuffle,以需求为目标。虽然shuffle的存在会影响一定的效率, 但是以完成任务为准则,该用那个算子,就使用那个算子即可,不要过分纠结
    2. 算子中一般以ByKey结尾的会发生shuffle另外是重分区算子也会发生shuffle。

1.3、DAG和Stage

  • DAG:有向无环图,主要描述一段执行任务,从开始一直往下走,不允许出现回调操作,Spark应用程序中,遇到一个Action算子,就会触发形成一个Job任务的产生。

  • 思考:对于每一个Job的任务,都会产生一个DAG执行流程图,那么这个流程图是如何形成的呢?

层级关系:
1- 一个application应用程序 -> 遇到一个Action算子,就会触发形成一个Job任务
2- 一个Job任务只有一个DAG有向无环图
3- 一个DAG有向无环图 -> 有多个Stage
4- 一个Stage -> 有多个Task线程

5- 一个RDD -> 有多个分区
6- 一个分区会被一个Task线程所处理
  • DAG执行流程图形成和Stage划分
    在这里插入图片描述
1- Spark应用程序遇到Action算子后,就会触发一个Job任务的产生。Job任务会将它所依赖的所有算子全部加载进来,形成一个Stage。

2- 接着从Action算子从后往前进行回溯,遇到窄依赖就将算子放在同一个Stage当中;如果遇到宽依赖,就划分形成新的Stage,最后一直回溯完成。
  • 细化剖析Stage内部的流程
    在这里插入图片描述
  • 默认并行度的值确认
因为是使用textFile读取HDFS上的文件,因此RDD分区数=max(文件的block块的数量, defaultMinPartition),继续需要知道defaultMinPartition的值是多少。

defaultMinPartition=min(spark.default.parallelism,2)取最小值。最终我们确认spark.default.parallelism的参数值就能够最终确认RDD的分区数有多少个。

spark.default.parallelism参数值确认过程如下:
1- 如果有父RDD,就取父RDD的最大分区数
2- 如果没有父RDD,根据集群模式进行取值:
   2.1- 本地模式:机器的最大CPU核数
   2.2- (了解)Mesos:默认是8
   2.3- 其他模式:所有执行节点上的核总数或2,以较大者为准

1.4、Spark Shuffle

  • Spark中shuffle的发展历程
1- 在1.1版本以前,Spark采用Hash shuffle (优化前 和 优化后)

2- 在1.1版本的时候,Spark推出了Sort Shuffle

3- 在1.5版本的时候,Spark引入钨丝计划(优化为主)

4- 在1.6版本的时候,将钨丝计划合并到sortShuffle中

5- 在2.0版本的时候,将Hash Shuffle移除,将Hash shuffle方案移植到Sort Shuffle

在这里插入图片描述

  • 未优化的Hash shuffle
    在这里插入图片描述

  • 存在的问题
    上游(map端)的每个Task会产生与下游Task个数相等的小文件个数。这种情况会导致上游有非常多的小文件。另外,下游(reduce端)来拉取文件的时候,会有大量的网络IO和磁盘IO过程,因为要打开和读取多个小文件。

  • 经过优化后的Hash shuffle
    在这里插入图片描述
    变成了由每个Executor进程产生与下游Task个数相等的小文件数。这样可以大量减小小文件的产生,以及降低下游拉取文件时候的网络IO和磁盘IO过程。

  • Sort shuffle
    在这里插入图片描述

Sort Shuffle分成了两种: 普通机制和bypass机制。具体使用哪种,由Spark底层决定。

普通机制的运行过程: 每个上游Task线程处理数据,数据处理完以后,先放在内存中。接着对内存中的数据进行分区、排序。将内存中的数据溢写到磁盘,形成一个个的小文件。溢写完成以后,会将多个小文件合并成一个大的磁盘文件。并且针对每个大的磁盘文件,会提供一个索引文件。接着是下游Task根据索引文件来读取相应的数据。

bypass机制: 就是在普通机制的基础上,省略了排序的过程

bypass机制的触发条件是:
1- 上游RDD的分区数量最多不能超过200个
2- 上游不能对数据进行提前聚合操作(因为提前聚合,需要先进行分组操作,而分组的操作实际上是有排序的操作)

1.5、Job调度流程

  • 主要是讨论:在Driver内部,是如何调度任务
    在这里插入图片描述
1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
	DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
	TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行

2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器

3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。

4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束

5- 后续过程和之前一样

1.6、Spark RDD并行度

  • 整个Spark应用中,影响并行度的因素有以下两个原因:
    1- 资源的并行度: Executor数量 和 CPU核心数 以及 内存的大小。
    2- 数据的并行度: Task的线程数 和 分区数量。
一般将Task线程数设置为CPU核数的2-3倍。另外每个线程分配3-5GB的内存资源。
  • 如何设置并行度
语法: SparkConf().set("spark.default.parallelism", "num")

说明: spark.default.parallelism该参数是SparkCore中的参数。
该参数只会影响shuffle以后的分区数量。
另外该参数对parallelize并行化本地集合创建的RDD不起作用。
  • 代码演示
# 导包
from pyspark import SparkContext, SparkConf
import os

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 6.1 TODO: 设置并行度
    conf = SparkConf().set('spark.default.parallelism', '6')

    # - 1.创建SparkContext对象
    sc = SparkContext(conf=conf)
    # - 2.数据输入
    textRDD = sc.textFile('file:///export/data/spark_project/spark_base/content.txt')
    # - 3.数据处理
    #   - 3.1文本内容切分
    flatMapRDD = textRDD.flatMap(lambda line: line.split(" "))
    #   - 3.2数据格式转换
    mapRDD = flatMapRDD.map(lambda word: (word, 1))

    # 6.2 TODO: shuffle之前查看分区数量
    print(f"shullfe之前分区数: {mapRDD.getNumPartitions()},分区内容:{mapRDD.glom().collect()}")

    #   - 3.3分组和聚合
    reduceRDD = mapRDD.reduceByKey(lambda agg, curr: agg + curr)
    
    # 6.3 TODO: shuffle之后查看分区数量
    print(f"shullfe之后分区数: {reduceRDD.getNumPartitions()},分区内容:{reduceRDD.glom().collect()}")

    # - 4.数据输出
    # print(reduceRDD.collect())
    # - 5.释放资源
    sc.stop()

二. 常见面试题

1.简单介绍下RDD宽依赖和窄依赖的区别

--- 窄依赖(Narrow Dependency)
1- 定义:每一个父RDD的分区最多被子RDD的一个分区所使用。这通常表现为一对一(OneToOneDependencies)或多对一(多个父RDD的分区对应于一个子RDD的分区)的关系。
2- 操作示例:map、filter、union等操作会产生窄依赖。这些操作的特点是,子RDD的每个分区都是基于父RDD的一个或多个特定分区计算得出的。
3- 容错处理:当窄依赖的子RDD数据丢失时,由于父RDD的一个分区只对应一个子RDD分区,因此只需要重新计算与子RDD分区对应的父RDD分区即可。
4- 优化特性:窄依赖可以进行流水线优化,利用fork/join机制,一个作业可以直接一个阶段完成,形成管道型的流水化处理。
--- 宽依赖(Wide Dependency 或 Shuffle Dependency)
1- 定义:子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区。这通常表现为一对多(OneToManyDependencies)的关系。
2- 操作示例:groupByKey、reduceByKey、sortedByKey等操作会产生宽依赖。这些操作的特点是,子RDD的每个分区都可能依赖于父RDD的所有分区。
3- 容错处理:当宽依赖的子RDD数据丢失时,由于一个分区的数据通常由多个父RDD分区数据变换而来,因此可能需要重新计算父RDD的所有分区才能恢复数据。
4- 优化限制:宽依赖通常涉及shuffle操作,需要将数据写入磁盘并等待,因此无法形成管道型的流水化处理。
  • 总结
    数据流动:窄依赖的数据流动是线性的,而宽依赖的数据流动是跨多个分区的。
    容错处理:窄依赖的容错处理更为高效,只需要重新计算部分数据;而宽依赖的容错处理可能涉及大量数据的重新计算。
    优化特性:窄依赖可以进行流水线优化,而宽依赖通常不能。
    Shuffle操作:宽依赖通常伴随着shuffle操作,而窄依赖则没有。
    以上是对RDD宽依赖和窄依赖的简要介绍和区别分析。

2.简单介绍下DAG有向无环图如何划分Stage阶段的

  • 1- Spark应用程序遇到Action算子后,就会触发一个Job任务的产生。Job任务会将它所依赖的所有算子全部加载进来,形成一个Stage。

  • 2- 接着从Action算子从后往前进行回溯,遇到窄依赖就将算子放在同一个Stage当中;如果遇到宽依赖,就划分形成新的Stage,最后一直回溯完成。

3.请描述下rdd中job的整个调度流程

1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
    DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
    TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行

2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器

3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。

4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束

5- 后续过程和之前一样

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1841148.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端根据环境变量配置网页的title和favicon

前端根据环境变量配置网页的title和favicon 前言流程步骤一、设置environment文件二、在入口文件中配置三、删除index.html中的title和 icon link四、使用对应的打包命令进行部署 注意事项一、angular中,需要在angular.json添加favicon.ico额外的构建 前言 有些项目…

C++ | Leetcode C++题解之第167题两数之和II-输入有序数组

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> twoSum(vector<int>& numbers, int target) {int low 0, high numbers.size() - 1;while (low < high) {int sum numbers[low] numbers[high];if (sum target) {return {low …

STM32通过I2C软件读写MPU6050

文章目录​​​​​​​ 1. MPU6050 1.1 运动学概念 1.2 工作原理 2. 参数 2.1 量程选择 2.2 I2C从机地址配置 3. 硬件电路 4. 框架图 5. 软件和硬件波形对比 6. 软件I2C读写MPU6050 6.1 程序整体构架 6.2 一些需要注意的点&#xff1a; 6.3 MPU6050初始化配置 6…

Vue--》从零开始打造交互体验一流的电商平台(三)

今天开始使用 vue3 + ts 搭建一个电商项目平台,因为文章会将项目的每处代码的书写都会讲解到,所以本项目会分成好几篇文章进行讲解,我会在最后一篇文章中会将项目代码开源到我的github上,大家可以自行去进行下载运行,希望本文章对有帮助的朋友们能多多关注本专栏,学习更多…

【2024】kafka streams的详细使用与案例练习(2)

目录 前言使用1、整体结构1.1、序列化 2、 Kafka Streams 常用的 API2.1、 StreamsBuilder2.2、 KStream 和 KTable2.3、 filter和 filterNot2.4、 map 和 mapValues2.5、 flatMap 和 flatMapValues2.6、 groupByKey 和 groupBy2.7、 count、reduce 和 aggregate2.8、 join 和 …

DSP28335:定时器

1.定时器介绍 1.1 定时器工作原理 TMS320F28335的CPU Time有三个&#xff0c;分别为Timer0&#xff0c;Timer1&#xff0c;Timer2&#xff0c;其中Timer2是为操作系统DSP/BIOS保留的&#xff0c;当未移植操作系统时&#xff0c;可用来做普通的定时器。这三个定时器的中断信号分…

RX8900/INS5A8900实时时钟-国产兼容RS4TC8900

该模块是一个符合I2C总线接口的实时时钟&#xff0c;包括一个32.768 kHz的DTCXO。 除了提供日历&#xff08;年、月、日、日、时、分、秒&#xff09;功能和时钟计数器功能外&#xff0c;该模块还提供了大量其他功能&#xff0c;包括报警功能、唤醒定时器功能、时间更新中断功能…

反激开关电源变压器设计1

特别注意&#xff1a;变压器计算出来的结果没有绝对的对与错 只要再全域范围内工作变压器不饱和就不能说变压器计算不对&#xff0c;&#xff08;输入全范围&#xff0c;输出全范围&#xff0c;温度度全范围&#xff09; 在变压器不饱和的情况下&#xff0c;只有优劣之分&…

数学建模基础:数学建模概述

目录 前言 一、数学建模的步骤 二、模型的分类 三、模型评价指标 四、常见的数学建模方法 实际案例&#xff1a;线性回归建模 步骤 1&#xff1a;导入数据 步骤 2&#xff1a;数据预处理 步骤 3&#xff1a;建立线性回归模型 步骤 4&#xff1a;模型验证 步骤 5&…

每日一练:攻防世界:简单的图片

这道题巨抽象&#xff01;巨抽象&#xff01;巨抽象&#xff01; 拿到图片&#xff0c;根据题目&#xff0c;尝试各种隐写方法。 这里就没思路了。查看WP。 根据题目的主办方&#xff1a;XSCTF。猜测XSCTF对应的是数字0&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;…

【WEB前端2024】3D智体编程:乔布斯3D纪念馆-第44课-骨骼动画

【WEB前端2024】3D智体编程&#xff1a;乔布斯3D纪念馆-第44课-骨骼动画 使用dtns.network德塔世界&#xff08;开源的智体世界引擎&#xff09;&#xff0c;策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。dtns.network是一款主要由JavaScript编写的智体世界引擎&…

使用docker离线制作es镜像,方便内网环境部署

1、自己在本地安装docker以及docker-compose 2、拉取elasticsearch镜像 docker pull elasticsearch:7.14.0 docker pull kibana:7.14.0 3、将拉取到的镜像打包到本地目录 docker save elasticsearch:7.14.0 -o /Users/yanjun.hou/es/elasticsearch-7.14.0.tar docker save kib…

Ps:快速添加签名或水印

一般情况下&#xff0c;建议使用矢量工具来创建签名或水印&#xff0c;这样可以保证签名图形任意缩放而不失真。但普通的摄影爱好者如果不太擅长使用矢量工具&#xff0c;可考虑下面的画笔预设法或动作法来给自己的照片添加签名&#xff0c;亦可满足日常出片需要。 ◆ ◆ ◆ …

GT_BERT文本分类

目录 GT-BERT结束语代码实现整个项目源码&#xff08;数据集模型&#xff09; GT-BERT 在为了使 BERT 模型能够得到广泛的应用,在保证模型分类准确率不降低的情况下,减少模型参数规模并降低时间复杂度,提出一种基于半监督生成对抗网络与 BERT 的文本分类模型 GT-BERT。模型的整…

DNS污染是什么?防止和清洗DNS污染的解决方案

在运营互联网业务中&#xff0c;通常会遇到各种各样的问题。其实DNS污染就是其中一个很严重的问题&#xff0c;它甚至会导致我们的业务中断&#xff0c;无法进行。今天就来了解一下DNS污染是什么&#xff1f;以及如何防止和清洗DNS污染。 什么是DNS&#xff1f; 首先我们要了解…

企业微信,机器人定时提醒

场景&#xff1a; 每天定时发送文字&#xff0c;提醒群成员事情&#xff0c;可以用机器人代替 人工提醒。 1&#xff09;在企业微信&#xff0c;创建机器人 2&#xff09;在腾讯轻联&#xff0c;创建流程&#xff0c;选择定时任务&#xff0c;执行操作&#xff08;企业微信机…

Qt利用Coin3D(OpenInventor)进行3d绘图

文章目录 1.安装1.1.下载coin3d1.2.下载quarter1.3.解压并合并 2.在Qt中使用3.画个网格4.加载wrl模型 1.安装 1.1.下载coin3d 首先&#xff0c;到官网下载[coin3d/coin] 我是Qt5.15.2vs2019的&#xff0c;因此我选择这个coin-4.0.2-msvc17-x64.zip 1.2.下载quarter 到官网…

milvus元数据解析工具milvusmetagui介绍使用

简介 milvusmetagui是一款用来对milvus的元数据进行解析的工具&#xff0c;milvus的元数据存储在etcd上&#xff0c;而且经过了序列化&#xff0c;通过etcd-manager这样的工具来查看是一堆二进制乱码&#xff0c;因此开发了这个工具对value进行反序列化解析。 在这里为了方便交…

arm-linux-strip 指令的作用

指令作用 arm-linux-strip 是一个用于从目标文件&#xff08;如可执行文件或对象文件&#xff09;中移除符号信息的工具。这些符号信息&#xff08;如函数名、变量名等&#xff09;在开发过程中很有用&#xff0c;因为它们允许调试器&#xff08;如 GDB&#xff09;确定内存地址…

安装cuda、cudnn、Pytorch(用cuda和cudnn加速计算)

写在前面 最近几个月都在忙着毕业的事&#xff0c;好一阵子没写代码了。今天准备跑个demo&#xff0c;发现报错 AssertionError: Torch not compiled with CUDA enabled 不知道啥情况&#xff0c;因为之前有cuda环境&#xff0c;能用gpu加速&#xff0c;看这个报错信息应该是P…