spark源码 spark on yarn环境的创建

news2025/1/13 7:40:00

1.入口类 sparkSubmit 的main方法 提交application

submit=new SparkSubmit
submit.doSubmit(args) ->
super.doSubmit(args):
parseArguments(args) :参数解析

   方法 中 new sparkSubmitArguments(args)   点进去该类(主要解析参数),然后找到parse(args.asJava):{ Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)"); 正则表达式对命令行参数做分解,得到参数名称和值,里边的handle用来做处理,这个handle实际执行的是SparkSubmitArguments 类中的handle方法
}
SparkSubmitArguments 中handle 中点击MASTER( 进入到 类SparkSubmitOptionParser 中的MASTER = "--master" 对应脚本中的参数)
然后点击 sparkSubmitArguments 中找action
action = Option(action).getOrElse(SUBMIT)  默认是提交

回到 SparkSubmit
发现 case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)的默认是提交
点进去 submit 发现

    if (args.isStandaloneCluster && args.useRest)else {
      doRunMain() --点进去
    }

 if (args.proxyUser != null) { 判断命令行参数是否有代理用户,没有 然后进入else
 else {
        runMain(args, uninitLog) --运行我们的主程序,点进去
      }

runMain 中

	**val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)** 准备提交的环境
	
	val loader = getSubmitClassLoader(sparkConf) 类加载器
	mainClass = Utils.classForName(childMainClass) 反射根据类的名称获得类的信息
	

	    
```scala
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {-- 该类是否继承SparkApplication(YarnClusterApplication 继承了),是的话反射创建该对象
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

app.start(childArgs.toArray, sparkConf) --启动它
```
注意:prepareSubmitEnvironment中childMainClass 赋值的地方
	    childMainClass = YARN_CLUSTER_SUBMIT_CLASS(org.apache.spark.deploy.yarn.YarnClusterApplication)

3. YarnClusterApplication 提交启动ApplicationMaster

(new Client(new ClientArguments(args), conf, (RpcEnv)null)).run()
ClientArguments 点进去parseArgs 主要是解析参数
new Client点进去-》YarnClient.createYarnClient -》new YarnClientImpl(){
该对象中的变量 rmClient(yarn当中的资源调度节点)
}
此时,客户端已经创建好了,点击run方法

   submitApplication() 提交应用程序返回appid,点进去{
         launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start() 客户端启动

val newApp = yarnClient.createApplication()--告诉resourceManager要创建应用
appId = newAppResponse.getApplicationId()  创建会有响应,得到全局id


  val containerContext = createContainerLaunchContext(newAppResponse) {--创建容器的启动环境
      val amClass =
  if (isClusterMode) {--如果是集群模式
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }

      val amArgs =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
  Seq("--dist-cache-conf",
    buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))
    
    val commands(包装指令) = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
    
amContainer.setCommands(printableCommands.asJava)  提交指令给RM(RM得到指令后,在NodeManger 中启动ApplicationMaster)

集群模式:amClass =org.apache.spark.deploy.yarn.ApplicationMaste

/bin/java   org.apache.spark.deploy.yarn.ApplicationMaster

}
      val appContext = createApplicationSubmissionContext(newApp, containerContext)--创建提交环境
 yarnClient.submitApplication(appContext) --建立RM的链接,等同于提交
点appContext进去 createApplicationSubmissionContext
      }

在这里插入图片描述

3.ApplicationMaster【进程】 根据参数启动Driver线程并初始化SparkContext

ApplicationMaster 点击进入伴生对象中的main方法  {
ApplicationMasterArguments 解析参数
master = new ApplicationMaster(amArgs, sparkConf, yarnConf){

new YarnRMClient()  点{
amClient: AMRMClient[ContainerRequest] = _  --applicationMaster连接RM之间的客户端}}

master.run() -》 runDriver {
userClassThread = startUserApplication(){-- 非常重要,启动用户的应用程序的线程
    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])  类加载器找到用户的main方法
  
  mainMethod.invoke(null, userArgs.toArray)  调用main方法,初始化创建SparkContext
userThread.setName("Driver") 该线程命名为driver
userThread.start()
}
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))  --线程的主射功能,等待sparkcontext的结果,上下文的坏境对象
        
sc.env.rpcEnv --通信环境

