Spark-SparkSubmit详细过程

news2025/1/10 16:44:24

一、概览

《Spark-环境启动》中讲了Spark环境的启动,以及Master和多个Worker之间时基于Endpoint之间的Netty通信,也被称为Spark的RpcEnv。在此基础上我们来看下spark-submit是如何将我们写的Spark程序调起的

二、启动脚本示例:

spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--driver-memory 100g \
--executor-memory 20g \
--num-executors 120 \
--executor-cores 3 \
--conf spark.shuffle.io.maxRetries=2 \
--conf spark.xx.xx.xx=xx \
--class com.xx.xxx  \
--files "/data/xxxx" \
/xxx/project/xxx/spark-1.1-SNAPSHOT.jar \
p1 p2 p3 …… pn \

三、spark-submit.sh

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

四、SparkSubmit

//启动Spark应用程序的主网关。
//此程序处理设置具有相关Spark依赖关系的类路径,并在Spark支持的不同集群管理器和部署模式上提供一个层。
private[spark] class SparkSubmit extends Logging {

    //Yarn模式下的下一个执行的主类
    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"

    //STANDALONE 下的rest模式 或 Mesos 模式的下一个执行的主类
    private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()

    //STANDALONE 下的传统模式的下一个执行的主类  我们主要分析这一个
    private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()

    //K8s下的下一个执行的主类
    private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

    override def main(args: Array[String]): Unit = {
        val submit = new SparkSubmit() {...}
        submit.doSubmit(args)
    }

    def doSubmit(args: Array[String]): Unit = {
        //解析命令行参数
        val appArgs = parseArguments(args)
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
          case SparkSubmitAction.KILL => kill(appArgs)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
          case SparkSubmitAction.PRINT_VERSION => printVersion()
        }
    }

    private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
        def doRunMain(): Unit = {
            runMain(args, uninitLog)
        }
        doRunMain()
    }

    //使用submit参数运行子类的main方法。
    //这分为两个步骤。首先,我们通过设置适当的类路径、系统属性和应用程序参数来准备启动环境,以便根据集群管理器和部署模式运行子主类。
    //请注意,如果我们运行的是集群部署模式或python应用程序,那么这个主类将不是用户提供的。  (需要跳几次才真正运行自己的main方法)
    private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
        val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
        mainClass = Utils.classForName(childMainClass)
        val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
        } else {
              new JavaMainApplication(mainClass)
        }

        app.start(childArgs.toArray, sparkConf)
    }

    //得到一个 4-tuple:
    //  1、子进程的参数
    //  2、进入子类的路径列表
    //  3、系统环境映射
    //  4、子进程的主类
    private[deploy] def prepareSubmitEnvironment(
        
        //设置集群管理
        val clusterManager: Int = args.master match {
          case "yarn" => YARN  //1
          case m if m.startsWith("spark") => STANDALONE  // 2 
          case m if m.startsWith("mesos") => MESOS    // 4
          case m if m.startsWith("k8s") => KUBERNETES  // 16
          case m if m.startsWith("local") => LOCAL    // 8
          case _ =>
            error("Master must either be yarn or start with spark, mesos, k8s, or local")
            -1
        }    
        
        //设置部署模式;默认为客户端模式
        //在客户端模式下,需要下载远程文件
        //在YARN中运行时,对于某些具有scheme的远程资源:
        //  1.Hadoop FileSystem不支持它们。
        //  2.我们使用“spark.yarn.dist.forceDownloadSchemes”明确绕过Hadoop文件系统。
        //在添加到YARN的分布式缓存之前,我们将把它们下载到本地磁盘。
        //对于yarn客户端模式,由于我们已经用上面的代码下载了它们,所以我们只需要找出本地路径并替换远程路径。
        //如果我们正在运行一个python应用程序,请将main类设置为我们特定的python运行器
        //非PySpark应用程序可能需要Python依赖关系。
        //在R应用程序的YARN模式下,将SparkR包存档和包含所有构建的R库的R包存档添加到存档中,以便它们可以与作业一起分发

        ......省略......

        //在客户端模式下,直接启动应用程序主类
        //此外,将主应用程序jar和任何添加的jar(如果有的话)添加到类路径中
        if (deployMode == CLIENT) {
            childMainClass = args.mainClass
            if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
            }
            if (localJars != null) { childClasspath ++= localJars.split(",") }
        }

        //在独立集群模式下,使用REST客户端提交应用程序(Spark 1.3+)。所有Spark参数都应通过系统属性传递给客户端。
        if (args.isStandaloneCluster) {
            if (args.useRest) {
                childMainClass = REST_CLUSTER_SUBMIT_CLASS
            } else {
                //在传统的独立集群模式下,使用客户端作为用户类的包装器
                childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
            }
        }

        //在yarn模式下,使用yarn.Client作为用户类的包装器
        if (isYarnCluster) {
            childMainClass = YARN_CLUSTER_SUBMIT_CLASS
        }

    }

}

