【Pyspark-驯化】一文搞定spark的代码执行原理和使用技巧
本次修炼方法请往下查看
🌈 欢迎莅临我的个人主页 👈这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合,智慧小天地!
🎇 相关内容文档获取 微信公众号
🎇 相关内容视频讲解 B站
🎓 博主简介:AI算法驯化师,混迹多个大厂搜索、推荐、广告、数据分析、数据挖掘岗位 个人申请专利40+,熟练掌握机器、深度学习等各类应用算法原理和项目实战经验。
🔧 技术专长: 在机器学习、搜索、广告、推荐、CV、NLP、多模态、数据分析等算法相关领域有丰富的项目实战经验。已累计为求职、科研、学习等需求提供近千次有偿|无偿定制化服务,助力多位小伙伴在学习、求职、工作上少走弯路、提高效率,近一年好评率100% 。
📝 博客风采: 积极分享关于机器学习、深度学习、数据分析、NLP、PyTorch、Python、Linux、工作、项目总结相关的实用内容。
🌵文章目录🌵
- 🎯 1.基本介绍
- 💡 2. spark架构图
- 💡 3.spark进行DAG/TASK任务调度流程
- 💡 4. DAGScheduler的具体流程
下滑查看解决方法
🎯 1.基本介绍
spark作为在hadoop的基础上进行性能的优化,其生态图也基本和hadoop相似,而不同之处在于spark的计算准则是基于RDD进行设计DAG的,而hadoop则是通过MR来进行的,下面为spark的生态图:
- Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
- Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。Spark提供的sql形式的对接Hive、JDBC、HBase等各种数据渠道的API,用Java开发人员的思想来讲就是面向接口、解耦合,ORMapping、Spring Cloud Stream等都是类似的思想。
- Spark Streaming:基于SparkCore实现的可扩展、高吞吐、高可靠性的实时数据流处理。支持从Kafka、Flume等数据源处理后存储到HDFS、DataBase、Dashboard中。对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
- MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。
- GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作
💡 2. spark架构图
由于hadoop在运行的过程中是基于磁盘进行操作的,无论是MapReduce还是YARN都是将数据从磁盘中加载出来,经过DAG,然后重新写回到磁盘中,计算过程的中间数据又需要写入到HDFS的临时文件,这些都使得Hadoop在大数据运算上表现太“慢”,而Spark则基于内存进行DAG操作,具体执行框架图如下所示:
-
上图中,sparkcontext程序向集群控制中心cluster申请计算和资源,而cluster将设计好的dag配送给各个worker几点进行分配计算,具体的运行流程如下:
-
构建Spark Application的运行环境,启动SparkContext
-
SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend,
-
Executor向SparkContext申请Task
-
SparkContext将应用程序分发给Executor
-
SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行
-
Task在Executor上运行,运行完释放所有资源
💡 3.spark进行DAG/TASK任务调度流程
spark进行任务调度的流程如下所示:
- 用户编排的代码由一个个的RDD Objects组成,DAGScheduler负责根据RDD的宽依赖拆分DAG为一个个的Stage,而每个Stage包含一组逻辑完全相同的可以并发执行的Task。TaskScheduler负责将Task推送给从ClusterManager那里获取到的Worker启动的Executor。
💡 4. DAGScheduler的具体流程
DAG负责的是将RDD中的数据依赖划分为不同可以并行的宽依赖task, 这些不同的task集合统称为stage,最后将这些stage推送给TaskScheduler进行调度,DAG的具体划分过程如下所示:
窄依赖经历的是map、filter等操作没有进行相关的shuffle,而宽依赖则通常都是join等操作需要进行一定的shuffle意味着需要打散均匀等操作
- 1 stage是触发action的时候
从后往前划分
的,所以本图要从RDD_G开始划分。 - 2 RDD_G依赖于RDD_B和RDD_F,随机决定先判断哪一个依赖,但是对于结果无影响。
- 3 RDD_B与RDD_G属于窄依赖,所以他们属于同一个stage,RDD_B与老爹RDD_A之间是宽依赖的关系,所以他们不能划分在一起,所以RDD_A自己是一个stage1
- 4 RDD_F与RDD_G是属于宽依赖,他们不能划分在一起,所以最后一个stage的范围也就限定了,RDD_B和RDD_G组成了Stage3
- 5 RDD_F与两个爹RDD_D、RDD_E之间是窄依赖关系,RDD_D与爹RDD_C之间也是窄依赖关系,所以他们都属于同一个stage2
- 6 执行过程中stage1和stage2相互之间没有前后关系所以可以并行执行,相应的每个stage内部各个partition对应的task也并行执行
- 7 stage3依赖stage1和stage2执行结果的partition,只有等前两个stage执行结束后才可以启动stage3.
- 8 我们前面有介绍过Spark的Task有两种:ShuffleMapTask和ResultTask,其中后者在DAG最后一个阶段推送给Executor,其余所有阶段推送的都是ShuffleMapTask。在这个案例中stage1和stage2中产生的都是ShuffleMapTask,在stage3中产生的ResultTask。
- 9 虽然stage的划分是从后往前计算划分的,但是依赖逻辑判断等结束后真正创建stage是从前往后的。也就是说如果从stage的ID作为标识的话,先需要执行的stage的ID要小于后需要执行的ID。就本案例来说,stage1和stage2的ID要小于stage3,至于stage1和stage2的ID谁大谁小是随机的,是由前面第2步决定的。