Spark-Job启动、Stage划分

news2025/1/22 19:33:31

一、上下文

《Spark-driver和executor启动过程》详细分析了driver和executor的启动,此时资源已经给我们分配好了,且Application也已经注册完成。下面我们就来看看Spark是如何启动job并根据DAG来划分Stage的

二、Job启动

Spark RDD中的算子分为Transformations 算子和Actions 算子,Transformations 算子只是将RDD在逻辑上进行了转换,只有调用Actions 算子时才会真正执行以上对RDD的所有操作。为什么呢?点进去这些Actions 算子看下就发现它们都调用了SparkContext的runJob() ,因此程序调用几次Actions算子就会启动几个Job,因此一个Application是对应多个Job的。下面继续看下runJob()中做了什么事情。

1、SparkContext

class SparkContext(config: SparkConf) extends Logging {

 //在RDD中的给定分区集上运行一个函数,并将结果传递给给定的处理程序函数。
 //这是Spark中所有操作的主要入口点。
 def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    ......
    //打印启动job的日志
    logInfo("Starting job: " + callSite.shortForm)
    //调用dagScheduler的runJob()
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    //在使用此RDD的作业完成后调用它
    rdd.doCheckpoint()
  }

}

2、DAGScheduler

DAGScheduler 是面向 stage 调度的高级调度层。它为每个作业计算一个stage的DAG,跟踪这些RDD和stage输出被物化,并找到运行作业的最小时间表。然后,它将stage作为TaskSet(包含完全独立的任务)提交给在集群上运行它们的底层TaskScheduler实现。

Spark stages 是通过在Shuffle边界处打破RDD图来创建的。是一系列具有“窄”依赖关系的RDD操作,如map() 和filter(),这些窄依赖的RDD在每个 stage 都被 pipelined 到一组任务中,但具有shuffle依赖关系的操作需要多个stage(一个stage shuffle write 一组文件,另一个stage shuffle read 这些文件)。最后,每个stage都只会对其他stage进行shuffle依赖,并可能在其中计算多个操作。这些操作的实际 pipelining 发生在各种RDD的RDD.compute()函数中

除了生成阶段的DAG外,DAGScheduler还根据当前缓存状态确定运行每个 Task 的首选位置,并将其传递给低级TaskScheduler。此外,它还可以处理由于shuffle输出文件丢失而导致的故障,在这种情况下,可能需要重新提交之前跑过的stage。非随机文件丢失导致的阶段内故障由TaskScheduler处理,TaskScheduler将在取消整个stage之前重试每个任务几次。

在看源码时,有几个关键概念:

Jobs

        是提交给调度器的顶级工作项,例如,当用户调用类似count() 的操作时,作业将通过submitJob提交。每个Job可能需要执行多个Stage来构建中间数据。

Stages

        是计算Job中中间结果的一组任务,其中每个Task在同一RDD的分区上计算相同的函数。
Stage在shuffle边界处分开,也说明我们必须等待前一阶段完成才能获取输出。有两种类型的阶段:ResultStage,用于执行动作的最后阶段,以及ShuffleMapStage,用于为shuffle写入映射输出文件。如果这些作业重用相同的RDD,则Stage通常在多个Job之间共享。

Tasks

        单独的工作单元,每个任务都发送到一台机器上。

Cache tracking

        DAGScheduler计算出哪些RDD被缓存以避免重新计算它们,并且同样记住哪些shuffle write阶段已经生成了输出文件,以避免重做shuffle write

Preferred locations

        DAGScheduler还根据其底层RDD的首选位置,或缓存或shuffle数据的位置,计算在stage中中的每个Task应该去哪台节点执行

Cleanup

        当Job完成后,所有依赖的数据结构都会被清除,防止OOM

private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {


  def runJob[T, U](...) {
    //提交要给Job给调度器
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    }
  }

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // 获取rdd的所有分区数量
    val maxPartitions = rdd.partitions.length

    ......
    
    //等待DAGScheduler作业完成的对象。当任务完成时,它将其结果传递给给定的处理函数。
    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
    //向线程组提交一个 JobSubmitted 事件
    //在目标RDD上提交了一个产生结果的作业
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
    waiter
  }

}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      //处理 JobSubmitted 事件 转交给 dagScheduler 处理
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  }

}

三、Stage划分

Stage是一组并行Task,它们都计算需要作为Spark作业的一部分运行的相同函数,
其中所有Task都具有相同的shuffle依赖关系。调度器运行的每个Task 
DAG在Shuffle发生的边界处被划分为Stage,然后DAGScheduler按拓扑顺序运行这些阶段。

