应用程序的执行 && 第 4 章 Spark 任务调度机制
- 应用程序的执行
- (1) RDD 依赖
- shuffledRDD
- ShuffleDependency
- MapPartitionRDD
- OneToOneDependency
- (2) 阶段的划分
- shuffledRDD
- RDD
- ShuffleMapStage
- ResultStage
- (3) 任务的切分
- (4) 任务的调度
- Task
- TaskPool
- (5) 任务的执行
- Task
- Thread
- ThreadPool
- Executor
- 第 4 章 Spark 任务调度机制
- 4.1 Spark 任务调度概述
- 4.2 Spark Stage 级调度
- 4.3 Spark Task 级调度
- 4.3.1 调度策略
- 4.3.2 本地化调度
- 4.3.3 失败重试与黑名单机制
(任意内容)
此处输入任意想输入的内容 |
应用程序的执行
(1) RDD 依赖
shuffledRDD
"ShuffledRDD"是Apache Spark框架中的一个类,用于表示经过Shuffle操作后的分布式数据集。
在Spark中,Shuffle是指将数据重新分区和重新排序的操作。当需要对数据进行重新分组、聚合或排序时,Spark会执行Shuffle操作,将数据重新分配到不同的计算节点上进行处理。Shuffle是Spark中非常关键的操作,对性能和资源消耗有着重要的影响。
"ShuffledRDD"类是表示经过Shuffle操作后的数据集的一种特殊RDD(弹性分布式数据集)。它包含了以下关键特性和功能:
-
数据重新分区:ShuffledRDD将数据重新分区到不同的计算节点上,以便进行后续的数据处理操作。分区的方式可以根据应用程序的需求进行配置。
-
数据重新排序:ShuffledRDD会对数据进行重新排序,以满足应用程序的排序需求。它可以按照指定的排序规则对数据进行排序,或者根据键值对中的键进行排序。
-
Shuffle依赖:ShuffledRDD与上游RDD之间存在Shuffle依赖关系,表示Shuffle操作是由上游RDD生成的。这样可以确保在计算节点发生故障时能够重新计算和恢复Shuffle操作。
-
持久化和序列化:ShuffledRDD可以通过持久化(缓存)来提高性能,并支持数据的序列化和反序列化操作,以提高数据传输效率。
通过使用"ShuffledRDD",开发者可以利用Spark的Shuffle操作来进行数据重分区和重新排序。这对于需要进行数据聚合、分组、排序等操作的应用程序非常重要。ShuffledRDD提供了高度可扩展和容错的特性,使得在大规模数据集上执行Shuffle操作成为可能。
ShuffleDependency
"ShuffleDependency"是Apache Spark中的一个类,表示Spark作业中涉及Shuffle操作的两个阶段之间的依赖关系。
当需要重新分区、聚合或排序数据时,Spark会执行Shuffle操作。"ShuffleDependency"类用于表示这个Shuffle操作的依赖关系,它包含以下关键特性和功能:
-
分区方式:它指定了Shuffle操作期间数据的分区方式。这个信息用于确定输出分区的数量以及数据在分区之间的分布方式。
-
聚合函数:ShuffleDependency允许指定聚合函数,用于在Shuffle操作期间对数据进行聚合。在执行类似reduceByKey或aggregateByKey的操作时非常有用。
-
Shuffle映射阶段:ShuffleDependency表示从映射阶段到减少阶段的依赖关系,Shuffle操作在其中进行。它捕获了映射输出分区与减少输入分区之间的关系。
-
序列化和反序列化:ShuffleDependency支持对Shuffle元数据进行序列化和反序列化,包括分区信息、Shuffle输出和依赖关系。
ShuffleDependency在Spark作业的执行计划中扮演着重要的角色,它捕获了数据Shuffle和重新分配所需的信息。它使得Spark能够在集群上高效执行Shuffle操作,并支持数据聚合、分组和排序等任务。
MapPartitionRDD
"MapPartitionRDD"是Apache Spark中的一个类,用于表示对RDD执行mapPartitions操作后得到的结果。
在Spark中,mapPartitions是一种转换操作,它允许开发者在RDD的每个分区上进行批量操作,而不是逐个元素进行操作。mapPartitions将一个函数应用于每个分区的数据,并生成一个新的RDD。
"MapPartitionRDD"类是表示经过mapPartitions操作后的RDD的一种特殊类型。它包含了以下关键特性和功能:
-
分区信息:MapPartitionRDD继承自父RDD,并保留了父RDD的分区信息。它可以根据父RDD的分区方式创建相应的分区。
-
依赖关系:MapPartitionRDD与父RDD之间存在依赖关系,用于在作业中构建RDD的依赖关系图。
-
数据转换:MapPartitionRDD通过应用用户提供的函数,对每个分区中的数据进行转换。函数将一个迭代器作为输入参数,可以对迭代器中的元素进行任意操作,并返回一个新的迭代器作为输出结果。
-
懒加载:与其他RDD类似,MapPartitionRDD是惰性加载的,只有在执行操作时才会触发计算。
通过使用"MapPartitionRDD",开发者可以对RDD的每个分区进行批量处理,从而提高数据处理的效率。这对于需要在每个分区上进行复杂操作的场景非常有用,例如数据转换、过滤、计算等。
需要注意的是,"MapPartitionRDD"是Spark内部的类,用于表示RDD的内部结构和转换操作,并不直接暴露给用户。用户通常使用RDD的相关转换操作(如mapPartitions)来操作数据,而不需要直接操作"MapPartitionRDD"类。
OneToOneDependency
“OneToOneDependency” 是 Apache Spark 中的一个类,用于表示 RDD 之间的一对一依赖关系。
在 Spark 中,RDD(弹性分布式数据集)是分布式数据的抽象表示,而依赖关系用于描述 RDD 之间的关系和转换操作的依赖。一对一依赖关系表示每个父 RDD 分区都只对应一个子 RDD 分区。
“OneToOneDependency” 类具有以下关键特征和功能:
-
分区映射:每个父 RDD 分区都对应一个子 RDD 分区,它们之间存在一对一的映射关系。
-
依赖关系:OneToOneDependency 表示 RDD 之间的依赖关系,用于构建 RDD 的依赖关系图。
-
转换操作:OneToOneDependency 用于描述一对一的转换操作,表示子 RDD 的每个分区只依赖于父 RDD 的相应分区。
-
懒加载:与其他 RDD 类似,OneToOneDependency 也是惰性加载的,只有在执行操作时才会触发计算。
“OneToOneDependency” 类通常由 Spark 内部使用,在用户层面并不直接暴露。它用于构建 RDD 的依赖关系图,确保在进行转换操作时,每个子 RDD 分区都与相应的父 RDD 分区保持一对一的关系。
(2) 阶段的划分
shuffledRDD
"ShuffledRDD"是Apache Spark框架中的一个类,用于表示经过Shuffle操作后的分布式数据集。
在Spark中,Shuffle是指将数据重新分区和重新排序的操作。当需要对数据进行重新分组、聚合或排序时,Spark会执行Shuffle操作,将数据重新分配到不同的计算节点上进行处理。Shuffle是Spark中非常关键的操作,对性能和资源消耗有着重要的影响。
"ShuffledRDD"类是表示经过Shuffle操作后的数据集的一种特殊RDD(弹性分布式数据集)。它包含了以下关键特性和功能:
-
数据重新分区:ShuffledRDD将数据重新分区到不同的计算节点上,以便进行后续的数据处理操作。分区的方式可以根据应用程序的需求进行配置。
-
数据重新排序:ShuffledRDD会对数据进行重新排序,以满足应用程序的排序需求。它可以按照指定的排序规则对数据进行排序,或者根据键值对中的键进行排序。
-
Shuffle依赖:ShuffledRDD与上游RDD之间存在Shuffle依赖关系,表示Shuffle操作是由上游RDD生成的。这样可以确保在计算节点发生故障时能够重新计算和恢复Shuffle操作。
-
持久化和序列化:ShuffledRDD可以通过持久化(缓存)来提高性能,并支持数据的序列化和反序列化操作,以提高数据传输效率。
通过使用"ShuffledRDD",开发者可以利用Spark的Shuffle操作来进行数据重分区和重新排序。这对于需要进行数据聚合、分组、排序等操作的应用程序非常重要。ShuffledRDD提供了高度可扩展和容错的特性,使得在大规模数据集上执行Shuffle操作成为可能。
RDD
RDD(Resilient Distributed Dataset)是Apache Spark中的一个核心概念,是Spark的主要数据抽象。
RDD是一个分布式的、不可变的数据集合,可以在Spark集群中并行计算和操作。它具有以下特性:
-
弹性(Resilient):RDD具有容错性,即当节点发生故障时,可以通过RDD的血统(lineage)信息重新计算丢失的数据,保证数据的可靠性和可恢复性。
-
分布式(Distributed):RDD可以分布在集群中的多个节点上,并支持并行计算。每个节点上的数据可以独立地进行计算和处理。
-
不可变(Immutable):RDD是不可变的数据集合,一旦创建就不能进行修改。每次对RDD进行转换操作时,都会生成一个新的RDD。
-
延迟计算(Lazy Evaluation):RDD具有惰性计算的特性,只有在需要返回结果或触发动作操作时才会进行实际的计算。
RDD支持丰富的转换操作和动作操作。转换操作(Transformation)用于对RDD进行转换和处理,例如map、filter、reduceByKey等。动作操作(Action)用于触发计算并返回结果,例如count、collect、save等。
通过使用RDD,开发者可以利用Spark的分布式计算能力进行大规模数据处理和分析。Spark提供了丰富的API和功能,可以进行复杂的数据操作和算法处理。
需要注意的是,Spark 2.0版本引入了Dataset API,它是对RDD的更高级别抽象,提供了类型安全和优化执行计划。因此,在新的Spark应用程序中,推荐使用Dataset API来处理数据,而不是直接操作RDD。
ShuffleMapStage
"ShuffleMapStage"是Apache Spark中的一个概念,用于表示Spark作业中涉及Shuffle操作的阶段。
在Spark中,Shuffle是指将数据重新分区和重新排序的操作。当需要对数据进行重新分组、聚合或排序时,Spark会执行Shuffle操作,将数据重新分配到不同的计算节点上进行处理。Shuffle是Spark中的一个重要操作,对性能和资源消耗有着重要的影响。
"ShuffleMapStage"表示一个阶段,它包含了一系列的任务(tasks)和对应的Shuffle依赖关系。一个ShuffleMapStage负责对输入数据进行处理和Shuffle操作,并生成Shuffle的输出结果。
以下是"ShuffleMapStage"的一些关键特性和功能:
-
任务划分:ShuffleMapStage将作业中的数据处理任务划分为一组并行执行的任务,每个任务负责处理输入数据的一个分区。
-
Shuffle依赖:ShuffleMapStage表示从数据源(如RDD)到Shuffle操作的依赖关系。它定义了输入数据的分区方式、分区器、聚合函数等信息。
-
数据处理和Shuffle:ShuffleMapStage负责对输入数据进行处理,并执行Shuffle操作,将数据重新分区和重新排序。
-
任务调度和执行:ShuffleMapStage将任务调度到集群中的计算节点上执行,并管理任务的进度和状态。
ShuffleMapStage在Spark作业的执行过程中起着关键的作用。它将数据处理和Shuffle操作组合在一起,确保数据按照需要的方式进行重新分区和重新排序。同时,它还负责管理任务的调度和执行,以实现高效的并行计算。
需要注意的是,"ShuffleMapStage"是Spark内部的概念,用于表示Shuffle操作的阶段和任务划分。对于普通的Spark应用程序开发者来说,更多地关注RDD的转换操作和数据处理逻辑即可,而无需直接操作"ShuffleMapStage"类。
ResultStage
"ResultStage"是Apache Spark中的一个概念,用于表示Spark作业中的结果阶段。
在Spark中,作业(Job)由一系列的阶段(Stage)组成。每个阶段代表了一组相互依赖的任务(Tasks)的集合,这些任务可以并行执行。Spark将作业划分为多个阶段,以便在计算节点上进行并行执行,并通过阶段间的数据传输和依赖关系来实现整个作业的计算流程。
"ResultStage"是其中的一种特殊阶段,它表示最后一个阶段,负责将计算结果返回给应用程序或保存到外部存储系统。
以下是"ResultStage"的一些关键特性和功能:
-
结果计算:ResultStage执行最后的计算操作,将计算结果生成并返回给应用程序或保存到外部存储系统。
-
依赖关系:ResultStage通常依赖前面的阶段,根据前面阶段的计算结果进行进一步的处理。
-
数据传输:ResultStage可能需要从前面的阶段获取数据,进行汇总、合并或聚合等操作,以生成最终的计算结果。
-
任务调度和执行:ResultStage负责将任务调度到集群中的计算节点上执行,并管理任务的进度和状态。
ResultStage在Spark作业的执行过程中起着关键的作用,它表示了作业的最终阶段,并负责生成最终的计算结果。它依赖于前面的阶段,并根据前面阶段的计算结果进行后续的处理和汇总。
需要注意的是,"ResultStage"是Spark内部的概念,用于表示作业的执行流程和结果阶段。对于普通的Spark应用程序开发者来说,更多地关注RDD的转换操作和数据处理逻辑即可,而无需直接操作"ResultStage"类。
(3) 任务的切分
在Spark中,任务的切分是将作业分解为可并行执行的小任务单元的过程。任务切分的目的是将作业中的工作划分为多个任务,以便在集群中的多个计算节点上并行执行。
任务切分通常发生在以下几个阶段:
-
RDD分区:如果作业的输入是一个RDD(弹性分布式数据集),首先需要将RDD划分为多个分区。RDD的分区决定了任务的粒度,每个分区将成为一个独立的任务。
-
作业划分:作业(Job)通常由多个阶段(Stage)组成。每个阶段都可以进一步划分为多个任务。作业划分的依据是根据数据的依赖关系和计算操作的转换关系,将作业划分为多个阶段,其中每个阶段包含一组可以并行执行的任务。
-
数据本地性:在任务切分过程中,Spark还会考虑数据本地性,即尽可能将任务分配到数据所在的计算节点上执行。这有助于减少数据传输的开销,提高任务执行效率。
-
任务调度:任务切分之后,任务需要被调度到集群中的计算节点上执行。Spark的任务调度器将根据集群资源和任务优先级等因素,将任务分发到可用的计算节点上。
需要注意的是,任务切分是由Spark框架自动处理的,开发者无需显式地编写任务切分的代码。Spark根据作业的RDD依赖关系和转换操作,自动将作业切分为多个阶段和任务,并将其调度到集群中执行。
任务切分是Spark作业执行的重要步骤,它允许Spark在分布式环境中实现任务的并行执行和数据处理。通过任务切分,Spark能够充分利用集群资源,提高作业的执行效率和性能。
(4) 任务的调度
Task
在Apache Spark中,任务(Task)是作业(Job)的最小执行单元,代表了在集群中执行的一项工作。每个任务通常处理一个数据分区,并对其应用指定的转换或操作。
以下是有关任务的关键特性和概念:
-
分区处理:每个任务负责处理数据集中的一个分区,分区是数据在集群中分布的基本单位。任务通过在分区上应用指定的函数来执行计算操作。
-
数据本地性:Spark优化任务调度,尽可能将任务分配给存储有相应数据分区的计算节点。这样可以减少数据传输的开销,并提高任务执行的效率。
-
并行执行:Spark以并行的方式在集群中执行任务。它可以在多个计算节点上同时运行多个任务,从而利用集群资源实现高性能的分布式计算。
-
任务依赖:任务之间存在依赖关系,即某些任务需要在其他任务完成后才能执行。这种依赖关系通过RDD的转换操作和数据依赖关系来确定。
-
任务调度:任务调度器负责将任务分配给集群中的可用计算节点。它根据集群资源、任务依赖关系和数据本地性等因素进行调度和分配。
任务在Spark中扮演着关键的角色,它们是作业执行的基本单元,通过并行执行和分布式计算实现大规模数据处理和分析。Spark框架自动处理任务的调度和执行,开发者只需要定义转换操作和构建作业,Spark会自动将其划分为任务并在集群中执行。
需要注意的是,任务是Spark内部的概念,在应用程序代码中通常不需要直接操作任务。开发者主要关注定义转换操作、构建作业流程和优化数据处理逻辑。
TaskPool
在Apache Spark中,TaskPool(任务池)是用于管理和调度任务的组件。它是Spark任务调度器的一部分,负责维护任务的队列和执行状态,并根据可用资源和调度策略来分配任务。
TaskPool具有以下主要功能和特性:
-
任务队列管理:TaskPool维护一个任务队列,按照先进先出(FIFO)或其他调度策略来管理任务的执行顺序。
-
资源管理:TaskPool跟踪可用的计算资源,例如CPU核心、内存等。它根据可用资源的情况来决定分配任务的数量和执行方式。
-
任务调度:TaskPool负责将任务从任务队列中分配给可用的执行器(Executor)或计算节点,并确保任务按照指定的调度策略执行。
-
任务状态跟踪:TaskPool跟踪每个任务的执行状态,例如正在运行、已完成、失败等。它维护任务的状态信息,并提供给任务调度器进行进一步的决策。
-
错误处理和重试:TaskPool捕获任务执行过程中可能出现的错误,并根据配置的策略进行错误处理和任务重试。
TaskPool是Spark任务调度器中的重要组件,确保任务按照指定的调度策略和资源管理原则进行分配和执行。它帮助Spark高效地管理任务的执行顺序、资源利用和错误处理,以实现作业的高性能和可靠性。
需要注意的是,TaskPool是Spark内部的组件,在应用程序代码中通常不需要直接操作。Spark提供了灵活的任务调度和资源管理配置选项,开发者可以根据具体的需求和集群环境进行配置和优化。
(5) 任务的执行
Task
在Apache Spark中,任务(Task)是作业(Job)的最小执行单元,代表了在集群中执行的一项工作。每个任务通常处理一个数据分区,并对其应用指定的转换或操作。
以下是有关任务的关键特性和概念:
-
分区处理:每个任务负责处理数据集中的一个分区,分区是数据在集群中分布的基本单位。任务通过在分区上应用指定的函数来执行计算操作。
-
数据本地性:Spark优化任务调度,尽可能将任务分配给存储有相应数据分区的计算节点。这样可以减少数据传输的开销,并提高任务执行的效率。
-
并行执行:Spark以并行的方式在集群中执行任务。它可以在多个计算节点上同时运行多个任务,从而利用集群资源实现高性能的分布式计算。
-
任务依赖:任务之间存在依赖关系,即某些任务需要在其他任务完成后才能执行。这种依赖关系通过RDD的转换操作和数据依赖关系来确定。
-
任务调度:任务调度器负责将任务分配给集群中的可用计算节点。它根据集群资源、任务依赖关系和数据本地性等因素进行调度和分配。
任务在Spark中扮演着关键的角色,它们是作业执行的基本单元,通过并行执行和分布式计算实现大规模数据处理和分析。Spark框架自动处理任务的调度和执行,开发者只需要定义转换操作和构建作业,Spark会自动将其划分为任务并在集群中执行。
需要注意的是,任务是Spark内部的概念,在应用程序代码中通常不需要直接操作任务。开发者主要关注定义转换操作、构建作业流程和优化数据处理逻辑。
Thread
线程(Thread)是计算机中执行任务的最小单位,是操作系统进行任务调度和执行的基本单元。在多线程编程中,线程允许程序同时执行多个任务,每个任务在独立的线程中运行。
以下是线程的一些关键特性和概念:
-
并发执行:线程使得程序能够并发执行多个任务,每个任务在自己的线程中独立运行。这样可以充分利用多核处理器和系统资源,提高程序的性能和响应能力。
-
上下文切换:当操作系统进行线程调度时,会发生上下文切换。这意味着当前正在执行的线程暂停,保存其状态,并切换到另一个线程的执行。
-
共享资源:线程之间共享进程的资源,如内存、文件和网络连接。这也需要在多线程编程中考虑线程安全性,以避免数据竞争和并发访问问题。
-
同步和互斥:多线程环境下,线程之间可能会访问和修改共享的数据。同步机制和互斥锁等机制用于确保多个线程之间的数据访问顺序和一致性,以避免竞态条件和数据不一致问题。
-
线程调度:线程调度器负责决定线程的执行顺序和优先级。调度器根据不同的调度算法将CPU时间片分配给不同的线程,以实现公平性和高效性。
线程在编程中发挥着重要的作用,特别是在并发和并行计算的场景中。多线程编程可以提高程序的性能和资源利用率,同时也需要注意线程安全和同步机制,以避免潜在的并发问题。
需要注意的是,线程的创建和管理方式依赖于所使用的编程语言和平台。在Java中,可以使用Thread类或Executor框架来创建和管理线程。在Apache Spark中,线程的管理由Spark框架和底层的执行引擎负责,开发者通常无需直接操作线程。
ThreadPool
线程池(ThreadPool)是一种管理和复用线程的机制,它用于执行并管理多个任务。线程池可以提高程序的性能和资源利用率,避免频繁地创建和销毁线程的开销。
以下是线程池的一些关键特性和概念:
-
线程复用:线程池在初始化时会创建一组固定数量的线程,这些线程可以被重复利用来执行多个任务。当一个任务完成后,线程会返回线程池并等待分配新的任务,而不是销毁线程。
-
任务队列:线程池维护一个任务队列,用于存储待执行的任务。当线程池中的线程空闲时,它们从任务队列中获取任务并执行。
-
线程管理:线程池负责管理线程的生命周期,包括创建、分配、执行和回收线程。它还可以根据需要动态调整线程池的大小。
-
并发控制:线程池通过控制并发执行的线程数量来管理系统资源。可以根据需求配置线程池的最大线程数,以控制同时执行的任务数量。
-
线程池策略:线程池可以采用不同的调度策略,例如固定大小线程池、缓存线程池和工作窃取线程池等,以适应不同类型的任务和负载。
线程池在多线程编程中非常常用,它提供了一种有效的方式来管理线程资源和并发执行的任务。使用线程池可以减少线程创建和销毁的开销,提高程序的性能和响应能力。
在不同的编程语言和平台中,线程池的实现和使用方式可能有所不同。例如,Java提供了ThreadPoolExecutor类来创建和管理线程池,而在Apache Spark中,也使用线程池来管理并发执行的任务。
需要根据具体的应用需求和性能要求,选择适当的线程池配置和策略,以获得最佳的性能和资源利用。同时,需要注意线程安全和共享资源的同步机制,以避免并发访问的问题。
Executor
Executor(执行器)是一种用于管理和执行任务的组件,通常与线程池结合使用。它提供了一种将任务提交给线程池并进行执行的方式,以实现并发执行和资源管理。
以下是Executor的一些关键特性和概念:
-
任务提交:Executor提供了一种将任务提交给线程池执行的接口。通过将任务提交给Executor,可以实现任务的异步执行,而无需手动创建和管理线程。
-
资源管理:Executor负责管理可用的线程资源,并根据任务的数量和优先级来调度和分配线程。它可以控制并发执行的任务数量,以避免资源耗尽或过载。
-
线程调度:Executor使用调度算法来决定任务的执行顺序和优先级。它根据不同的调度策略将任务分配给可用的线程,以实现公平性和高效性。
-
异步执行:通过Executor,可以实现任务的异步执行。任务提交后,可以继续执行其他操作,而不需要等待任务完成。
-
任务状态和结果:Executor跟踪每个任务的执行状态,并提供获取任务结果的方式。它可以通过Future或CompletionService等机制返回任务的执行结果。
Executor在多线程编程中扮演着重要的角色,它提供了一种简化任务管理和并发执行的方式。通过使用Executor,可以将任务的执行与线程的创建和管理解耦,从而实现更高效的并发编程。
需要注意的是,Executor的具体实现和使用方式可能因编程语言和平台而异。例如,在Java中,可以使用Executor框架提供的ThreadPoolExecutor来创建和管理线程池,并通过ExecutorService接口提交任务。而在Apache Spark中,也使用Executor来管理任务的执行和资源分配。
选择适当的Executor配置和调度策略,根据具体的应用需求和性能要求进行调整,可以实现高性能的并发执行和任务管理。同时,还需要注意线程安全和共享资源的同步,以避免并发访问的问题。
第 4 章 Spark 任务调度机制
- 在生产环境下,Spark 集群的部署方式一般为 YARN-Cluster 模式,之后的内核分析内容中我们默认集群的部署方式为 YARN-Cluster 模式。在上一章中我们讲解了 Spark YARNCluster 模式下的任务提交流程,但是我们并没有具体说明 Driver 的工作流程, Driver 线程主 要 是 初 始 化 SparkContext 对 象 , 准 备 运 行 所 需 的 上 下 文 , 然 后 一 方 面 保 持 与ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。
- 当 ResourceManager 向 ApplicationMaster 返回 Container 资源时,ApplicationMaster 就尝试在对应的 Container 上启动 Executor 进程,Executor 进程起来后,会向 Driver 反向注册,注册成功后保持与 Driver 的心跳,同时等待 Driver 分发任务,当分发的任务执行完毕后,将任务状态上报给 Driver。
4.1 Spark 任务调度概述
当 Driver 起来后,Driver 则会根据用户程序逻辑准备任务,并根据 Executor 资源情况
逐步分发任务。在详细阐述任务调度前,首先说明下 Spark 里的几个概念。一个 Spark 应用
程序包括 Job、Stage 以及 Task 三个概念:
- Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job;
- Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分;
- Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task。
Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总
体调度流程如下图所示:
Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler和 TaskScheduler。
➢ DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage打包成 TaskSet 交给 TaskScheduler 调度。
➢ TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中SchedulerBackend 有多种实现,分别对接不同的资源管理系统。
- Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend 以及 HeartbeatReceiver。SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor的存活状况,并通知到 TaskScheduler。
4.2 Spark Stage 级调度
Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交,下图是涉及到 Job提交的相关方法调用流程图。
-
Job 由最终的 RDD 和 Action 方法封装而成;
-
SparkContext 将 Job 交给 DAGScheduler 提交,它会根据 RDD 的血缘关系构成的 DAG
进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过
依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之
间被划分到同一个 Stage 中,可以进行 pipeline 式的计算。划分的 Stages 分两类,一类
叫做 ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定,另一类叫做
ShuffleMapStage,为下游 Stage 准备数据,下面看一个简单的例子 WordCount。
-
Job 由 saveAsTextFile 触发,该 Job 由 RDD-3 和 saveAsTextFile 方法组成,根据 RDD 之
间的依赖关系从 RDD-3 开始回溯搜索,直到没有依赖的 RDD-0,在回溯搜索过程中,RDD-
3 依赖 RDD-2,并且是宽依赖,所以在 RDD-2 和 RDD-3 之间划分 Stage,RDD-3 被划到最
后一个 Stage,即 ResultStage 中,RDD-2 依赖 RDD-1,RDD-1 依赖 RDD-0,这些依赖都是
窄依赖,所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,形成 pipeline 操作,。即
ShuffleMapStage 中,实际执行的时候,数据记录会一气呵成地执行 RDD-0 到 RDD-2 的转
化。不难看出,其本质上是一个深度优先搜索(Depth First Search)算法。 -
一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才
能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会
将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢
失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型
的 Task 失败会在 TaskScheduler 的调度过程中重试。 -
相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交
Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂,下面详细阐述其细节。
4.3 Spark Task 级调度
- Spark Task 的调度是由 TaskScheduler 来完成,由前文可知,DAGScheduler 将 Stage 打
包到交给 TaskScheTaskSetduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到
调度队列中,TaskSetManager 结构如下图所示。
TaskSetManager 负 责监控 管理 同一 个 Stage 中的 Tasks, TaskScheduler 就是以TaskSetManager 为单元来调度任务。
-
前面也提到,TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道,接收 Executor 的注册信息,并维护 Executor 的状态,所以说 SchedulerBackend 是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler 有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler 在 SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行,大致方法调用流程如下图所示:
-
上图中,将 TaskSetManager 加入 rootPool 调度池中之后,调用 SchedulerBackend 的riviveOffers 方法给 driverEndpoint 发送 ReviveOffer 消息;driverEndpoint 收到 ReviveOffer 消息后调用 makeOffers 方法,过滤出活跃状态的 Executor(这些 Executor 都是任务启动时反向注册到 Driver 的 Executor),然后将 Executor 封装成 WorkerOffer 对象;准备好计算资源(WorkerOffer)后,taskScheduler 基于这些资源调用 resourceOffer 在 Executor 上分配 task。
4.3.1 调度策略
TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。在 TaskScheduler 初始化过程中会实例化 rootPool,表示树的根节点,是 Pool 类型。
- FIFO 调度策略
如果是采用 FIFO 调度策略,则直接简单地将 TaskSetManager 按照先来先到的方式入队,出队时直接拿出最先进队的 TaskSetManager,其树结构如下图所示,TaskSetManager 保存在一个 FIFO 队列中。
- FAIR 调度策略
FAIR 调度策略的树结构如下图所示:
- FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的TaskSetMagager。
- 在 FAIR 模式中,需要先对子 Pool 进行排序,再对子 Pool 里面的 TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因此使用相同的排序算法。排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare 值以及 weight 值。注意,minShare、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相关配置。
- 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,
那么 B 排在 A 前面;(runningTasks 比 minShare 小的先执行) - 如果 A、B 对象的 runningTasks 都小于它们的 minShare,那么就比较 runningTasks 与
minShare 的比值(minShare 使用率),谁小谁排前面;(minShare 使用率低的先执行) - 如果 A、B 对象的 runningTasks 都大于它们的 minShare,那么就比较 runningTasks 与
weight 的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行) - 如果上述比较均相等,则比较名字。
整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行 task 比例较少)的先运行。FAIR 模式排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次被取出并发送给 Executor 执行。从调度队列中拿到 TaskSetManager 后,由于 TaskSetManager 封装了一个 Stage 的所有Task,并负责管理调度这些 Task,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 Task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor上执行。
4.3.2 本地化调度
- DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的tasks,submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的 preferredLocations,通过调用 getPreferrdeLocations()得到 partition 的优先位置,由于一个partition 对应一个 Task,此 partition 的优先位置就是 task 的优先位置,对于要提交到TaskScheduler 的 TaskSet 中的每一个 Task,该 task 优先位置与其对应的 partition 对应的优先位置一致。
- 从调度队列中拿到 TaskSetManager 后,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到Executor 上执行。前面也提到,TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理调度这些 Task。根据每个 Task 的优先位置,确定 Task 的 Locality 级别,Locality 一共有五种,优先级由高到低顺序:
- 在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个task 以 X 本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能就会有相应的资源去执行此 task,这就在在一定程度上提到了运行性能。
4.3.3 失败重试与黑名单机制
- 除了选择合适的 Task 调度运行外,还需要监控 Task 的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给SchedulerBackend,SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task的失败与成功状态,对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中,否则整个 Application 失败。在记录 Task 失败次数过程中,会记录它上一次失败所在的 Executor Id 和 Host,这样下次再调度这个 Task 时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录 Task 上一次失败所在的 Executor Id 和 Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个 Task 了。