事件总线基本流程
图片来源:https://blog.csdn.net/sinat_26781639/article/details/105012302
LiveListenerBus创建
在sparkContext初始化中创建LiveListenerBus对象。
主要变量有两个
- queues:事件队列,里面存放四个队列,每个队列中有对应的listener注册
- queuedEvents:发生的事件
注册listener
内部listener注册
举例DynamicAllocation,在SpackContext初始化的时候创建了ExecutorAllocationManager对象,再调用start方法。
在ExecutorAllocationManager的start方法中,调用listenerBus相关方法完成注册
LiveListenerBus中queues分成四个队列,分别对应不同的注册方法,最终都是调用addToQueue方法
addToQueue比较简单,就是从queues中获取,有AsyncEventQueue就加入listener,没有就创建再加入。加入是调用AsyncEventQueue的addListener方法。
AsyncEventQueue的addListener方法是父类ListenerBus的addListener方法。
将listener加入到listenersPlusTimers中
内部listener是放入了对应队列的listenersPlusTimers中。
外部listener注册
在内部listener注册完成后调用setupAndStartListenerBus注册外部listener
在setupAndStartListenerBus方法中,读取配置spark.extraListeners获取要注册的外部listener集合,使用反射创建listener对象,遍历调用addToSharedQueue加入share queue。最后调用listenerBus的start方法启动listenerBus。
事件总线启动
如上图,在listener全部完成注册后,调用listenerBus的start方法启动。
将started变量修改为true,标记listenerBus启动。遍历queues,启动有注册的队列AsyncEventQueue(总共四个队列,但不一定全部都启用)。遍历queuedEvents处理在listenerBus没有启动期间产生的event。最后不再需要缓存消息了,将queuedEvents置为空。
AsyncEventQueue启动
将started变量变成true,标记AsyncEventQueue启动。启动dispatchThread线程。
dispatchThread线程是调用dispatch方法。
在dispatch方法中,可以看到是循环读取eventQueue,从其中读取event,调用postToAll发送给全部的listener。
eventQueue是一个阻塞队列LinkedBlockingQueue
到此,listenerBus启动完成,其中的列队AsyncEventQueue也启动完成。AsyncEventQueue循环从eventQueue中获取event来处理(这里是阻塞的)
发送消息
发送消息的入口是调用LiveListenerBus的post方法。
如果还没有启动,就将消息先缓存到queueEvents中。
如果启动了,就调用postToQueues将消息发送给全部队列。
在postToQueues中是遍历queue,调用post方法。
AsyncEventQueue的post方法中,就是将消息放入eventQueue即可。
但是eventQueue是有容量大小的,超过的消息就会丢弃。
至此,发送消息完成,将消息放入到AsyncEventQueue的eventQueue中。
处理消息
在启动的时候,dispatch线程已经完成了启动,从eventQueue获取event来处理。
处理消息是调用父类postToAll方法
postToAll方法中是遍历该队列全部listener,调用doPostEvent方法。
doPostEvent对应是SparkListenerBus的doPostEvent方法,根据event的类型,调用listener的不同的方法。
listener是要实现的SparkListenerInterface的方法,可以看到方法很多。。。