五、ClientApp

private[spark] class ClientApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)

    //rpc通信的ask消息超时时间默认为 10s
    if (!conf.contains(RPC_ASK_TIMEOUT)) {
      conf.set(RPC_ASK_TIMEOUT, "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    //构建RpcEnv通信环境
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    //获取masterEndpoints 
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    //设置ClientEndpoint 默认会调它的 onStart()
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}

六、ClientEndpoint

//将消息中继到驱动程序的代理。
//如果提交失败,我们目前不支持重试。在HA模式下,客户端将向所有主机提交请求,看看哪个主机可以处理它。
private class ClientEndpoint(
    override val rpcEnv: RpcEnv,
    driverArgs: ClientArguments,
    masterEndpoints: Seq[RpcEndpointRef],
    conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging {

  override def onStart(): Unit = {
    driverArgs.cmd match {
        case "launch" =>
            //Diver的包装类
            val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
            val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)
            val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
          config.SPARK_DRIVER_PREFIX)
            //构建DriverDescription 用于启动Driver
            //里面包含jar的下载路径、所需要的内存和核数、启动命令等等
            val driverDescription = new DriverDescription(
              driverArgs.jarUrl,
              driverArgs.memory,
              driverArgs.cores,
              driverArgs.supervise,
              command,
              driverResourceReqs)
            //向Master发送 RequestSubmitDriver 消息
            asyncSendToMasterAndForwardReply[SubmitDriverResponse](
                RequestSubmitDriver(driverDescription))
        case "kill" =>
            val driverId = driverArgs.driverId
            submittedDriverID = driverId
            asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))

  }
}

七、Master处理Client发出的RequestSubmitDriver 

private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RequestSubmitDriver(description) =>
      //如果Master不是ALIVE 直接返回 Client 失败的响应
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        //创建一个Driver
        val driver = createDriver(description)
        //把新创建的Driver添加到Master对Derviers的维护中去
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        //在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。
        schedule()
        //回复客户端消息
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))

  }

  //创建driver
  private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    new DriverInfo(now, newDriverId(date), desc, date)
  }

 private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    //将状态为ALIVE的Workers打散,保证多个Driver分散启动
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    迭代所有需要启动的driver
    for (driver <- waitingDrivers.toList) {
      var launched = false
      var isClusterIdle = true
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
        numWorkersVisited += 1
        校验该worker上剩余的内存和内核是否满足启动driver的要求
        if (canLaunchDriver(worker, driver.desc)) {
          val allocated = worker.acquireResources(driver.desc.resourceReqs)
          driver.withResources(allocated)
          //启动Driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }

    }
    startExecutorsOnWorkers()
  }


  private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    //向worker ENdpoint 发送 LaunchDriver 消息
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
    //将driver的状态设置称RUNNING
    driver.state = DriverState.RUNNING
  }

}

DriverInfo的信息如下:

private[deploy] class DriverInfo(
    val startTime: Long,
    val id: String,
    val desc: DriverDescription,
    val submitDate: Date)
  extends Serializable {

  //driver的初始化状态就是 SUBMITTED
  @transient var state: DriverState.Value = DriverState.SUBMITTED
  /* 如果我们在启动Driver时失败,异常将存储在此处 */
  @transient var exception: Option[Exception] = None
  /* 最近分配给 driver 的 worker */
  @transient var worker: Option[WorkerInfo] = None
  // 分配给此驱动程序的资源(例如:gpu/fpga)从资源名称映射到资源信息
  //CPU、GPU都属于冯·诺依曼结构,指令译码执行、共享内存。
  //FPGA是一种硬件可重构的体系结构中文名是现场可编程门阵列
  //FPGA 之所以比 CPU 甚至 GPU 能效高,本质上是无指令、无需共享内存的体系结构带来的福利
  //GPU不支持超流水技术 只能数据并行
  //FPGA支持超流水技术,可以支持指令并行
  private var _resources: Map[String, ResourceInformation] = Map.empty

} 

八、Worker处理Master调度发起的LaunchDriver请求

private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager,
    resourceFileOpt: Option[String] = None,
    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
  extends ThreadSafeRpcEndpoint with Logging {

  override def receive: PartialFunction[Any, Unit] = synchronized {
    case LaunchDriver(driverId, driverDesc, resources_) =>
      logInfo(s"Asked to launch driver $driverId")
      //管理一个driver的执行,包括在失败时自动重新启动driver。这目前仅在独立集群部署模式下使用。
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        workerWebUiUrl,
        securityMgr,
        resources_)
      //该worker中对不同driver的维护
      drivers(driverId) = driver
      //启动driver
      driver.start()
      
      //重新计算该worker可用的资源
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
      addResourcesUsed(resources_)

  }
}

