sparksubmit源码解析
在提交我们写好的jar包时候,用到submit命令,他的源码解析流程如上图
位于deploy里的SparkSubmit里面,根据main方法一点点run进去,分配我们传的参数,尤其是
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
MainClass这个东西就是我们要先执行的一个位置,他根据我们设置的deploy-mode来进行选择
在分配参数的时候,deploy-mode如果是client模式,直接MainClass为我们jar包去执行
但如果是集群模式,他会启用ClientApp里的start方法,app.start(childArgs.toArray, sparkConf)
由此生成一个ClientEndpoint,也就是创建一个新的Env环境,由此 还是之前的老规矩,inbox一定会process onStart()方法。
在这个方法里面 MainClass会被指定为 创建Driver
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
同时,用一个command来存储我们自己的jar包
val command = new Command(mainClass, Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
并将上述信息封装成driverDescription发送Master
asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription))
Master接收到之后,立马调配worker给他分配资源创建Driver
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}
对worker的引用 调用send函数让他启动Driver
worker接受到之后,会立马在本机创建一个新的JVM DriverWrapper
这个新进程就负责执行我们自己的程序jar包
sparkContext解析
要执行我们自己的代码了,在new sparkContext()的时候,就会有上图的流程,
发送创建Application的信息给master之后,master就开始分配资源给我们的任务了,也就是分配executor,我们的executor会根据参数先定义好 每一个worker里面要启动几个executor,根据最终算好的结果去worker一个个发消息让他们启动,启动之后 worker就会另开一个新的rpcEnv 也就是新开端点ExecutorBackend,里面的start方法会和之前的driver端点通知,说我要注册了,最终executor里面也会新开一个线程池,这个线程池就是最终我们跑代码执行我们自己东西的地方
我们自己的代码执行过程
首先明确,分布式计算是大数据的期望执行过程
里面会分为相干计算 与 不相干计算
不相干计算就是,数据在不同的主机里面,大家各跑各的对最终结果不会有影响,比如简单的map flatMap
相干计算就是我一定要拿到所有主机的数据才能进行的计算,需要shuffle
所以在进行rdd.map().filter()的执行输出结果,由于没有用到shuffle,大家肯定是各跑各的,所以会有map,filter,map,filter这种交替执行的现象。
如上图,不相干计算组成了stage,在一台机器上就可以完成,中间的执行逻辑是pipeline模式,迭代器嵌套,就是
flatMap(map(filter())),这种就是窄依赖模式,stage与stage之间需要shuffle。
rdd他不存储具体数据,只存逻辑
描述任务的并行度 也就是task任务数量,一个task就是一个并行度
一个stage里task的数量就是由stage里最后一个rdd的分区的数量决定
然后就是具体的计算层执行rdd逻辑了
最终的执行逻辑:
从RDD.foreach()那一刻开始算,foreach里面有最终runJob方法,这里开始使用DAGschedule处理我们的Job
首先是开启ProcessLoop等待接收我们的job,提交之后里面就会处理我们的申请,根据我们传的rdd,它是最后的ResultStage,我们要根据这个ResultStage去递归的切割前面所有rdd,切割成一个个rdd,切割的标准就是,通过栈结构根据每一个rdd的dependices往前,以是否发生shuffle来进行切割,最终递归的回归过程就是每一个stage提交的过程
举例说明
首先要明确,从x到g这一步它是窄依赖,并不是shuffle
因为他俩的分区数一样
如果还是shuffle操作的话,最终比如msb这个数据b到x是去了分区2,x到g还会是分区2
没必要是shuffle操作,spark会自动调优,将这一操作放在一台机子上进行,后面的rdd f也会被shuffle到这台机子。
但是如果分区数改变成4 那么就不是窄依赖了,因为原数据存放的分区号发生了改变。
missing是判断出是否为shuffle rdd才存放的,他放的是 一个stage里面最后一个rdd
之后就是通过stage根据分区来划分task任务, 然后让driver端点分配executor给他来执行