registerAM --注册AM,申请资源
createAllocator --分配器{
allocator =client.createAllocator( -- 这里边的客户端就是yarnRMClient,创建一个分配器
allocator.allocateResources() {点进去-- 得到可分配的资源
val allocatedContainers = allocateResponse.getAllocatedContainers() 得到可分配的资源

handleAllocatedContainers(allocatedContainers.asScala) {--处理可用于分配的容器,将申请到的容器进行分类,分布在不同的机架上,或者同一个NodeManager 首选位置

  runAllocatedContainers(containersToUse){--运行已分配的container
  for (container <- containersToUse)   挨个遍历容器
           if (runningExecutors.size() < targetNumExecutors) { --判断现在运行的资源是否小于我们需要的资源
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          launcherPool.execute().run{线程池(ExecutorRunnable)启动Executor
           nmClient.start() --
             startContainer(){
  
                 val commands = prepareCommand() {
                 /bin/java", "-server"
                 *org.apache.spark.executor.YarnCoarseGrainedExecutorBackend* --进程,excutor的通讯后台
  } ---准备指令,启动容器
             nmClient.startContainer(container.get, ctx)   其中的nmClient 启动容器,ctx 环境信息,
             } --启动container
          }-
  }
}
}
}
	}
	}

3.YarnCoarseGrainedExecutorBackend:进程(也可以理解为executor)

Backend:后台
Endpoint:终端
RpcEndpoint:通信终端( constructor -> onStart -> receive* -> onStop})生命周期
CoarseGrainedExecutorBackend.run(backendArgs, createFn){ 点进去该方法
val fetcher = --找到driver,和driver有连接

SparkEnv.createExecutorEnv
env.rpcEnv.setupEndpoint (“Executor”,-在整个通讯环境中增加一个终端 execuor
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef(抽象,实现的类是NettyRpcEnv){
dispatcher.registerRpcEndpoint(name, endpoint) {–注册rpc的通信终端
val addr = RpcEndpointAddress(nettyEnv.address, name) 通信地址
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) --ref通信引用
var messageLoop: MessageLoop = nu --消息循环器
匹配成功创建 DedicatedMessageLoop(
private val inbox = new Inbox(name, endpoint) {
messages.add(OnStart)–当前节点有个收件箱,可以往里边发消息,发给自己
}–收件箱
threadpool --线程池

}
}

backendCreateFn就是当前的YarnCoarseGrainedExecutorBackend 继承的IsolatedRpcEndpoint(应该遵循生命周期)
当往inbox发送了一个Onstart之后,IsolatedRpcEndpoint 就会收到消息
}

override def onStart():{
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>. --得到了driver
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor() -第七步: 向driver发起一个请求,注册executor,
self.send(RegisteredExecutor)–给自己发送注册成功的消息 第八步:注册成功
}
}

driver (指driverRpc的通信句柄) 收到请求后找SparkContext(
_schedulerBackend: SchedulerBackend = _ --通信后台 其集群模式的实现类为CoarseGrainedSchedulerBackend

点进去该类CoarseGrainedSchedulerBackend(

receiveAndReply(context: RpcCallContext) {–接收上边的注册executor的消息
case RegisterExecutor {
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1) 资源做增加
context.reply(true) --响应注册成功
}
}

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor=>
logInfo(“Successfully registered with driver”)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId)) --启动,

然后 CoarseGrainedExecutorBackend接受到消息
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor=>
logInfo(“Successfully registered with driver”)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId)) --发送LaunchedExecutor消息,

CoarseGrainedSchedulerBackend 接收到该消息

  case LaunchedExecutor(executorId) => --增加核数
    executorDataMap.get(executorId).foreach { data =>
      data.freeCores = data.totalCores
    }

makeOffers{}

第九步:创建executor 计算对象
executor自己调度自己

然后回到ApplicationMaster
runDriver {–完成了注册am和createAllocator

  resumeDriver() 继续运行driver,即用户的代码初始化sparkContext后,继续往下走
  userClassThread.join()

}

sparkContext(

_taskScheduler.postStartHook() 实现类TaskSchedulerImpl
override def postStartHook(): Unit = {
waitBackendReady()
}

YarnClusterScheduler 继承TaskSchedulerImpl

private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

logInfo(“Created YarnClusterScheduler”)

override def postStartHook(): Unit = {
ApplicationMaster.sparkContextInitialized(sc) --当前的初始化环境已经完成了,让ApplicationMaster 中的val sc = ThreadUtils.awaitResult 继续往下执行
super.postStartHook() {点进去进入TaskSchedulerImpl的该方法,然后再点击waitBackendReady :死循环,backend一直在等待,我们的程序往下走
}
logInfo(“YarnClusterScheduler.postStartHook done”)
}

}
ApplicationMaster 中
resumeDriver():driver通知sparkcontext继续走下去,自己写的代码就可以继续执行
userClassThread.join()


在这里插入图片描述