每个 Stage 可以是 ShuffleMapStage ,在这种情况下,其Task的结果被输入到其他Stage,
也可以是ResultStage,在该情况下,它的任务通过在RDD上运行函数直接计算Spark动作
(例如count()、save()等)。对于 ShuffleMapStage ,还跟踪每个输出分区所在的节点。

1、以Stage为单位一层一层递归寻找

private[spark] class DAGScheduler(......)
  extends Logging {

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
      //创建第一个 Stage 
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      .......
    }
    //作业已提交,内部数据清理。
    barrierJobIdToNumTasksCheckFailures.remove(jobId)

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()

    //提交finalStage 但首先要递归提交该阶段依赖的所有父stage
    submitStage(finalStage)
  }

  //递归方法
  private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //将 finalStage 传入 获得其 依赖的 Stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        //如果该Stage上一层没有Stage了说明到头了,开始提交这个最初的第一个stage
        if (missing.isEmpty) {
          如果上一层没有Stage了那么就提交这层起始的stage
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            //如果Stage前面还有Stage,就继续提交,看是否还有上一层Stage
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }


 private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]  //缺失的Stage
    val visited = new HashSet[RDD[_]] //访问过的RDD
    // 我们在这里手动维护一个堆栈,以防止递归访问引起的StackOverflowError
    //因为递归需要一直将方法压栈 ,最后回归时再弹栈,如果递归太深很容易导致栈溢出
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += stage.rdd //等待访问的RDD,最开始就是一个Job的最后那个RDD
    def visit(rdd: RDD[_]): Unit = {
      if (!visited(rdd)) {
        visited += rdd //这个rdd 已经访问过了
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          //从这个循环我们可以看出,rdd的依赖是多个的,也就是一个rdd可能来自于1-多个rdd的数据
          //我们详细看下rdd的依赖关系
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                //如果RDD和父RDD是Shuffle依赖,就创建一个ShuffleMapStage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                // 只有在基于推送的混洗合并完成后,才将mapStage标记为可用于混洗输出。
                // 如果没有,后续ShuffleMapStage将不会从合并的输出中读取,因为MergeStatuses不可用。
                if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
                  //添加到在丢失的Stage Set中
                  missing += mapStage
                } else {
                  //如果跳过并首次被访问,则转发nextAttemptId。否则一旦重试,
                  //  1) 阶段信息中的内容变得扭曲,例如任务编号、输入字节、e.t.c
                  //  2) 第一次尝试从0-idx开始,不会标记为重试
                  mapStage.increaseAttemptIdOnFirstSkip()
                }
              case narrowDep: NarrowDependency[_] =>
                //当这个RDD和它的父RDD时窄依赖时,放到栈中,继续往下寻找
                waitingForVisit.prepend(narrowDep.rdd)
            }
          }
        }
      }
    }
    //循环这个栈,看是否还有RDD,如果还有就说明还没有窄依赖的父RDD
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.remove(0))
    }
    //返回该stage上一层的stage列表
    missing.toList
  }

}

2、获取与父RDDs的依赖关系

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  //我们的依赖关系和分区将通过调用下面的子类方法获得,并在我们被检查点时被覆盖
  @volatile private var dependencies_ : Seq[Dependency[_]] = _

  //获取此RDD的依赖关系列表,同时考虑RDD是否为检查点。
  final def dependencies: Seq[Dependency[_]] = {
    //OneToOneDependency 是表示父RDD和子RDD的分区之间的一对一依赖关系。
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        stateLock.synchronized {
          if (dependencies_ == null) {
            //在此处才真正去获取此RDD的依赖关系
            dependencies_ = getDependencies
          }
        }
      }
      dependencies_
    }
  }

  //由子类实现,以返回此RDD如何依赖父RDD。此方法只会被调用一次,因此在其中实现耗时的计算是安全的。
  //因此不同形态的rdd 获取依赖关系的方法也不同,它的子类有
  //CoGroupedRDD 有OneToOneDependency (当父子RDD分区数一样时,也就是可以认为调整分区数来调整某些场景的宽窄依赖) 也有 ShuffleDependency
  //CoalescedRDD 只有 NarrowDependency
  //ShuffledRDD  只有ShuffleDependency,需要序列化管理器
  //SubtractedRDD 有OneToOneDependency (当父子RDD分区数一样时) 也有 ShuffleDependency
  //CartesianRDD  只有固定的两个NarrowDependency
  //UnionRDD RangeDependency 它 继承了 NarrowDependency 时窄依赖 (表示父RDD和子RDD中分区范围之间的一对一依赖关系)
  //CoGroupedRDD 、SubtractedRDD  宽窄依赖都有可能,可以通过父子分区数调节
  //CoalescedRDD 、CartesianRDD  、UnionRDD 都是窄依赖关系 
  //ShuffledRDD  必然是宽依赖,因为都开始准备序列化管理器拉数据了
  protected def getDependencies: Seq[Dependency[_]] = deps


}

