一、概览
从start-all.sh开始捋,一直捋到Master、Worker的启动并建立通信
二、宏观描述
Master端
1、start-all.sh调用start-master.sh启动Master
2、执行org.apache.spark.deploy.master.Master中main方法
3、通过工厂模式创建RpcEnv子类NettyRpcEnv
a、创建TransportServer
b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)
c、Dispatcher将Master作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef
d、创建一个端点传递消息的MessageLoop,并将这个MessageLoop和Master绑定
e、New一个带有待处理消息的收件箱(Inbox)同时也和Master绑定
f、Inbox中有一个代码块会把OnStart方法添加到消息列表
g、调用setActive(inbox)将其标记为活动以处理OnStart消息
4、Master中的OnStart的方法被调起
5、处理来自Workers的注册信息RegisterWorker,向Workers发送注册成功信息RegisterWorkerResponse
5、检查并删除超时的Worker
Worker端
1、start-all.sh调用start-workers.sh脚本
2、start-workers.sh去workers文件中查询节点ip或host依次ssh过去启动start-worker.sh脚本
3、各个节点执行org.apache.spark.deploy.worker.Worker中main方法
4、通过工厂模式创建RpcEnv子类NettyRpcEnv
a、创建TransportServer
b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)
c、Dispatcher将Master作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef
d、创建一个端点传递消息的MessageLoop,并将这个MessageLoop和Master绑定
e、New一个带有待处理消息的收件箱(Inbox)同时也和Master绑定
f、Inbox中有一个代码块会把OnStart方法添加到消息列表
g、调用setActive(inbox)将其标记为活动以处理OnStart消息
5、Worker中的OnStart的方法被调起
6、根据配置看是否启动外部ShuffleService
7、获取资源信息
8、根据Master的URL从RpcEnv中获取MasterEndpointRef
9、向所有的Master(可能会是HA的场景)发送注册信息RegisterWorker
10、接收到来自Master的RegisterWorkerResponse信息
11、向Master发送心跳Heartbeat
三、源码调用绘图
把图下载下来,放大可以看清楚
四、核心源码
脚本部分
1、start-all.sh
#启动所有spark守护进程
#在此节点上启动Master
#在conf/workers中指定的每个节点上启动一个worker
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh
# Start Workers
"${SPARK_HOME}/sbin"/start-workers.sh
2、start-master.sh
CLASS="org.apache.spark.deploy.master.Master"
#默认端口
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
#获取Master节点的域名
$SPARK_MASTER_HOST
#启动Master守护进程
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
3、start-workers.sh
#Master节点默认端口
SPARK_MASTER_PORT=7077
#获取Master节点的域名
$SPARK_MASTER_HOST
#启动所有的worker
"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
4、start-worker.sh
CLASS="org.apache.spark.deploy.worker.Worker"
#启动几个Worker实例
$SPARK_WORKER_INSTANCES
#第一个worker的端口号。如果设置,后续的workers将递增此数字。如果未设置,Spark将找到一个有效的端口号,但不能保证模式是可预测的。
$SPARK_WORKER_PORT
#第一个worker的web界面的基本端口。后续的workers将增加这个数字。默认值为8081。
$SPARK_WORKER_WEBUI_PORT
#启动worker
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
scala代码部分
1、Master
//Master 是一个多线程安全的RpcEndpoint
private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
//Master端有所有worker的信息 并且有它们的 Rpc通信地址
val workers = new HashSet[WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = webUi.webUrl
//检测所有Worker的超时任务
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
0, workerTimeoutMs, TimeUnit.MILLISECONDS)
}
override def onStop(): Unit = {...}
//HA下的Leader选举
override def electedLeader(): Unit = {
self.send(ElectedLeader)
}
//接收其他 Endpoint 的 send 消息
override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader => Leader选举
case RegisterWorker(...)=>
worker 此时会携带它的资源信息 xx核 xx 内存
master 会添加这个 worker 并向这个 worker send RegisterWorkerResponse 消息
case RegisterApplication(description, driver) => derver端的应用注册
case Heartbeat(workerId, worker) => 接收worker端的心跳
case CheckForWorkerTimeOut => 检测worker的心跳
......
}
//接收其他 Endpoint 的 ask 消息
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestSubmitDriver(description) => 提交 driver
case RequestKillDriver(driverId) => 杀掉 driver
case RequestDriverStatus(driverId) => driver 状态
case RequestMasterState => master 状态
case RequestExecutors(appId, requestedTotal) => 启动 executors
case KillExecutors(appId, executorIds) => 杀掉 executors
......
}
//在workers上启动executors
//返回一个数组,其中包含分配给每个worker的核心数。
//有两种启动executors 的模式。
//第一种方法试图将应用程序的executors 分散到尽可能多的worker进程上, 更适合数据局部性,是默认设置
//第二种方法则相反(即在尽可能少的worker进程中启动它们)。
//分配给每个executor的核心数量是可配置的。当显式设置此选项时,如果工作进程有足够的内核和内存,则可以在同一worker进程上启动来自同一应用程序的多个executor。否则,
//默认情况下,每个executor都会抓取worker上可用的所有内核,在这种情况下,在一次单独的计划迭代中,每个应用程序只能在每个worker上启动一个executor。
//请注意,当未设置“spark.executor.cores”时,我们仍然可以在同一个worker上从同一个应用程序启动多个executor。
//假设appA和appB都有一个executor在worker1上运行,并且appA.cores>0,则appB完成并释放worker1上的所有内核,因此对于下一个计划迭代,appA启动一个新的executor,抓取worker1上所有空闲的内核,因此我们从运行在worker1的appA中获得多个executor。
private def scheduleExecutorsOnWorkers(...){
......
}
//在workers上启动executors
private def startExecutorsOnWorkers(): Unit = {
.......
}
//分配worker上的资源给 1个 或多个 executor
private def allocateWorkerResourceToExecutors(){
.......
}
//在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。
private def schedule(): Unit = {
.......
}
......
}
private[deploy] object Master extends Logging {
val SYSTEM_NAME = "sparkMaster"
val ENDPOINT_NAME = "Master"
def main(argStrings: Array[String]): Unit = {
val args = new MasterArguments(argStrings, conf)
//启动Master 并返回一个 tuple 3
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(...){
//启动一个RpcEnv
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
//将Master Endpoint 注册到 RpcEnv 中
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
}
2、Worker
//Worker 是一个多线程安全的RpcEndpoint
private[deploy] class Worker(...){
//Worker 端也有 Master 端的RpcEndpointRef
private var master: Option[RpcEndpointRef] = None
private var activeMasterUrl: String = ""
//该Worker上有哪些 app driver executor
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
val appDirectories = new HashMap[String, Seq[String]]
override def onStart(): Unit = {
//根据配置启动外部ShuffleService
startExternalShuffleService()
//设置资源
setupWorkerResources()
//向所有Master注册自己
registerWithMaster()
......
}
//使用个新的Master
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String,
masterAddress: RpcAddress): Unit = {
......
}
//向所有的Master注册
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
......
}
//向Master发送注册消息
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
......
}
//处理Master发送的注册响应
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
case RegisteredWorker(...) => 注册成功
case RegisterWorkerFailed(message) => 注册失败
case MasterInStandby => Master 还没准备好
}
}
//处理来自其他 Endpoint 的 send 消息
override def receive: PartialFunction[Any, Unit] = synchronized {
.......
}
//处理来自其他 Endpoint 的 ask 消息
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
.......
}
//向当前主机发送消息。如果尚未在任何主机上成功注册,则消息将被丢弃。
private def sendToMaster(message: Any): Unit = {
}
......
}
private[deploy] object Worker extends Logging {
val SYSTEM_NAME = "sparkWorker"
val ENDPOINT_NAME = "Worker"
def main(argStrings: Array[String]): Unit = {
val args = new WorkerArguments(argStrings, conf)
//启动 RpcEnv 并将自己放进去
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf,
resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
......
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(......){
//一个节点可以启动多个worker 默认是1个
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
rpcEnv
}
}
3、NettyRpcEnv
//父类是RpcEnv
//RPC环境。[[RpcEndpoint]]需要在[[RpcEnv]]中注册一个名称来接收消息。
//然后[[RpcEnv]]将处理从[[RpcEndpointRef]]或远程节点发送的消息,并将其传递给相应的[[RpcEndpoint]]s。
//对于[[RpcEN]]捕获的未捕获异常,[[Rpcen]]将使用[[RpcCallContext.sendFailure]]将异常发送回发送方,或者在没有此类发送方或“NoterializableException”的情况下记录它们。
//[[RpcEnv]]还提供了一些方法来检索[[RpcEndpointRef]]的给定名称或uri
//它是各种角色通信的基础,是基于Netty的
private[netty] class NettyRpcEnv(...) extends RpcEnv {
//分发器
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
//基于Netty 其初始化时会通过bootstrap引导启动Netty
@volatile private var server: TransportServer = _
//[[RpcAddress]]和[[发件箱]]的map。当我们连接到远程[[RpcAddress]]时,我们只需将消息放入其[[Outbox]]即可实现非阻塞的“send”方法。
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
//启动这个NettyRpcEnv
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
//创建一个将尝试绑定到特定主机和端口的Netty服务器
server = transportContext.createServer(bindAddress, port, bootstraps)
//注册一个RpcEndpointVerifier 用于给远程RpcEndpoint查询是否存在对应的RpcEndpoint
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
//设置一个RpcEndpoint
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
//往Outbox中放信息(且指明了收件人)
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
......
}
//向远程 RPC endpoint 发送信息 其实就是放到 Outbox中
private[netty] def send(message: RequestMessage): Unit = {
......
}
//异步发送消息,设置了超时时间,需要等待响应
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
......
}
......
}
//RpcEndpointRef的NettyRpcEnv版本。
//此类的行为因创建位置而异。在“拥有”RpcEndpoint的节点上,它是一个围绕RpcEndpointAddress实例的简单包装器。
//在接收引用序列化版本的其他机器上,行为会发生变化。实例将跟踪发送引用的TransportClient,以便通过客户端连接向端点发送消息,而不需要打开新的连接。
//此引用的RpcAddress可以为空;这意味着ref只能通过客户端连接使用,因为承载端点的进程不监听传入连接。这些引用不应与第三方共享,因为它们将无法向端点发送消息。
private[netty] class NettyRpcEndpointRef(...){
.......
}
//从发送方发送到接收方的消息。
private[netty] class RequestMessage(...){
.......
}
//将传入的RPC分派到已注册的端点。
//处理程序跟踪与其通信的所有客户端实例,以便RpcEnv在向客户端端点(即不监听传入连接,而是需要通过客户端Socket联系的端点)发送RPC时知道要使用哪个“TransportClient”实例。
//事件是按每个连接发送的,因此,如果客户端打开多个到RpcEnv的连接,将为该客户端创建多个连接/断开连接事件(尽管具有不同的“RpcAddress”信息)。
private[netty] class NettyRpcHandler(...}
.......
}
//通过工厂创建NettyRpcEnv
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
......
}
}
4、TransportServer
public class TransportServer implements Closeable {
//Netty服务端的引导,用于创建ServerChannel
private final List<TransportServerBootstrap> bootstraps;
private ServerBootstrap bootstrap;
//构造器
public TransportServer(...){
...
init(hostToBind, portToBind);
...
}
//初始化该TransportServer
private void init(String hostToBind, int portToBind) {
//IO模式 NIO 、 EPOLL 模式是NIO
//如果是Linux操作系统可以在spark-defaults.conf中设置spark.rpc.io.mode = EPOLL 来实现
IOMode ioMode = IOMode.valueOf(conf.ioMode());
//基于Netty中Reactor模型的启动
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
conf.getModuleName() + "-boss");
EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
conf.getModuleName() + "-server");
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, pooledAllocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, pooledAllocator);
//绑定端口启动一个Netty服务端
channelFuture = bootstrap.bind(address);
}
}
5、TransportClient
//客户端,用于获取预先协商的流的连续块。此API旨在实现大量数据的高效传输,这些数据被分解为大小从数百KB到几MB不等的块。
//请注意,虽然此客户端处理从流(即数据平面)中提取块,但流的实际设置是在传输层范围之外完成的。提供方便的方法“sendRPC”来实现客户端和服务器之间的控制平面通信,以执行此设置。
//例如:典型的工作流程是这样的:
//client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
//client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
//client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
//......
//client.sendRPC(new CloseStream(100))
//使用TransportClientFactory构造TransportClient的实例。单个TransportClient可用于多个流,但任何给定的流都必须限制在单个客户端,以避免乱序响应。
//注意:此类用于向服务器发出请求,而 TransportResponseHandler 负责处理来自服务器的响应。
//并发:线程安全,可以从多个线程调用。
public class TransportClient implements Closeable {
//构造的时候给一个 Channel 就可以和Server端进行通信了
//给一个TransportResponseHandler 就可以处理来自服务器的响应了
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel);
this.handler = Preconditions.checkNotNull(handler);
this.timedOut = false;
}
public SocketAddress getSocketAddress() {
return channel.remoteAddress();
}
//从远程端请求一个块,来自预先协商的streamId。
//块指数从0开始。多次请求同一块是有效的,尽管某些流可能不支持此操作。
//多个fetchChunk请求可能同时未完成,并且假设只使用单个TransportClient来获取块,则块保证会按照请求的顺序返回。
public void fetchChunk(...){}
//请求从远程端以给定的流ID流式传输数据。
public void stream(String streamId, StreamCallback callback) {}
//向服务器端的RpcHandler发送不透明消息。回调将随服务器的响应或在任何故障时调用。
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {}
......
}
6、Dispatcher
//消息调度器,负责将RPC消息路由到适当的端点。
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
//端点和消息的映射
private val endpoints: ConcurrentMap[String, MessageLoop] =
new ConcurrentHashMap[String, MessageLoop]
//端点和引用的映射
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)
//使用名字注册RpcEndpoint 放回对应的引用
//就是在endpoints加一条记录 (name -> RpcEndpoint)
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
......
}
//获取对应的引用
def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)
//向所有已注册的[[RpcEndpoint]]发送消息。这可用于使所有端点都知道网络事件(例如“新增了节点”)。
def postToAll(message: InboxMessage): Unit = {
......
}
//向远程端点发送消息
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
......
}
//向本地端点发送消息
def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
......
}
//发送一个单向消息
def postOneWayMessage(message: RequestMessage): Unit = {}
//向特定端点发送消息
private def postMessage(){}
}
7、MessageLoop
//Dispatcher 用于向端点传递消息的消息循环。
private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Logging {
//待消息循环处理的带有待处理消息的收件箱列表。
private val active = new LinkedBlockingQueue[Inbox]()
//消息循环任务;应该在消息循环池的所有线程中运行。
protected val receiveLoopRunnable = new Runnable() {
override def run(): Unit = receiveLoop()
}
//不停的处理inbox中的信息,
private def receiveLoop(): Unit = {
while (true) {
val inbox = active.take()
if (inbox == MessageLoop.PoisonPill) {
// Put PoisonPill back so that other threads can see it.
setActive(MessageLoop.PoisonPill)
return
}
//处理inbox存储的消息
inbox.process(dispatcher)
}
}
}
//使用共享线程池为多个RPC端点提供服务的消息循环。
private class DedicatedMessageLoop(...){}
8、Inbox
//一个收件箱,用于存储 RpcEndpoint 的消息,并安全地向其线程发布消息。
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
extends Logging {
inbox => 给它一个别名,这样我们就可以在闭包中更清楚地使用它。
protected val messages = new java.util.LinkedList[InboxMessage]()
//OnStart是要处理的第一条消息
inbox.synchronized {
messages.add(OnStart)
}
//处理Inbox中的消息
def process(dispatcher: Dispatcher): Unit = {
message = messages.poll()
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
endpoint.receiveAndReply
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
case OnStart =>
endpoint.onStart() //调用端点的Onstart() Master 、 Worker 的Onstart() 就是这样起来的
case OnStop =>
dispatcher.removeRpcEndpointRef(endpoint)
endpoint.onStop()
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
}
}
}
9、OutBox
private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
outbox => //给它一个别名,这样我们就可以在闭包中更清楚地使用它。
private val messages = new java.util.LinkedList[OutboxMessage]
//connectFuture指向连接任务。如果没有连接任务,connectFuture将为空
private var connectFuture: java.util.concurrent.Future[Unit] = null
//发送消息。如果没有活动连接,请缓存它并启动新连接
def send(message: OutboxMessage): Unit = {}
//清空消息队列
private def drainOutbox(): Unit = {}
//异步启动要给连接任务
private def launchConnectTask(): Unit = {}
}