自此:整个提交流程和环境就执行好了

在这里插入图片描述
driver线程执行的过程中,注册应用程序是阻塞的,当反射执行main方法的时候,去创建SparkContext并完成初始化之后,通知注册应用程序继续执行,当executor反向注册之后,然后通知action算子继续执行

SparkEnv spark的

createSparkEnv -》{
SparkEnv.createDriverEnv->{
create ->{
RpcEnv.create ->{
NettyRpcEnvFactory().create ->{
NettyRpcEnv
nettyEnv.startServer(config.bindAddress, actualPort)->{
transportContext.createServer ->{
return new TransportServer(Epoll 模式) ->{
init(hostToBind, portToBind) ->{
IOMode—io模式
getServerChannelClass -》 case NIO:
}
}
}

dispatcher.registerRpcEndpoint – 通讯终端(RpcEndpoint(inbox收件箱) 收数据receive,RpcEndpointRef(outboxes 发件箱) 主要是ask, Dispacher 调度的)
}

                Utils.startServiceOnPort ->{
                     startService(tryPort)
                  }
          }
        }
    }
}

}
在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/533534.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C/C++ 内存管理 new delete operator new与operator delete函数 内存泄漏

C/C 内存分布 在C/C 当中有 &#xff1a; 局部数据静态数据和全局数据常量数据动态申请数据 上述不同的数据存储的位置也不同&#xff0c;&#xff1a; 1. 栈又叫堆栈--非静态局部变量/函数参数/返回值等等&#xff0c;栈是向下增长的。2. 内存映射段是高效的I/O映射方式&…

Java运算符详解

目录 &#x1f511;前言 &#x1f3bd;算式运算符 ⚙赋值运算符 &#x1f34a;关系运算符 &#x1f37a;逻辑运算符 &#x1f352;位运算符 &#x1f339;移位运算符 &#x1f343;条件运算符 &#x1f349;运算符优先级 &#x1f511;前言 任何一个程序都离不开计算问题&…

IOC/DI配置管理第三方bean(Druid/C3P0)

文章目录 1 案例:数据源对象管理1.1 环境准备1.2 思路分析1.3 实现Druid管理步骤1:导入druid的依赖步骤2:配置第三方bean步骤3:从IOC容器中获取对应的bean对象步骤4:运行程序 1.4 实现C3P0管理步骤1:导入C3P0的依赖步骤2:配置第三方bean步骤3:运行程序 2 加载properties文件2.1…

详解MySQL的并发控制

目录 1.概述 2.事务 2.1.什么是事务 2.2.事务的隔离级别 2.2.1.三种数据一致性问题 2.2.2.四种隔离级别 2.3.如何设置隔离级别 3.锁 3.1.锁与事务的关系 3.2.分类 3.3.表锁 3.3.1.概述 3.3.2.读锁 3.3.3.写锁 3.3.4.保护机制 3.4.行锁 3.4.1.概述 3.4.2.什么…

Redis Java API操作

1、普通maven工程方式 Redis不仅可以通过命令行进行操作&#xff0c;也可以通过JavaAPI操作&#xff0c;通过使用Java API来对Redis数据库中的各种数据类型操作 导入POM依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http…

【Linux Network】传输层协议——UDP

目录 传输层 端口号 端口号范围划分 认识知名端口号(Well-Know Port Number) netstat pidof UDP协议 UDP协议端格式 UDP的特点 面向数据报 UDP的缓冲区 UDP使用注意事项 基于UDP的应用层协议 UDP详解&#x1f337; 传输层 在TCP/IP协议中可以把网络简单的划分为四个部分&#…

大数据如何助力营销(3)产品定位

在市场竞争日益激烈的环境下&#xff0c;产品定位是企业成功的关键因素之一。产品定位是指根据目标市场和目标消费者的需求、偏好和期望&#xff0c;确定产品的特性、功能、形象和价值&#xff0c;并与竞争对手的产品进行差异化的过程。产品定位不仅影响产品的设计、开发、生产…

全面讲解涂鸦PaaS2.0开发平台!物联网干货预警

之前我们有介绍过涂鸦 IoT PaaS&#xff08;点击查看往期介绍&#xff09;&#xff0c;面向开发生态&#xff0c;它集成了云开发、App 开发、硬件开发三大核心支撑能力&#xff0c;能够全方位助力开发者打造极具竞争力的个性化 IoT 解决方案&#xff0c;极大地降低 IoT 开发门槛…

Appium APP自动化环境搭建

