spark应用程序的执行

news2024/11/23 15:44:24

1 SparkContext -》{
sparkconf --配置对象,基础配置
sparkEnv --环境对象,通讯环境
SchedulerBackend --通讯后台 住哟啊用于和Executor之间进行通讯
TaskScheduler – 任务调度器 任务调度
DAGScheduler – 阶段调度器 阶段划分
}

spark.sparkContext.textFile("")
  .flatMap(x => {
    x.split(",")
  }).groupBy(x => x).map { case (word, list) => {
  (word, list.size)
}
}.collect()

1.rdd依赖

1.value1=sc.textFile()

val rdd2=rdd1.flatMap [
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF)) --外层的rdd把this(上一层的rdd包进去了,该对象依赖于this)

MapPartitionsRDD extends RDDU 【 rdd点进去
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent))) --窄依赖传进去,rdd1为rdd2的父依赖

]

rdd2.groupby[
groupBy[
groupByKey[
combineByKeyWithClassTag[
ShuffledRDD extends RDD[(K, C)](prev.context, Nil) 【–Nil未传入依赖,这里用的是默认值
getDependencies 【
List(new ShuffleDependency


]
]
]
]
在这里插入图片描述

**

2.阶段划分 spark中阶段的划分等于shuffle依赖的数量+1

**
点击collect【
runJob【
runJob【
dagScheduler.runJob【
submitJob{
eventProcessLoop.post(JobSubmitted(
点post(eventQueue.put(event) --将事件放进去事件队列)[
eventThread { --该线程将事件取出来
val event = eventQueue.take()
try {
onReceive(event)} --实现类DAGSchedulerEventProcessLoop(
doOnReceive(event){
dagScheduler.handleJobSubmitted【–进行阶段的划分
createResultStage—创建结果阶段
val parents = getOrCreateParentStages(rdd, jobId)-》{–获取或创建上级阶段
getShuffleDependencies(rdd)->{
toVisit.dependencies --判断rdd中的依赖是否是shuffle依赖
}.map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)–获取或创建shuffleMap阶段,写磁盘之前的阶段(
createShuffleMapStage-》{

getOrCreateParentStages //判断是否还有shuffle依赖
new ShuffleMapStage
}

}.toList
}
new ResultStage


}

]
}



  1. task任务的切分,task总共任务是每个阶段最后一个rdd分区数量之和

回到方法handleJobSubmitted{
val job = new ActiveJob 阶段划分后,提交job

submitStage(finalStage)–提交阶段{
getMissingParentStages(stage)–看你有没有上一级的阶段
没有的话就
submitMissingTasks(stage, jobId.get)–提交任务{
val tasks: Seq[Task[_]] = 创建task{

case stage: ShuffleMapStage =>
partitionsToCompute-》{
= stage.findMissingPartitions()
}

然后返回点击 ShuffleMapStage -》{
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions) --看看
}
}
}
}

}
task数总共6个
在这里插入图片描述
4.任务的调度
类名:DAGScheduler
val tasks: Seq[Task[_]]

  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

TaskSchedulerImpl 继承taskScheduler 并实现其submitTasks方法 {

val manager = createTaskSetManager(taskSet, maxTaskFailures)-【

new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) --封装了任务管理器

schedulableBuilder.addTaskSetManager --schedulableBuilder是一个调度器,根据调度模式生成不同的调度器(
实现类FIFOSchedulableBuilder:的该方法
rootPool.addSchedulable(manager),将任务管理器放到任务池里边

backend.reviveOffers() --取消息点进去实现类CoarseGrainedSchedulerBackend (
case ReviveOffers =>
makeOffers()-》【–得到任务的描述信息
scheduler.resourceOffers(workOffers) --取任务-》{
val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)》{–得到排过序的taskset
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) --根据不同的调度策略使用不同的调度算法
}

for (currentMaxLocality <- taskSet.myLocalityLevels) --数据本地化,根据数据本地性,决定将数据和计算是否发送到同一个节点,移动数据不如移动计算
}
if (taskDescs.nonEmpty) {
launchTasks(taskDescs)–如果任务不为空,启动任务 -》{
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) --从任务池中取到的序列化后的任务,找到对应的executor的终端,发送启动任务的消息
}
}


}

5

5.任务的执行

CoarseGrainedExecutorBackend收到启动任务的消息
receive -》{
case LaunchTask(data) =>
val taskDesc = TaskDescription.decode(data.value)–反序列化
executor.launchTask–开始执行 -》【
val tr = new TaskRunner(context, taskDescription) --计算对象中有一个线程池 -》(
val res = task.run --点进去{
runTask
}

runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)

}
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/555778.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

贪心算法专练

⭐️前言⭐️ 本篇文章主要分享几道贪心算法的题目&#xff0c;贪心算法是一种基于自然智慧的算法&#xff0c;这类题目并没有统一的解法&#xff0c;但通常都是每一步做出一个局部最优的选择&#xff0c;最终的结果就是全局最优。 &#x1f349;欢迎点赞 &#x1f44d; 收藏 …

图神经网络:(化学领域)再次认识图神经网络

文章说明&#xff1a; 1)参考资料&#xff1a;PYG官方文档。超链。 2)博主水平不高&#xff0c;如有错误还望批评指正。 3)我在百度网盘上传了这篇文章的jupyter notebook和有关文献。超链。提取码8848。 文章目录 Mutagenicity数据集搭建模型训练模型文献阅读重新回来 Mutagen…

day36_JQuery

今日内容 零、 复习昨日 一、正则表达式 二、JQuery 零、 复习昨日 零、正则表达式 Regular expression RegExp 0.1 正则表达式 正则表达式是描述字符模式的对象。正则表达式用于对字符串模式匹配及检索替换&#xff0c;是对字符串执行模式匹配的强大工具。语法&#xff1a; va…

京东云技术团队 —— 浅谈测试用例设计

一、测试用例为什么存在 1.1 定义 测试用例(Test Case)是指对特定的软件产品进行测试任务的描述&#xff0c;体现测试方案、方法、技术和策略。测试用例内容包括测试目标、测试环境、输入数据、测试步骤、预期结果、测试脚本等&#xff0c;最终形成文档类的输出。简而言之&am…

04. 数据结构之栈

前言 栈&#xff08;stack&#xff09;是一种线性数据的逻辑存储结构。栈中的元素只能先入后出&#xff08;First In Last Out&#xff0c;简称FILO&#xff09;。最早进入的元素存放的位置叫作栈底&#xff08;bottom&#xff09;&#xff0c;最后进入的元素存放的位置叫作栈…

网络故障老搞不定,就看这篇笔记

大家好&#xff0c;我是老杨。 做咱们这行&#xff0c;每天遇到的故障千奇百怪什么都有。很多网工每天只是在工作而已&#xff0c;遇到一个问题&#xff0c;就解决一个问题&#xff0c;每天的日子都是一样的&#xff0c;枯燥无趣。 但是&#xff0c;就很少有人去汇总问题&…

分布式事务的21种武器 - 1

在分布式系统中&#xff0c;事务的处理分布在不同组件、服务中&#xff0c;因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式&#xff0c;并分析其实现原理和优缺点&#xff0c;在面对具体分布式事务问题时&#xff0c;可以选择合适的模式…

Scala学习(五)---面向对象

文章目录 1.Scala面向对象的构造器1.1 主构造器和从构造器(辅助构造器)1.2 主构造器参数 2.继承2.1 抽象属性和方法2.2 匿名子类 1.Scala面向对象的构造器 1.1 主构造器和从构造器(辅助构造器) //主构造器 class ConstructorTest(name:String) {//主构造器调用val name1:Stri…

【MyBatis框架】

文章目录 Mybatis1.简介1.1MyBatis历史1.2MyBatis特性1.3MyBatis下载1.4和其它持久化层技术对比 2.搭建MyBatis2.1创建maven工程2.2创建MyBatis的核心配置文件2.3创建mapper接口2.4创建实体类2.5创建MyBatis的映射文件2.6通过junit测试功能2.7加入log4j日志功能2.8MyBatis的增删…

pytorch的学习与总结(第二次组会)

pytorch的学习与总结 一、pytorch的基础学习1.1 dataset与dataloader1.2 可视化工具(tensorboard)、数据转换工具(transforms)1.3 卷积、池化、线性层、激活函数1.4 损失函数、反向传播、优化器1.5 模型的保存、加载、修改 二、 pytorch分类项目实现2.1 网络模型2.2 具体代码 一…

新星计划2023【《计算之魂》读书会】学习方向报名入口!

前排提醒&#xff1a;这里是新星计划2023【《计算之魂》读书会】学习方向的报名入口&#xff0c;一经报名&#xff0c;不可更换。 ↓↓↓报名方式&#xff1a;&#xff08;下滑到本页面底部&#xff09; 一、关于本学习方向导师 博客昵称&#xff1a;异步社区博客主页&#x…

AI大模型时代,云从科技携“从容大模型”入场如何“从容”?

5月18日&#xff0c;在“AI赋能数字中国产业论坛暨2023云从科技人机协同发布会”上&#xff0c;云从科技自研“从容大模型”正式亮相。 根据发布会信息&#xff0c;“从容大模型”具备问答、阅读理解、文学创作以及解题方面的能力。受发布会消息影响&#xff0c;5月18日午间休盘…

【libdatachannel】cmake+vs2022 构建

libdatachannel libdatachannel 是基于c++17实现的构建 OpenSSL 找不到 Selecting Windows SDK version 10.0.22000.0 to target Windows 10.0.22621. The CXX compiler identification is MSVC 19.35.32217.1 Detecting CXX compiler ABI info Detecting CXX compiler ABI inf…

利用GPIO线进行板间通信-23-5-22

本项目基于VU9P(xcvu9pflga2105)板卡以及ZYNQ(xc7z015clg485) 简单结构流程介绍&#xff1a; 1.上位机通过千兆网将指令下发到ZYNQ&#xff0c;ZYNQ进行解帧&#xff0c;将数据解析出来后存储到RAM中,RAM将数据不断输送给GPIO模块&#xff0c;GPIO模块根据对应地址输出数据是…

新来的00后实习生太牛了,已经被取代了.....

前几天有个朋友向我哭诉&#xff0c;说她在公司工作&#xff08;软件测试&#xff09;了7年了&#xff0c;却被一个00后实习生代替了&#xff0c;该何去何从&#xff1f; 这是一个值得深思的问题&#xff0c;作为职场人员&#xff0c;我们确实该思考&#xff0c;我们的工作会被…

1718_Linux命令模式下查看日历

全部学习汇总&#xff1a; GreyZhang/bash_basic: my learning note about bash shell. (github.com) 前面发布了一份学习笔记&#xff0c;涉嫌过渡宣传&#xff0c;虽然我也没搞懂为什么。有一系列修改建议&#xff0c;我觉得直接放弃了。还是发一份新的吧&#xff01; Linux命…

【数据结构】哈希底层结构

目录 一、哈希概念 二、哈希实现 1、闭散列 1.1、线性探测 1.2、二次探测 2、开散列 2.1、开散列的概念 2.2、开散列的结构 2.3、开散列的查找 2.4、开散列的插入 2.5、开散列的删除 3、性能分析 一、哈希概念 顺序结构以及平衡树中&#xff0c;元素关键码与其存储位…

如何用Postman做接口自动化测试?

本文适合已经掌握 Postman 基本用法的读者&#xff0c;即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求等基本操作。 工作环境与版本&#xff1a; Window 7&#xff08;64位&#xff09;Postman &#xff08;Chrome App v5.5.3&#xff09; P.S. 不同版本页面 U…

JAVA—实验4 继承、接口与多态

一、实验目的 掌握类的继承机制掌握接口的定义方法熟悉成员方法或构造方法多态性 二、实验内容 1&#xff0e;卖车&#xff0d;接口与多态编程 【问题描述】 (1) 汽车接口(Car)&#xff1a;有两个方法&#xff0c; getName()、getPrice()(接口源文件可以自己写&#xff0c;也…

2024总统大选,成为“关乎比特币未来的公投”?背后是怎样的政治抱负?

在今年的迈阿密比特币大会上&#xff0c;Robert F.Kennedy Jr和Vivek Ramaswamy相继发布声明表示&#xff0c;他们将在2024年初选前接受比特币&#xff08;BTC&#xff09;的捐款。 RFK Jr作为美国前总统约翰肯尼迪的侄子&#xff0c;是第一个公开接受Crypto的总统候选人&#…