128-spark-核心编程-源码(主要以了解基本原理和流程为主):
总体相关
1.环境准备(Yarn 集群)
(1) Driver, Executor
2.组件通信
(1) Driver => Executor
(2) Executor => Driver
(3) Executor => Executor
3.应用程序的执行
(1) RDD 依赖
(2)阶段的划分
(3) 任务的切分
(4)任务的调度
(5)任务的执行
4.Shuffle
(1) Shuffle 的原理和执行过程
(2) Shuffle 写磁盘
(3) Shuffle 读取磁盘
5.内存的管理
(1)内存的分类
(2)内存的配置
起点:org/apache/spark/deploy/SparkSubmit.scala main
java org.apache.spark.deploy.SparkSubmit
java HelloWorld
JVM => Process ( SparkSubmit)
SparkSubmit.main
jps
结合尚硅谷视频讲解图片理解。
#提交命令
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
main->doSubmit->parseArguments->parse(args.asJava)->SparkSubmitArguments.handle(--master --class)
给action赋值action = Option(action).getOrElse(SUBMIT) org.apache.spark.deploy.SparkSubmit#submit ->doRunMain()-org.apache.spark.deploy.SparkSubmit#runMain->prepareSubmitEnvironment(准备提交的环境)
#准备提交的环境
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
#根据环境找到childMainClass
if (isYarnCluster) {childMainClass = YARN_CLUSTER_SUBMIT_CLASS。。。} (YARN_CLUSTER_SUBMIT_CLASS: org.apache.spark.deploy.yarn.YarnClusterApplication)
#yarnclient创建了资源调度器rmclient
YarnClient.createYarnClient->ApplicationClientProtocol rmClient;->org.apache.spark.deploy.yarn.Client#run
#提交应用程序,返回appid
org.apache.spark.deploy.yarn.Client#submitApplication
#客户端启动 yarnClient.init(hadoopConf)->yarnClient.start()->val newApp = yarnClient.createApplication()创建应用 -> createContainerLaunchContext(创建容器环境)->createApplicationSubmissionContext(创建提交环境)
#连接和提交,yarnClient连接,submitApplication提交
yarnClient.submitApplication(appContext)->Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++ ->amArgs->amClass="org.apache.spark.deploy.yarn.ApplicationMaster"(启动ApplicationMaster)
#启动ApplicationMaster
org.apache.spark.deploy.yarn.ApplicationMaster#main-》org.apache.spark.deploy.yarn.YarnRMClient#amClient属性。applicationmaster和resourcemaster的链接-》org.apache.spark.deploy.yarn.ApplicationMaster#runDriver-》
org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication启动用户应用程序-》startUserApplication.start(driver驱动线程初始化sparkcontext以及run mian方法)-》org.apache.spark.deploy.yarn.ApplicationMaster#registerAM(注册到rm,申请资源)-》org.apache.spark.deploy.yarn.YarnRMClient#createAllocator(创建分配器)-》org.apache.spark.deploy.yarn.YarnAllocator#allocateResources(返回可用资源列表)-》
org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers(处理可用的容器)-》
org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers(运行已分配的容器)-》
org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand(准备指令)-》
nmClient.startContainer(container.get, ctx)(向指定的NM启动容器)-》
/bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend(启动Executor)
#启动Executor
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend#main-》 run ->
org.apache.spark.SparkEnv#createExecutorEnv(创建executorenv环境)-》
org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint(注册rpc通讯终端)-》
org.apache.spark.rpc.netty.Inbox#Inbox(收件箱,自己给自己发消息constructor -> onStart -> receive* -> onStop)-》
org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart-》
org.apache.spark.rpc.RpcEndpointRef#ask(向driver注册executor)-》
org.apache.spark.scheduler.SchedulerBackend-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply(接收和应答)-》
org.apache.spark.rpc.RpcCallContext#reply( context.reply(true)注册成功)-》
org.apache.spark.executor.CoarseGrainedExecutorBackend#receive(executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)创建Executor计算对象)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive(makeOffers(executorId)注册成功)-》
通讯环境:
Netty:通讯框架,AIO异步非阻塞IO,BIO阻塞式IO,NIO非阻塞式IO
Linux对AIO支持不够好,Windows支持,Linux采用Epoll方式模仿AIO操作
org.apache.spark.SparkContext#createSparkEnv-》
org.apache.spark.rpc.RpcEnv#create(rpcenv创建)-》
org.apache.spark.rpc.netty.NettyRpcEnv#NettyRpcEnv-》
org.apache.spark.util.Utils$#startServiceOnPort-》
org.apache.spark.rpc.netty.NettyRpcEnv#startServer(创建服务)-》
org.apache.spark.network.server.TransportServer#TransportServer(创建服务)-》
org.apache.spark.network.server.TransportServer#init(初始化)-》
org.apache.spark.network.util.NettyUtils#getServerChannelClass(nio,EPOLL方式模仿异步)-》
org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint(注册通讯终端,receive接受数据,收件箱inbox)-》
org.apache.spark.rpc.RpcEndpointRef#ask(发送数据,终端引用,)-》
org.apache.spark.rpc.netty.NettyRpcEnv#outboxes属性,发件箱
应用程序的执行:
应用SparkContext对象重要相关字段
(1) RDD 依赖
(2)阶段的划分
org.apache.spark.rdd.RDD#collect(行动算子的触发)-》
org.apache.spark.SparkContext#runJob(运行任务)-》
org.apache.spark.scheduler.DAGScheduler#runJob(有向无环图)-》
org.apache.spark.util.EventLoop#post(将JobSubmitted事件放入事件队列中)-》
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive(事件队列中取出作业提交)-》
org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted(进行阶段的划分)-》
org.apache.spark.scheduler.DAGScheduler#createResultStage(进行阶段的划分)-》
org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages(获取上级阶段)
(3) 任务的切分
每个阶段最后分区的数量即n*2个任务,例如最后一个阶段分为3个分区,最后shufflerdd也会有三个分区
org.apache.spark.scheduler.DAGScheduler#submitStage-》
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks(没上一节阶段提交任务,有上一级阶段提交上一级阶段)-》
(4)任务的调度
org.apache.spark.scheduler.TaskScheduler#submitTasks(任务调度器提交任务)-》
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks(实现)-》
org.apache.spark.scheduler.TaskSetManager(任务tasksset的管理者)-》
org.apache.spark.scheduler.SchedulableBuilder(调度器)-》
org.apache.spark.scheduler.TaskSchedulerImpl#initialize(初始化调度器,默认FIFO)-》
org.apache.spark.scheduler.Pool#addSchedulable(任务池添加调度)-》
org.apache.spark.scheduler.SchedulerBackend#reviveOffers(取任务)-》org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers(集群模式取)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers()-》
org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers(获取资源调度信息)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks(调度任务)-》
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))(任务池取出发送终端执行)
本地化级别
#补充,本地化级别,首选位置。task任务发送到哪里,数据和节点位置等。效率考虑,移动数据不如移动计算
for(currentMaxLocality<-taskSet.myLocalityLevels)
移动数据不如移动计算
计算和数据的位置存在不同的级别,这个级别称之为本地化级别
进程本地化:数据和计算在同一个进程中
节点本地化:数据和计算在同一个节点中
机架本地化:数据和计算在同一个机架中
任意
(5)任务的执行
org.apache.spark.executor.CoarseGrainedExecutorBackend#receive(executor接收到消息)-》
org.apache.spark.executor.Executor#launchTask(启动Task)-》
java.util.concurrent.ThreadPoolExecutor#execute(每个线程执行每个task)-》
org.apache.spark.executor.Executor.TaskRunner#run-》
org.apache.spark.scheduler.Task#run
Shuffle原理
详解转变的过程
前提1:一核,一个task,落盘一个文件,三个任务读取数据,但是没法分辨需要的数据是文件中的那些数据
前提2:多核,多task,每个任务针对落盘三个文件,导致小文件过多
前提3:对前提2的优化,将同核任务的落盘,写相同的文件,但是真实环境task可能会很多,下游任务也可能人多,或者100核数。文件将还是很多。
前提4:对前提3的优化,写到同一个文件,使用index索引文件记录下游任务读取数据的偏移量。
Shuffle实现过程:
org.apache.spark.scheduler.ShuffleMapTask-》
org.apache.spark.scheduler.ShuffleMapTask#runTask-》
org.apache.spark.shuffle.ShuffleWriteProcessor#write(写)-》
org.apache.spark.shuffle.ShuffleManager#getWriter(获取写入的对象)-》
org.apache.spark.shuffle.sort.SortShuffleWriter#write()-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions(提交)-》
org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit(索引文件和数据文件的提交)-》
#读
org.apache.spark.scheduler.ResultTask#runTask(结果任务)-》
org.apache.spark.rdd.RDD#getOrCompute(获取或计算)-》
org.apache.spark.rdd.ShuffledRDD#compute(shufflerdd的计算)-》
org.apache.spark.shuffle.BlockStoreShuffleReader#read(读取数据)
shuffle写:
org.apache.spark.shuffle.ShuffleWriteProcessor(shuffle写的处理器)-》write-》
org.apache.spark.shuffle.ShuffleManager(shuffle管理器,hash早期有,sort现版本)-》
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter(获取到了下面的写对象的SortShuffleWriter)-》
org.apache.spark.shuffle.sort.SortShuffleWriter#write(写)-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput(写出)-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions(提交)
补充writer写的类型,判断条件org.apache.spark.shuffle.sort.SortShuffleManager#registerShuffle
处理器 | 写对象 | 判断条件 |
---|---|---|
SerializedShuffleHandle | UnsafeShuffleWriter | 1.序列化规则支持重定位操作(java序列化不支持,kryo序列化支持) 2.不能使用预聚合功能 3.如果下游的分区数量小区大(16777215+1=16777216)PackedRecordPointer.MAXIMUM_PARTITION_ID + 1 |
BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter | 1.不能使用预聚合 2、如果下游的分区数量小区等于200(可配) |
BaseShuffleHandle | SortShuffleWriter | 其他情况 |
Shuffle归并排序和读
org.apache.spark.util.collection.ExternalSorter#insertAll(插入)-》
org.apache.spark.util.collection.PartitionedAppendOnlyMap(支持预聚合的map结构)
org.apache.spark.util.collection.PartitionedPairBuffer(不支持预聚合的结构)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap#changeValue(预聚合)-》位置-》
org.apache.spark.util.collection.ExternalSorter#maybeSpillCollection(是否溢写磁盘)-》
org.apache.spark.util.collection.Spillable#maybeSpill-》
org.apache.spark.util.collection.ExternalSorter#spill(溢写)-》
org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk(写到磁盘)-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput-》
org.apache.spark.util.collection.ExternalSorter#merge(Merge spilled and in-memory data合并溢写和磁盘)-》
org.apache.spark.util.collection.ExternalSorter#mergeSort(归并排序)-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions()-》
org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit(写索引文件和数据文件)->
5.内存的管理
(1)内存的分类
相关位置:
org.apache.spark.memory.UnifiedMemoryManager#apply ->
org.apache.spark.memory.UnifiedMemoryManager#getMaxMemory
#相关参数:org.apache.spark.memory.UnifiedMemoryManager#RESERVED_SYSTEM_MEMORY_BYTES预留内存300M
(2)内存的配置