1.下载安装 F:\android-sdk-windows F:\Appium-windows-1.21.0 F:\nodejs 2.创建一个bat文件&#xff0c;命名为appium.bat&#xff0c;并在其中写入如下内容&#xff1a; node Appium安装目录\resources\app\node_modules\appium\build\lib\main.js %* 注意&#xff1a;请…

从0到1开始,一步步搭建Web自动化测试框架

测试框架的设计有两种思路&#xff0c;一种是自底向上&#xff0c;从脚本逐步演变完善成框架&#xff0c;这种适合新手了解框架的演变过程。另一种则是自顶向下&#xff0c;直接设计框架结构和选取各种问题的解决方案&#xff0c;这种适合有较多框架事件经验的人。本章和下一张…

【zabbix】批量监控端口,自动发现规则

快速搞定端口批量监控 一、脚本及配置 1、&#xff08;文件名&#xff1a;check_port.py&#xff09;&#xff08;python2.7版本&#xff09; 存在路径&#xff1a;/etc/zabbix/zabbix_agentd.d/check_port.py 这个脚本有一部分内容是我从百度上找的&#xff0c;有一部分自己…

【C++】红黑树的实现

文章目录 &#x1f4d5; 概念特性 &#x1f4d5; 红黑树具体实现节点定义结构框架插入 &#x1f4d5; 概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个…

Python基础(四)

目录 一、程序的组织结构 1、前言 二、顺序结构 1、介绍 三、对象的布尔值 1、介绍 2、规定 四、分支结构 1、单分支if结构 1、语法语义 2、语法结构 3、案例 2、双分支if...else结构 1、语法语义 2、语法结构 3、案例 3、多分支if...elif...else结构 1、语…

Java语言---栈与队列

目录 一.栈 1.1栈的概念 1.2.栈的实现 1.2.1数组实现 栈 栈的创建 栈的基本方法实现 1.2.2链表实现 栈 栈的创建 栈的基本方法实现 二.队列 2.1队列的概念 2.2队列的实现 2.3代码实现 2.3.1队列代码的构建 2.3.2 队列 基础方法实现 总结 &#x1f63d;个人主页…

深入理解2D卷积和3D卷积

文章目录 卷积核的维度2D卷积单通道多通道代码example2d卷积操作后变化 3D卷积单通道多通道代码 在项目中用到了conv3但是对其背后的原理还有一些模糊的地方&#xff0c;conv2d与多通道的conv2d的区别在哪里&#xff1f;conv3d的思想理论是什么&#xff1f;对此进行探究和记录……

「AI之劫」:当机器超越人类底线,正在侵犯我们的创造力和道德

随着AI技术的不断发展&#xff0c;AI生成内容&#xff08;AIGC&#xff09;已经成为数字娱乐、商业营销和学术研究等领域的热门话题。随着人工智能技术的不断发展越来越多的领域开始应用AI技术&#xff0c;其中之一就是内容生成领域。 AIGC全称为AI-Generated Content, 指基于生…

2023年5月广州/深圳制造业产品经理很适合考的证书-NPDP

产品经理国际资格认证NPDP是新产品开发方面的认证&#xff0c;集理论、方法与实践为一体的全方位的知识体系&#xff0c;为公司组织层级进行规划、决策、执行提供良好的方法体系支撑。 【认证机构】 产品开发与管理协会&#xff08;PDMA&#xff09;成立于1979年&#xff0c;是…

【利用AI让知识体系化】入门Egg框架(含实战)

思维导图 文章目录 思维导图第一章&#xff1a;概述1.1 Egg.js 简介1.2 Egg.js 的架构和优势1.3 Egg.js 的基本组件和插件 第二章&#xff1a;环境搭建2.1 Node.js 环境安装和配置2.2 Egg.js 应用创建和项目结构介绍2.3 Egg.js 应用部署和启动 第三章&#xff1a;基本开发3.1 路…

经纬恒润新产品系列 | 这款AR-HUD将颠覆你的认知

随着科技的发展与突破&#xff0c;智能化产品在汽车领域扮演了越来越重要的角色。本文即将介绍**经纬恒润新产品——AR-HUD&#xff08;增强现实抬头显示系统&#xff09;&#xff0c;**它可以将科幻电影中的驾驶场景变为现实——将信息投影在挡风玻璃上&#xff0c;基于此功能…

开发环境搭建和创建STM32工程

目录 一、开发环境搭建 1. STM32CubeMX 2.Keil安装 二、创建STM32工程 一、开发环境搭建 1. STM32CubeMX ST公司出品 工具链接 https://www.st.com/zh/development-tools/stm32cubemx.html STM32CubeMX是一种图形工具&#xff0c;通过分步过程可以非常轻松地配置STM32微控制器和…