//继承了一个NarrowDependency 有一个未实现的方法 即: 获取父rdd的分区数量
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

 我们根据一个例子来捋一下

/**
 * 该程序只作为学习用,没有任何业务知识哈
 */
object StageDivision {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StageDivision").setMaster("local")
    val sc = new SparkContext(conf)
    val sourceRdd1 = sc.textFile("file/word_count_data.txt")
    val sourceRdd2 = sc.textFile("file/word_count_data2.txt")
    val sourceRdd3 = sc.textFile("file/word_count_data3.txt")
    val stepOneRdd = sourceRdd1.filter(_.length>10).flatMap(_.split(","))
      .map(x=>{(x,1)}).groupByKey().map(data=>{
      (data._1,data._2.filter(_.>(5)).toList.length)
    })
    val stepTwoRdd = sourceRdd2.flatMap(_.split(",")).map(x=>{(x,10)})
    val stepThreeRdd = sourceRdd3.flatMap(_.split(",")).map(x=>{(x,20)}).reduceByKey(_+_)
    val unionStepOneTwoRdd = stepOneRdd.union(stepTwoRdd)
    val countRdd = unionStepOneTwoRdd.reduceByKey(_+_).union(stepThreeRdd)
    val resultMap = countRdd.collect().toMap
    sc.stop()
  }

该程序的DAG如下图:

1、submitStage(Stage3)

2、根据RDD依赖关系找上一层Stage:Stage2和Stage1

3、执行submitStage(Stage2)和submitStage(Stage1)

4、根据RDD依赖关系找Stage2和Stage1的上一层Stage,

        Stage2没有上一层了,执行submitMissingTasks(stage2, jobId.get)

        Stage1还有上一层Stage:执行submitStage(Stage0)

5、根据RDD依赖关系找Stage0的上一层Stage

        Stage0没有上一层了,执行submitMissingTasks(stage0, jobId.get)

注意:其中只有Stage3为ResultStage,其他Stage均为ShuffleMapStage

四、提交Task

当根据RDD的依赖关系划分完Stage,就开始从每个分支的最外层Stage提交Task了,即执行submitMissingTasks(最外层Stage)