九、DriverWrapper启动

object DriverWrapper extends Logging {
  def main(args: Array[String]): Unit = {
    args.toList match {
      case workerUrl :: userJar :: mainClass :: extraArgs =>
        val conf = new SparkConf()
        val host: String = Utils.localHostName()
        val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toInt
        val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
        logInfo(s"Driver address: ${rpcEnv.address}")
        //创建一个workerWatcher Endpoint 意为看着这个worker进程,如果丢了自己也就over了
        //连接到worker进程,如果连接中断,则终止JVM。在工作进程及其关联的子进程之间提供命运共享。 它没有onStart()
        rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

        //............

        //我们编写的Spark程序在这里就会通过反射被调起执行
        val clazz = Utils.classForName(mainClass)
        val mainMethod = clazz.getMethod("main", classOf[Array[String]])
        mainMethod.invoke(null, extraArgs.toArray[String])

        rpcEnv.shutdown()
    }
  }

十、总结

1、编写Spark程序

2、编写启动脚本使用spark-submit命令提交任务

3、调用org.apache.spark.deploy.SparkSubmit中main方法

4、判断提交参数中的模式,根据不同的模式设置不同的mainClass

如果采用Standalone模式那么主类为:ClientApp

5、执行ClientApp中的start方法

6、构建RpcEnv环境并设置ClientEndpoint

7、构建DriverDescription并向Master发起启动Driver的请求

8、Master会处理所有启动Driver的请求并随机挑一个资源充足的Worker用于启动该Driver

9、Worker上启动一个DriverWrapper进程

10、DriverWrapper中的main方法会调用我们自己写的Spark程序中的main方法

下图是整体的方法调用图,下载放大后就会很清晰哟

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2054363.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

css通过keyframes实现文字定时向上滚动

一、效果 二、代码 <!DOCTYPE html> <html lang="en"><head>

软考软件设计师-备考须知

&#x1f939;‍♀️潜意识起点&#xff1a;个人主页 &#x1f399;座右铭&#xff1a;得之坦然&#xff0c;失之淡然。 &#x1f48e;擅长领域&#xff1a;大前端 是的&#xff0c;我需要您的&#xff1a; &#x1f9e1;点赞❤️关注&#x1f499;收藏&#x1f49b; 是我…

【hot100篇-python刷题记录】【矩阵置零】

R5-矩阵篇 印象题&#xff0c;思路即可&#xff1a; 手动置0 无非就是行和列都置0 使用thex和they将该元素的i和y存储起来&#xff0c;再分别遍历thex&#xff0c;将所有y的位置置0 遍历they&#xff0c;将所有x 置0 class Solution:def setZeroes(self, matrix: List[List…

【机器学习】(基础篇六) —— 数据集的划分和过拟合问题

数据集的划分 训练集和测试集 在机器学习中&#xff0c;数据集通常会被划分为训练集&#xff08;Training Set&#xff09;和测试集&#xff08;Test Set&#xff09;&#xff0c;有时还会包括一个验证集&#xff08;Validation Set&#xff09;。这样的划分是为了能够更好地…

Ⅰ、基于 WebGPU 从 0 到 1 渲染 GLTF:第一个三角形

Ⅰ、基于 WebGPU 从 0 到 1 渲染 GLTF&#xff1a;第一个三角形 WebGPU 是一种面相网页的现代图形 API&#xff0c;由主要浏览器供应商开发。与 WebGL 相比&#xff0c;WebGPU 对 GPU 提供了更直接的控制&#xff0c;使应用程序能更有效地利用硬件&#xff0c;类似于 Vulkan 和…

深度学习设计模式之外观模式

文章目录 前言一、介绍二、特点三、详细分析1.核心组成2.代码示例3.优缺点优点缺点 4.使用场景 总结 前言 外观模式是结构型设计模式&#xff0c;定义一个高层接口&#xff0c;用来访问子系统中的众多接口&#xff0c;使系统更加容易使用。 一、介绍 外观设计模式&#xff08…

低代码与AI:赋能企业数字化转型

引言 随着全球经济的快速发展和科技的飞速进步&#xff0c;数字化转型已成为各个行业和企业发展的重要趋势。数字化转型的背景不仅是提升效率和竞争力的手段&#xff0c;更是适应市场变化、满足客户需求的必由之路。 在当今信息化时代&#xff0c;技术的变革推动了企业运营方式…

【Python机器学习】MapReduce(分布式计算的框架)

MapReduce的优缺点&#xff1a; 优点&#xff1a;可在短时间内完成大量工作&#xff1b; 缺点&#xff1a;算法必须经过重写&#xff0c;需要对系统工程有一定的理解&#xff1b; 适用数据类型&#xff1a;数值型和标称型数据。 MapReduce是一个软件框架&#xff0c;可以将单个…

SQL UA注入 (injection 第十八关)

简介 SQL注入&#xff08;SQL Injection&#xff09;是一种常见的网络攻击方式&#xff0c;通过向SQL查询中插入恶意的SQL代码&#xff0c;攻击者可以操控数据库&#xff0c;SQL注入是一种代码注入攻击&#xff0c;其中攻击者将恶意的SQL代码插入到应用程序的输入字段中&a…

[Python学习日记-10] Python中的流程控制(if...else...)

简介 假如把写程序比做走路&#xff0c;那我们到现在为止&#xff0c;一直走的都是直路&#xff0c;还没遇到过分叉口&#xff0c;想象现实中&#xff0c;你遇到了分叉口&#xff0c;然后你决定往哪拐必然是有所动作的。你要判断那条岔路是你真正要走的路&#xff0c;如果我们想…

合宙LuatOS AIR700 IPV6 TCP 客户端向NodeRed发送数据

为了验证 AIR700 IPV6 &#xff0c;特别新建向NodeRed Tcp发送的工程。 Air700发送TCP数据源码如下&#xff1a; --[[ IPv6客户端演示, 仅EC618系列支持, 例如Air780E/Air600E/Air780UG/Air700E ]]-- LuaTools需要PROJECT和VERSION这两个信息 PROJECT "IPV6_SendDate_N…

Leetcode面试经典150题-155.最小栈

解法都在代码里&#xff0c;不懂就留言或者私信 我写了两种解法&#xff0c;建议选择双栈的&#xff0c;感觉这才是考察点 /**一般解法&#xff1a;过个笔试没问题&#xff0c;建议用双栈的方法 */ class MinStack2 {/**至少应该有一个栈用于保存数据 对于push和pop以及top的话…

STM32之SPI读写W25Q128芯片

SPI简介 STM32的SPI是一个串行外设接口。它允许STM32微控制器与其他设备&#xff08;如传感器、存储器等&#xff09;进行高速、全双工、同步的串行通信。通常包含SCLK&#xff08;串行时钟&#xff09;、MOSI&#xff08;主设备输出/从设备输入Master Output Slave Input&…

【React Hooks - useState状态批量更新原理】

概述 所谓批量处理就是当在同时更新多个状态下&#xff0c;能够统一批量处理更新&#xff0c;避免了重复渲染。在React17及之前版本&#xff0c;React只会在合成事件以及生命周期内部进行批量处理&#xff0c;在setTimeout、Promise、Fetch等异步请求中&#xff0c;则不会自动…

【GH】【EXCEL】P1: Write DATA SET from GH into EXCEL

文章目录 WriteFast WriteGH data material :GH process and components instructionFast Write DataFast Write Data & Clear DataFast Write to Cell EXCEL written results Write by ColumnGH data material :Compile ColumnGH process and components instructionWrite…

三、Kafka副本

2、创建2个分区两个副本 /usr/local/kafka/bin# ./kafka-topics.sh --bootstrap-server 192.168.58.130:9092 --create --topic atguigu2 --partitions 2 --replication-factor 23、查看topic详细信息 /usr/local/kafka/bin# ./kafka-topics.sh --bootstrap-server 192.168.5…

如何理解CAPL—Test编程中的测试对象

前言&#xff1a;CAPL—Test编程中的对象&#xff0c;是一个比较复杂的概念&#xff0c;对象的作用是作为Test特定函数的参数。来执行特定的功能&#xff08;这是比较复杂的一个概念&#xff0c;下文会慢慢讲解&#xff09;。 注意&#xff1a;因为翻译的问题&#xff0c;有些…

arm:ADC模数转换器

比较器 AD&#xff1a; 精度&#xff1a;10位 转换速率&#xff1a;500 KSPS 量程&#xff1a;0~3.3v void adc_init(void) {ADCCON (1 << 14) | (49 << 6) | (1 << 1); }unsigned short adc_read(void) {unsigned short value ADCDAT0;while(~(ADCCON &am…

华为M60首次降价,消费回暖能延续?

导语 8月15日&#xff0c;华为Mate 60系列首次官宣降价&#xff01;能否带动消费电子进一步回暖&#xff1f; 在当前全球经济形势复杂多变的背景下&#xff0c;各行各业都在寻求新的增长点和突破口。 消费电子市场作为科技与日常生活紧密结合的重要领域&#xff0c;其发展态势一…

基于HarmonyOS的宠物收养系统的设计与实现(一)

基于HarmonyOS的宠物收养系统的设计与实现&#xff08;一&#xff09; 本系统是简易的宠物收养系统&#xff0c;为了更加熟练地掌握HarmonyOS相关技术的使用。 项目创建 创建一个空项目取名为PetApp 首页实现&#xff08;组件导航使用&#xff09; 官方文档&#xff1a;组…