一、上下文
《Spark-Task启动流程》中我们讲到了ShuffleMapTask中会对这个Stage的结果进行磁盘的写入,并且从SparkEnv中得到了ShuffleManager,且调用了它的getWriter方法并在这个Stage的入口处(也就是RDD的迭代器数据源处)调用了它的getReader,下面我们来详细分析下ShuffleManager在整个Task中的形态。
二、ShuffleManager
Shuffle System 的可插拔接口。根据spark.shuffle.manager设置,位于SparkEnv中,在driver 和每个executor 上创建ShuffleManager。driver 来注册一个Shuffle,executor(或在driver中本地运行的任务)可以使用它来进行Shuffle Reader和Shuffle Writer
private[spark] trait ShuffleManager {
//注册洗牌,并获得一个句柄,以便将其传递给任务
def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
//为给定分区找一个写入器。被 executor 上的Task调用执行
//一个Stage最后写入时调用
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
//获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,
//以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取
//被reduce端的Executor上的Task调用
//一个Stage开始计算时调用
final def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
}
}
SortShuffleManager
它是ShuffleManager的唯一子类
在基于排序的Shuffle中,传入记录根据其目标分区ID进行排序,然后写入单个map输出文件,reducer获取此文件的连续区域,以便读取其在map输出中的部分。如果map输出数据太大而无法放入内存,则可以将输出的排序子集溢写到磁盘,并将磁盘上的文件合并以生成最终输出文件。
基于排序的Shuffle有两种不同的写入路径来生成对应的map输出文件:
1、序列化排序:当满足以下三个条件时使用
- Shuffle依赖关系未指定Map侧组合。
- Shuffle序列化程序支持重新定位序列化值(目前KryoSerialer和Spark SQL的自定义序列化程序支持此功能)
-
洗牌产生的输出分区小于或等于16777216(2的24次方)个。
2、非序列化排序:用于处理所有其他情况
序列化排序模式
在序列化排序模式下,传入记录在传递给ShuffleWriter后立即被序列化,并在排序过程中以序列化形式缓冲。此写入路径实现了几个优化:
1、它的排序操作基于序列化的二进制数据,而不是Java对象,这减少了内存消耗和GC开销。此优化要求记录序列化器具有某些属性,以允许序列化记录重新排序,而不需要反序列化。有关更多详细信息,请参阅SPARK-4550,其中首次提出并实施了此优化。
2、它使用一个专门的缓存高效排序器([[ShuffleExternalSorter]])对压缩记录指针和分区ID的数组进行排序。通过在排序数组中每条记录只使用8个字节的空间,这可以将更多的数组放入缓存中。
3、溢写合并过程对属于同一分区的序列化记录块进行操作,在合并过程中不需要对记录进行反序列化。
4、当溢写压缩编解码器支持压缩数据的连接时,溢写合并只是将序列化和压缩的溢写分区连接起来,以产生最终的输出分区。这允许使用高效的数据复制方法,如NIO的“transferTo”,并避免了在合并过程中分配解压缩或复制缓冲区的需要。
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
//注册Shuffle
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
//如果有少于spark.shuffle.sort.bypassMergeThreshold 默认值200 的分区,并且我们不需要map侧聚合,那么直接编写numPartitions文件,并在最后将它们连接起来。这避免了两次进行序列化和反序列化以将溢写的文件合并在一起,这在正常的代码路径中会发生。缺点是一次打开多个文件,因此分配给缓冲区的内存更多。
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
//否则,请尝试以序列化形式缓冲映射输出,因为这样更有效:
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// 如果上面两种情况都不满足,走它:缓冲区数据将以反序列化的形式输出:
new BaseShuffleHandle(shuffleId, dependency)
}
}
//获取一个reduce分区范围(startPartition到endPartition-1,包括端点)的读取器,
//以从map输出范围(startMapIndex到endMapIndex-1,包括两端)读取。如果endMapIndex=Int.MaxValue,则实际endMapIndex将更改为“getMapSizesByExecutorId”中Shuffle的总map输出长度。
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
val (blocksByAddress, canEnableBatchFetch) =
//如果启用了Push-based shuffle 且 rdd不是Barrier 那么可以直接构建迭代器,就不用拉取了
if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
(res.iter, res.enableBatchFetch)
} else {
//从mapOutputTracker处获取需要拉取数据的地址
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
(address, true)
}
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch =
canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
}
//每个分区会获取一个ShuffleWriter
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(mapId) }
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf,
metrics,
shuffleExecutorComponents)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
bypassMergeSortHandle,
mapId,
env.conf,
metrics,
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
}
}
}
在SortShuffleManager中的getReader()中我们可以看到,只有一种ShuffleReader,即:BlockStoreShuffleReader,但其因为ShuffleDependency的不同也会返回不同的Iterator
getWriter()中看到,根据ShuffleHandle可以分为三种ShuffleWriter,即:
unsafeShuffleHandle -> UnsafeShuffleWriter
bypassMergeSortHandle -> BypassMergeSortShuffleWriter
BaseShuffleHandle -> SortShuffleWriter
三、谁注册的ShuffleManager
ShuffleDependency中注册的,并得到了一个ShuffleHandle
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] with Logging {
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
}
那么什么时候会调用RDD的getDependencies?
我们回想下之前几篇博客的内容《Spark-SparkSubmit详细过程》、《Spark-driver和executor启动过程》、《Spark-Job启动、Stage划分》是不是在Stage划分时调用了这个方法来判断是否要创建一个ShuffleMapStage。因此在划分Stage时就确定了ShuffleHandle,换言之也就确定了这个Stage最后的结果要选用哪个ShuffleWriter。而ShuffleWriter又是Spark计算中一个大的瓶颈,因此调节ShuffledRDD的ShuffleDependency就成了调优的必要且重要渠道。后面展开分析ShuffleWriter时再具体讲
哪里用到了ShuffleHandle
1、getReader调用时
getReader是在一个Stage中读取上游Stage时调用的也就是ShuffleRDD中的compute()
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
SparkEnv.get.shuffleManager.getReader(
dep.shuffleHandle, split.index, split.index + 1, context, metrics)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
}
2、getWriter调用时
getWriter是在一个Stage结束并将数据溢写磁盘时调用的也就是ShuffleWriteProcessor中的write()
private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
def write(
rdd: RDD[_],
dep: ShuffleDependency[_, _, _],
mapId: Long,
context: TaskContext,
partition: Partition): MapStatus = {
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
mapId,
context,
createMetricsReporter(context))
writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
}
}
四、Shuffle场景描述
通过以上的梳理我们大致画下ShuffleManager在计算Task中的场景