private[spark] class DAGScheduler(...){

  //决定任务是否可以将输出提交到HDFS。使用“第一提交者获胜”策略
  //OutputCommitCoordinator在 driver 和 executor 中都被实例化。在executor 上,它配置了对driver OutputCommitCoordinatorEndpoint的引用,因此提交输出的请求将被转发到driver的OutputCommitCoordinator。
  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    //计算出分区ID的索引Seq。
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    //当一个Stage 启动会有DAGScheduler调度,并对Stage进行初始化
    stage match {
        case s: ShuffleMapStage =>
            outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

        case s: ResultStage =>
            outputCommitCoordinator.stageStart(
              stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)


    //计算每一个分区的所有位置
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
    
    //为此 Stage 创建新的尝试 ,也就是Stage有失败重试的机制
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    //为 Task 广播二进制文件,用于将 Task 分派给 executor。
    //请注意,我们广播了RDD的序列化副本,对于每个Task,我们将对其进行反序列化,
    //这意味着每个任务都会得到一个不同的RDD副本。这在可能修改闭包中引用的对象状态的任务之间提供了更强的隔离。
    //这在Hadoop中是必要的,因为JobConf/Configuration对象不是线程安全的。
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      //对于ShuffleMapTask 序列化并广播  (rdd, shuffleDep)
      //对于ResultTask 序列化并广播 (rdd, func)
      var taskBinaryBytes: Array[Byte] = null
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }
      //如果Task序列化后的大小 > 1 M 就会发出警告
      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
        logWarning(s"Broadcasting large task binary with size " +
          s"${Utils.bytesToString(taskBinaryBytes.length)}")
      }
      //将序列化后的Task广播到每个Executor
      //有多分区就有多少Task,虽然每个Task会按照分区位置移动到最佳的Executor,但是它们的计算逻辑是一样的,因此可以直接广播
      taskBinary = sc.broadcast(taskBinaryBytes)
    }

    //ShuffleMapStage  ---对应---> ShuffleMapTask
    //ResultStage   ----对应--->  ResultTask
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            new ShuffleMapTask(...)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            new ResultTask(...)
          }
      }
    }

    //提交任务
    //会调用 TaskSchedulerImpl.submitTasks() 是在构建SparkContext时就指定好的
    taskScheduler.submitTasks(new TaskSet(...))

  }


  //获取与特定RDD的分区关联的局部性信息
  //该方法时一个递归方法,知道找到这个Stage的最除的那个rdd,然后获取每个分区的最佳位置
  //根据不同rdd的类型有不同的算法,我们单拿出来分析下
  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    //如果分区已被访问,则无需重新访问
    if (!visited.add((rdd, partition))) {
      return Nil
    }

    //如果分区已经缓存了,那么返回缓存的地址
    //如果你的rdd会被多个rdd使用,那么可以缓存起来,其他rdd使用时可以直接从缓存拿数据
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 获取分区的首选位置,同时考虑RDD是否为检查点。
    // 如果rdd设置了首选位置,那么直接使用它
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.filter(_ != null).map(TaskLocation(_))
    }

    //如果RDD具有窄依赖关系,请选择具有任何放置偏好的第一个窄依赖关系的第一个分区。
    //理想情况下,我们会根据传输大小进行选择,但现在就可以了。
    //递归方法:同一个Stage的分区也都是窄依赖的,因此需要获取到这个Stage的第一个RDD的那个分区中的最佳位置
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }

}

Task判断分发到哪个Executor

从上面我们已经知道Task的个数 = 这个Stage的最后的那个RDD的分区数量,且需要对每个分区进行递归找到这个Stage最初的那个RDD,找到其数据的存放位置,如果这个位置所在节点也正好有启动的存活的Executor,那么这就是Task要分发的目的地了,下面我们详细看下不同RDD类型对应的位置计算逻辑

1、HadoopRDD

  private[spark] def convertSplitLocationInfo(
       infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
    Option(infos).map(_.flatMap { loc =>
      val locationStr = loc.getLocation
      if (locationStr != null && locationStr != "localhost") {
        //优先取在内存中存放的数据节点上的Excutor
        if (loc.isInMemory) {
          logDebug(s"Partition $locationStr is cached by Hadoop.")
          Some(HDFSCacheTaskLocation(locationStr).toString)
        } else {
          Some(HostTaskLocation(locationStr).toString)
        }
      } else {
        None
      }
    })
  }

2、ShuffledRDD

  //返回在给定Shuffle中运行给定map输出分区的首选主机,即该分区输出最多的节点。
  //如果映射输出是预合并的,那么如果合并率高于阈值,则返回合并块所在的节点。
  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
      : Seq[String] = {
    val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
    if (shuffleStatus != null) {
      //检查map输出是否已预合并,合并比率是否高于阈值。如果是这样,合并块的位置是首选位置
      //判断是否启用基于推送的Shuffle
      //满足以下条件:
      //    提交应用程序以在YARN模式下运行
      //    已启用外部洗牌服务
      //    IO加密已禁用
      //    序列化器(如KryoSerialer)支持重新定位序列化对象
      val preferredLoc = if (pushBasedShuffleEnabled) {
        shuffleStatus.withMergeStatuses { statuses =>
          val status = statuses(partitionId)
          val numMaps = dep.rdd.partitions.length
          if (status != null && status.getNumMissingMapOutputs(numMaps).toDouble / numMaps
            <= (1 - REDUCER_PREF_LOCS_FRACTION)) {
            Seq(status.location.host)
          } else {
            Nil
          }
        }
      } else {
        Nil
      }
      if (preferredLoc.nonEmpty) {
        preferredLoc
      } else {
        //是否计算reduce任务的局部偏好 spark.shuffle.reduceLocality.enabled 默认 true
        //rdd的分区数 < 1000 (map和reduce任务的数量,超过此数量,我们不会根据map输出大小分配首选位置。我们限制了分配首选位置的作业的大小,因为按大小计算顶部位置变得昂贵。)
        //也就是如果shuffle过程中分区数大于1000就没有什么最佳位置的概念了,也就造成了速度的不确定性
        if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
          dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
          val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
            dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
          if (blockManagerIds.nonEmpty) {
            blockManagerIds.get.map(_.host)
          } else {
            Nil
          }
        } else {
          Nil
        }
      }
    } else {
      Nil
    }
  }

