数据本地性
/**
 * Task locality levels, ordered from most local to least local:
 * PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY.
 * The declaration order defines the ordering used by `isAllowed`.
 */
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  /** Returns true when `condition` is at least as local as `constraint` permits. */
  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean =
    condition <= constraint
}
- PROCESS_LOCAL:
要处理的数据在同一个本地进程,
即数据和Task在同一个Executor JVM中。
这种情况是RDD的数据经过缓存,此时不需要网络传输,是最优locality。(但是数据要先缓存)。
- NODE_LOCAL:
(1)数据和Task在同一节点上的不同executor中;
(2)HDFS上的数据和Task在同一个节点上,
此时需要进行进程间传输,速度比PROCESS_LOCAL略慢。
- NO_PREF:
数据从哪里访问都一样,相当于没有数据本地性,一般指从外部数据源读取数据的情况。
- RACK_LOCAL:
数据与Task在同机架的不同节点,此时需要通过网络传输,速度比NODE_LOCAL慢。
- ANY:
数据和Task可能在集群的任何地方,性能最差,一般出现这种情况就该排查原因了。
TaskSetManager
TaskSetManager.scala
/**
 * Add a task to all the pending-task lists that it should be on.
 *
 * For each preferred location of the task, the index is registered in the
 * per-executor, per-host and (optionally) per-rack pending lists, so the
 * scheduler can later offer the task at the best available locality level.
 *
 * @param index index of the task within this TaskSetManager's `tasks` array
 * @param resolveRacks whether to also resolve each preferred host to its rack
 * @param speculatable whether to register the task as a speculative copy
 */
private[spark] def addPendingTask(
index: Int,
resolveRacks: Boolean = true,
speculatable: Boolean = false): Unit = {
// A zombie TaskSetManager may reach here while handling failed task.
if (isZombie) return
// Speculative copies are tracked in a separate set of pending lists.
val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
// Data is cached in a specific executor: register the task for
// PROCESS_LOCAL scheduling on that executor.
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
// Data is cached by HDFS on a host: every executor currently alive on
// that host becomes a process-local candidate.
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
// NOTE: the loop variable `e` below shadows the matched location `e`;
// the logInfo after the loop refers to the outer HDFSCacheTaskLocation.
for (e <- set) {
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
// Every preferred location also makes the task NODE_LOCAL on its host...
pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
if (resolveRacks) {
// ...and RACK_LOCAL on the host's rack, when rack resolution is enabled.
sched.getRackForHost(loc.host).foreach { rack =>
pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
}
// A task with no preferred locations is schedulable anywhere (NO_PREF).
if (tasks(index).preferredLocations == Nil) {
pendingTaskSetToAddTo.noPrefs += index
}
// Every task is a candidate at the ANY level.
pendingTaskSetToAddTo.all += index
}
TaskLocation
TaskLocation.scala
/**
 * A location that includes both a host and an executor id on that host.
 */
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  // Rendered as "executor_<host>_<executorId>", matching the parseable tag format.
  override def toString: String = TaskLocation.executorLocationTag + host + "_" + executorId
}
/**
 * A location identified only by its host (no particular executor preference).
 */
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  // The string form is simply the bare hostname, with no tag prefix.
  override def toString: String = s"$host"
}
/**
 * A location on a host whose data is cached by HDFS.
 */
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  // Rendered as "hdfs_cache_<host>" so it is distinguishable from a plain hostname.
  override def toString: String = s"${TaskLocation.inMemoryLocationTag}$host"
}
private[spark] object TaskLocation {
// We identify hosts on which the block is cached with this prefix. Because this prefix contains
// underscores, which are not legal characters in hostnames, there should be no potential for
// confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
val inMemoryLocationTag = "hdfs_cache_"
// Identify locations of executors with this prefix.
val executorLocationTag = "executor_"
调度器按照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY的优先级顺序调度Task。