一、概览
《Spark-环境启动》中讲了Spark环境的启动,以及Master和多个Worker之间时基于Endpoint之间的Netty通信,也被称为Spark的RpcEnv。在此基础上我们来看下spark-submit是如何将我们写的Spark程序调起的
二、启动脚本示例:
spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--driver-memory 100g \
--executor-memory 20g \
--num-executors 120 \
--executor-cores 3 \
--conf spark.shuffle.io.maxRetries=2 \
--conf spark.xx.xx.xx=xx \
--class com.xx.xxx \
--files "/data/xxxx" \
/xxx/project/xxx/spark-1.1-SNAPSHOT.jar \
p1 p2 p3 …… pn \
三、spark-submit.sh
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
四、SparkSubmit
//启动Spark应用程序的主网关。
//此程序处理设置具有相关Spark依赖关系的类路径,并在Spark支持的不同集群管理器和部署模式上提供一个层。
private[spark] class SparkSubmit extends Logging {
//Yarn模式下的下一个执行的主类
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
//STANDALONE 下的rest模式 或 Mesos 模式的下一个执行的主类
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
//STANDALONE 下的传统模式的下一个执行的主类 我们主要分析这一个
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
//K8s下的下一个执行的主类
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {...}
submit.doSubmit(args)
}
def doSubmit(args: Array[String]): Unit = {
//解析命令行参数
val appArgs = parseArguments(args)
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
runMain(args, uninitLog)
}
doRunMain()
}
//使用submit参数运行子类的main方法。
//这分为两个步骤。首先,我们通过设置适当的类路径、系统属性和应用程序参数来准备启动环境,以便根据集群管理器和部署模式运行子主类。
//请注意,如果我们运行的是集群部署模式或python应用程序,那么这个主类将不是用户提供的。 (需要跳几次才真正运行自己的main方法)
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
app.start(childArgs.toArray, sparkConf)
}
//得到一个 4-tuple:
// 1、子进程的参数
// 2、进入子类的路径列表
// 3、系统环境映射
// 4、子进程的主类
private[deploy] def prepareSubmitEnvironment(
//设置集群管理
val clusterManager: Int = args.master match {
case "yarn" => YARN //1
case m if m.startsWith("spark") => STANDALONE // 2
case m if m.startsWith("mesos") => MESOS // 4
case m if m.startsWith("k8s") => KUBERNETES // 16
case m if m.startsWith("local") => LOCAL // 8
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
//设置部署模式;默认为客户端模式
//在客户端模式下,需要下载远程文件
//在YARN中运行时,对于某些具有scheme的远程资源:
// 1.Hadoop FileSystem不支持它们。
// 2.我们使用“spark.yarn.dist.forceDownloadSchemes”明确绕过Hadoop文件系统。
//在添加到YARN的分布式缓存之前,我们将把它们下载到本地磁盘。
//对于yarn客户端模式,由于我们已经用上面的代码下载了它们,所以我们只需要找出本地路径并替换远程路径。
//如果我们正在运行一个python应用程序,请将main类设置为我们特定的python运行器
//非PySpark应用程序可能需要Python依赖关系。
//在R应用程序的YARN模式下,将SparkR包存档和包含所有构建的R库的R包存档添加到存档中,以便它们可以与作业一起分发
......省略......
//在客户端模式下,直接启动应用程序主类
//此外,将主应用程序jar和任何添加的jar(如果有的话)添加到类路径中
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
//在独立集群模式下,使用REST客户端提交应用程序(Spark 1.3+)。所有Spark参数都应通过系统属性传递给客户端。
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
} else {
//在传统的独立集群模式下,使用客户端作为用户类的包装器
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
}
}
//在yarn模式下,使用yarn.Client作为用户类的包装器
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
}
}
五、ClientApp
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val driverArgs = new ClientArguments(args)
//rpc通信的ask消息超时时间默认为 10s
if (!conf.contains(RPC_ASK_TIMEOUT)) {
conf.set(RPC_ASK_TIMEOUT, "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
//构建RpcEnv通信环境
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
//获取masterEndpoints
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
//设置ClientEndpoint 默认会调它的 onStart()
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
}
六、ClientEndpoint
//将消息中继到驱动程序的代理。
//如果提交失败,我们目前不支持重试。在HA模式下,客户端将向所有主机提交请求,看看哪个主机可以处理它。
private class ClientEndpoint(
override val rpcEnv: RpcEnv,
driverArgs: ClientArguments,
masterEndpoints: Seq[RpcEndpointRef],
conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging {
override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
//Diver的包装类
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
config.SPARK_DRIVER_PREFIX)
//构建DriverDescription 用于启动Driver
//里面包含jar的下载路径、所需要的内存和核数、启动命令等等
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command,
driverResourceReqs)
//向Master发送 RequestSubmitDriver 消息
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
case "kill" =>
val driverId = driverArgs.driverId
submittedDriverID = driverId
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
}
七、Master处理Client发出的RequestSubmitDriver
private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestSubmitDriver(description) =>
//如果Master不是ALIVE 直接返回 Client 失败的响应
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
//创建一个Driver
val driver = createDriver(description)
//把新创建的Driver添加到Master对Derviers的维护中去
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
//在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。
schedule()
//回复客户端消息
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
//创建driver
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new DriverInfo(now, newDriverId(date), desc, date)
}
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
//将状态为ALIVE的Workers打散,保证多个Driver分散启动
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
迭代所有需要启动的driver
for (driver <- waitingDrivers.toList) {
var launched = false
var isClusterIdle = true
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
numWorkersVisited += 1
校验该worker上剩余的内存和内核是否满足启动driver的要求
if (canLaunchDriver(worker, driver.desc)) {
val allocated = worker.acquireResources(driver.desc.resourceReqs)
driver.withResources(allocated)
//启动Driver
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
//向worker ENdpoint 发送 LaunchDriver 消息
worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
//将driver的状态设置称RUNNING
driver.state = DriverState.RUNNING
}
}
DriverInfo的信息如下:
private[deploy] class DriverInfo(
val startTime: Long,
val id: String,
val desc: DriverDescription,
val submitDate: Date)
extends Serializable {
//driver的初始化状态就是 SUBMITTED
@transient var state: DriverState.Value = DriverState.SUBMITTED
/* 如果我们在启动Driver时失败,异常将存储在此处 */
@transient var exception: Option[Exception] = None
/* 最近分配给 driver 的 worker */
@transient var worker: Option[WorkerInfo] = None
// 分配给此驱动程序的资源(例如:gpu/fpga)从资源名称映射到资源信息
//CPU、GPU都属于冯·诺依曼结构,指令译码执行、共享内存。
//FPGA是一种硬件可重构的体系结构中文名是现场可编程门阵列
//FPGA 之所以比 CPU 甚至 GPU 能效高,本质上是无指令、无需共享内存的体系结构带来的福利
//GPU不支持超流水技术 只能数据并行
//FPGA支持超流水技术,可以支持指令并行
private var _resources: Map[String, ResourceInformation] = Map.empty
}
八、Worker处理Master调度发起的LaunchDriver请求
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager,
resourceFileOpt: Option[String] = None,
externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
extends ThreadSafeRpcEndpoint with Logging {
override def receive: PartialFunction[Any, Unit] = synchronized {
case LaunchDriver(driverId, driverDesc, resources_) =>
logInfo(s"Asked to launch driver $driverId")
//管理一个driver的执行,包括在失败时自动重新启动driver。这目前仅在独立集群部署模式下使用。
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
workerWebUiUrl,
securityMgr,
resources_)
//该worker中对不同driver的维护
drivers(driverId) = driver
//启动driver
driver.start()
//重新计算该worker可用的资源
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
addResourcesUsed(resources_)
}
}
九、DriverWrapper启动
object DriverWrapper extends Logging {
def main(args: Array[String]): Unit = {
args.toList match {
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val host: String = Utils.localHostName()
val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toInt
val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
logInfo(s"Driver address: ${rpcEnv.address}")
//创建一个workerWatcher Endpoint 意为看着这个worker进程,如果丢了自己也就over了
//连接到worker进程,如果连接中断,则终止JVM。在工作进程及其关联的子进程之间提供命运共享。 它没有onStart()
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))
//............
//我们编写的Spark程序在这里就会通过反射被调起执行
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
rpcEnv.shutdown()
}
}
十、总结
1、编写Spark程序
2、编写启动脚本使用spark-submit命令提交任务
3、调用org.apache.spark.deploy.SparkSubmit中main方法
4、判断提交参数中的模式,根据不同的模式设置不同的mainClass
如果采用Standalone模式那么主类为:ClientApp
5、执行ClientApp中的start方法
6、构建RpcEnv环境并设置ClientEndpoint
7、构建DriverDescription并向Master发起启动Driver的请求
8、Master会处理所有启动Driver的请求并随机挑一个资源充足的Worker用于启动该Driver
9、Worker上启动一个DriverWrapper进程
10、DriverWrapper中的main方法会调用我们自己写的Spark程序中的main方法
下图是整体的方法调用图,下载放大后就会很清晰哟