五、总结 

1、线性解析程序中的代码,遇到Action算子调用SparkContext的runJob(),有几个Action算子就会产生几个Job

2、转交给DAGScheduler提交Job

3、DAGScheduler先为调用Action算子的RDD创建一个ResultStage

4、以ResultStage为其实递归调用submitStage(Stage)获取上一层的Stage,直到没有依赖关系(详细请看第三部分:Stage划分)(只有最后一个Stage叫ResultStage,其他Stage叫ShuffleMapStage )

5、从最前一层的Stage依次处理,计算分区数量以及每份分区对应数据的最佳位置的节点上的Executor,因为最终Task是要发到Executor执行的(每个Stage的第一个RDD类型不同计算最佳位置的方式也不同,详细看第四部分中的:Task判断分发到哪个Executor)

6、将Task逻辑序列化并交由TaskScheduler进行调度

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2092199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode算法题之 K 个一组翻转链表

照我说这道题其实是披着困难皮的中等题目&#xff0c;问题如下&#xff1a; 题目地址 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的整数倍…

深入解析多商户商城系统源码:如何开发直播商城小程序?

本篇文章&#xff0c;小编将深入解析多商户商城系统源码的关键技术&#xff0c;并详细探讨如何基于这些源码开发一个功能完善的直播商城小程序。 一、多商户商城系统源码的核心构架 多商户商城系统源码的核心在于其能够支持多个商户独立运营&#xff0c;但同时又在一个统一的平…

(转载)内存分配器101——写一个简单的内存分配器

文章目录 前提正文Malloc()free()calloc()realloc() 前提 之前学习过手写一个简单的内存分配器&#xff0c;原文是英文的&#xff0c;当初学习的时候便将英文翻译为中文的&#xff0c;方便阅读&#xff0c;当然和原文相比少了点味道。今天整理资料的时候看到了自己的翻译&…

为什么越来越多的助贷中介公司做债务重组?

大家有没有注意到个现象&#xff1f;现在越来越多的助贷中介公司和专门做债务重组的公司一起“合作”了。有的是接了单转手给重组公司&#xff0c;有的则是亲自下场&#xff0c;用自有资金做起了重组的事情。为什么会这样呢&#xff1f;好端端的贷款中介不做&#xff0c;偏要蹚…

RabbitMQ练习(Remote procedure call (RPC))

1、RabbitMQ教程 《RabbitMQ Tutorials》https://www.rabbitmq.com/tutorials 2、环境准备 参考&#xff1a;《RabbitMQ练习&#xff08;Hello World&#xff09;》。 确保RabbitMQ、Sender、Receiver容器正常安装和启动。 rootk0test1:~# docker run -it --rm --name rab…

前端内存泄露案例与解决方案

什么是内存泄漏&#xff1f; 内存泄露&#xff08;Memory Leaks&#xff09;&#xff1a;是指应用程序已经不再需要的内存&#xff0c;由于某种原因未返回给操作系统或者空闲内存池&#xff08;Pool of Free Memory&#xff09;。 内存泄露可能带来的问题&#xff1a;变慢、卡…

SAP LE学习笔记07 - MM与WM跨模块收货到仓库的流程中 如何实现 先上架再入库

上一章讲了LE中收货的一些特殊情况&#xff1a; 1&#xff0c;MM模块收货时&#xff0c;特别移动指标来标识的物料直接产生TO 2&#xff0c;MM中直接收货到仓库的固定Storage Bin(棚番)上 SAP LE学习笔记06 - MM与WM跨模块收货到仓库的流程中 带特别移动指标的物料也可以直接…

spring security 会话管理

一、简介 当浏览器调用登录接口登录成功后&#xff0c;服务端会和浏览器之间建立一个会话(Session)浏览器在每次发送请求时都会携带一个 Sessionld&#xff0c;服务端则根据这个 Sessionld 来判断用户身份当浏览器关闭后&#xff0c;服务端的 Session 并不会自动销毁&#xff0…

结构型设计模式-适配器(adapter)模式-python实现

设计模式汇总&#xff1a;查看 通俗示例 想象一下&#xff0c;你刚从国外带回一台最新的笔记本电脑&#xff0c;但是你发现它的电源插头是德标插头&#xff0c;而家里的电源插座是中式插座&#xff0c;这时怎么办呢&#xff1f;你需要一个电源适配器来将德标插头转换成中式插座…

“萌宠经济”全球化浪潮:宠物品牌如何利用TikTok达人破局出海

在全球“萌宠经济”不断升温的背景下&#xff0c;宠物品牌出海成为了重要的战略。随着市场的增长和消费者对宠物产品的需求增加&#xff0c;品牌需要寻找有效的方式进入新的海外市场。在这种情况下&#xff0c;TikTok平台的崛起和宠物达人的影响力成为了宠物品牌破局出海的关键…

数据结构与算法(快速基础C++版)

数据结构与算法&#xff08;快速基础C版&#xff09; 1. 基本概念第1章 绪论1.1 数据结构的研究内容1.2 基本概念和术语1.2.1 数据、数据元素、数据项和数据对象1.2.2 数据结构1.2.3 数据类型和抽象数据类型1.2.4 概念小结 1.3 算法和算法分析1.4 总结 2. 基本的数据结构第2章 …

【PyTorch常用库函数】一文教你快速上手torch.abs()函数:获取张量的绝对值

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 引言 在深度学习领域&#xff0c;PyTorch是一个非常受欢迎的框架&#xff0c;它提供了丰富的库函数来支持各种复杂的计算任务。…

利用Leaflet.js创建交互式地图:多种形状单个区域绘制

引言 在地图应用开发中&#xff0c;用户经常需要对特定区域进行标识和规划。本文将深入探讨如何利用Vue.js的响应式特性与Leaflet.js的地图功能&#xff0c;打造一个支持多边形、矩形、圆形等多种形状绘制的交互式地图编辑器。 功能亮点 自由绘制多边形&#xff1a;用户可以自…

mysql基础语法——个人笔记

0 前言 以前学习且实践过mysql&#xff0c;但后来用得少&#xff0c;随着岁月更替&#xff0c;对其印象渐浅&#xff0c;所以每次需要用时&#xff0c;都会去再看一眼语法规范&#xff0c;然后才能放心动手操作 然而&#xff0c;在信息爆炸的时代&#xff0c;查语法规范时&am…

BUUCTF PWN wp--jarvisoj_level0

第一步 checksec &#xff0c;该题为64位。 分析一下二进制保护机制&#xff1a; Arch: amd64-64-little 这个字段表示二进制程序的架构是 64 位的小端序的 x86-64 架构。小端序意味着低位字节存储在内存中的低地址上&#xff0c;高位字节存储在高地址上。RELRO: No RELRO …

迁移学习之领域自适应(domain adaptation)

比如有一堆有标注的训练数据&#xff0c;这些数 据来自源领域&#xff0c;用这些数据训练出一个模型&#xff0c;这个模型可以用在不一样的领域。在训练的时 候&#xff0c;我们必须要对测试数据所在的目标领域有一些了解。 随着了解的程度不同&#xff0c;领域自适应的方法也不…

(C++ STL)vector类的简单模拟实现与源码展示

vector类的简单模拟实现 一、前言二、vector 的成员变量三、vector 部分函数实现size、capacityreserveresizeinsert 与注意事项erase构造、析构、赋值拷贝 四、vector 源代码 以下代码环境为 VS2022 C。 一、前言 vector类 本质上就是数据结构中的顺序表。(可参考&#xff1…

【最新华为OD机试E卷】boos的收入(100分)-多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-E/D卷的三语言AC题解 💻 ACM金牌🏅️团队| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 🍿 最新华为OD机试D卷目录,全、新、准,题目覆盖率达 95% 以上,…

4.负载均衡

文章目录 1.多级部署2.实现请求计数器3.负载均衡3.1服务端负载均衡3.2客户端负载均衡3.3自定义负载均衡3.4负载均衡策略3.5 LoadBalance 原理 4.部署实现 大家好&#xff0c;我是晓星航。今天为大家带来的是 负载均衡 相关的讲解&#xff01;&#x1f600; 1.多级部署 复制一…

C语言 | Leetcode C语言题解之第378题有序矩阵中第K小的元素

题目&#xff1a; 题解&#xff1a; bool check(int **matrix, int mid, int k, int n) {int i n - 1;int j 0;int num 0;while (i > 0 && j < n) {if (matrix[i][j] < mid) {num i 1;j;} else {i--;}}return num > k; }int kthSmallest(int **matri…