上篇我们讲server端处理服务注册源码时,遇到了一个关键类NotifyCenter
,本篇就主要来分析下这个类。
NotifyCenter
这个类所在包:nacos-common
/**
* Unified Event Notify Center.
*/
通过类注释可以看出来这个类是一个统一的事件通知中心,那也就足可见这个类的重要性了。
public class NotifyCenter
通过类定义发现这个类就是一个普通类,不会被注册为spring bean
且类中的方法都是静态方法,也就是说这个类是整个进程共享的了,所以其中的内部变量必然要是线程安全的,结合源码看看是如何处理的。
private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);
//这两个变量是在静态块中初始化的,所以不需要特殊处理
public static int ringBufferSize;
public static int shareBufferSize;
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
//初始化EventPublisher的工厂
private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;
private static final NotifyCenter INSTANCE = new NotifyCenter();
private DefaultSharePublisher sharePublisher;
//SPI机制获取到EventPublisher实现之后,获取class,赋值到此
private static Class<? extends EventPublisher> clazz;
/**
* Publisher management container.
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
//采用SPI拓展机制,NacosServiceLoader内部封装了JDK的ServiceLoader,在此基础上加了一层缓存
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = publishers.iterator();
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
//默认使用DefaultPublisher
clazz = DefaultPublisher.class;
}
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
//实例化EventPublisher
EventPublisher publisher = clazz.newInstance();
//调用EventPublisher初始化方法
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
try {
// Create and init DefaultSharePublisher instance.
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
}
//JVM销毁前执行的钩子函数 NotifyCenter::shutdown
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
查看代码可以知道关键的处理逻辑是:创建EventPublisher
。
EventPublisher
以上涉及到两个EventPublisher
一个是默认的DefaultPublisher
,另一个是DefaultSharePublisher
我们来一个个梳理下UML中的类:
interface EventPublisher
事件通知的接口,定义了事件通知的一系列方法
/**
* Initializes the event publisher.
* 初始化
* @param type {@link Event >}
* @param bufferSize 消息队列大小
*/
void init(Class<? extends Event> type, int bufferSize);
/**
* The number of currently staged events.
* 当前暂存的事件数量
* @return event size
*/
long currentEventSize();
/**
* Add listener.
* 添加订阅者
* @param subscriber {@link Subscriber}
*/
void addSubscriber(Subscriber subscriber);
/**
* Remove listener.
* 移除订阅者
* @param subscriber {@link Subscriber}
*/
void removeSubscriber(Subscriber subscriber);
/**
* publish event.
* 发布事件
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event);
/**
* Notify listener.
* 通知订阅者
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event);
class DefaultPublisher
public class DefaultPublisher extends Thread implements EventPublisher
DefaultPublisher
是事件发布接口EventPublisher
的默认实现,除此之外,还继承了Thread
,那么也就意味着DefaultPublisher
除了有事件处理能力,本身还是一个线程。
先看接口中定义的初始化方法init
是如何实现的,从这里看起会很好的帮助我们理解这个类:
@Override
public void init(Class<? extends Event> type, int bufferSize) {
//设置为守护线程
setDaemon(true);
//设置线程名称,一定程度上这可以帮助我们排查问题
setName("nacos.publisher-" + type.getName());
//Event类型
this.eventType = type;
//队列长度
this.queueMaxSize = bufferSize;
//队列
this.queue = new ArrayBlockingQueue<>(bufferSize);
//Thread的start()方法
start();
}
//全局变量,用来标识当前Publisher是否初始化完成
private volatile boolean initialized = false;
@Override
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}
即然继承了Thread
那么就肯定回去重写关键的run()
方法,我们看下是如何重写的:
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
// 这个变量用于解决队列中消息积压的问题
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
// 没有关闭 && 没有订阅者 && waitTimes>0 --> 休眠1s,最多可以sleep 60s
while (!shutdown && !hasSubscriber() && waitTimes > 0) {
ThreadUtils.sleep(1000L);
waitTimes--;
}
while (!shutdown) {
//如果没有获取到事件,这里会阻塞
final Event event = queue.take();
//处理事件
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
可以看到,这里有一个死循环,只要没有收到shutdown
事件,那么就会一直尝试从队列中获取事件去处理。这里获取事件的逻辑是从阻塞队列中取数据,所以当没有时间要处理时,当前线程会阻塞在获取事件的逻辑中,不会对cpu有过多的压力。
/**
* Receive and notifySubscriber to process the event.
*
* @param event {@link Event}.
*/
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue;
}
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
//将订阅者对事件的处理构建成一个Runnable
final Runnable job = () -> subscriber.onEvent(event);
//获取订阅者处理事件的Executer,这里的设计逻辑是:将异步或者同步的决定权交给订阅者本身,
//是否异步处理决定于订阅者对executor()方法的实现方式
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
//如果没有executor,那就直接执行Runnable
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
看了这么多代码,简单对DefaultPublisher
做个总结:
- 是什么?
- 事件通知的默认实现,用于处理事件通知逻辑
- 继承了线程
- 有什么?
- 订阅者集合
- 事件集合
- 做什么?
- 处理事件通知,同时处理订阅者的相关逻辑(添加/删除)
- 怎么做?
- 初始化方法中会启动线程
- run()处理事件,从自己的事件集合中获取事件,然后交给订阅者去处理
class DefaultSharePublisher
这个类继承了DefaultPublisher
,设计的目的应该是处理一些比较耗时的事件。本身有一个map做缓存,没有什么特别的逻辑。
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();
这个类去处理事件的逻辑还是其父类DefaultPublisher
的notifySubscriber(subscriber, event)
方法。我们上文已经看过这个方法了。
回过头再看NotifyCenter
看完EventPublisher
之后,别忘记我们为什么出发🫣。
回过头再看NotifyCenter
:我们从接收一个事件开始,从头梳理代码。
接收事件
/**
* Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is
* actually published.
*
* @param event class Instances of the event.
*/
public static boolean publishEvent(final Event event) {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
LOGGER.error("There was an exception to the message publishing : ", ex);
return false;
}
}
发布事件
/**
* Request publisher publish event Publishers load lazily, calling publisher.
*
* @param eventType class Instances type of the event type.
* @param event event instance.
*/
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
//当前事件如果是SlowEvent,那么交与sharePublisher处理
return INSTANCE.sharePublisher.publish(event);
}
//获取Event的类名
final String topic = ClassUtils.getCanonicalName(eventType);
//根据事件的类名,从map中获取EventPublisher去处理当前事件
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
ps:Class.getCanonicalName()的用法
EventPublisher是如何添加到publisherMap中的?
public static void registerSubscriber(final Subscriber consumer) {
registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);
}
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher.
//SmartSubscriber是为了处理一个订阅者订阅多个事件而设计的
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
//添加订阅者
addSubscriber(consumer, subscribeType, factory);
}
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
publisher.addSubscriber(consumer);
}
}
最后的一个方法中我们看到了Map<String, EventPublisher> publisherMap
的数据添加逻辑:根据Event
的类名做为key,EventPublisher
作为value。
同时我们也看到了事件订阅者是如何和事件发布者建立关系的:publisher.addSubscriber(consumer);
,publisher中有一个集合,用于存储事件的订阅者。
总结
好了,总结下NotifyCenter
:
- 是什么?
- 统一的事件处理类,所有的事件通知都借助此类去处理
- 组织事件-发布者的关联关系
- 有什么?
- 事件处理类
EventPublisher
- 共享事件处理类
DefaultSharePublisher
,用于处理耗时的事件 Map<String, EventPublisher> publisherMap
- 事件处理类
- 做什么?
- 注册发布者到
publisherMap
- 接收事件之后交与对应的订阅者处理
- 注册发布者到
- 怎么做?
- 根据
Event
的类名去publisherMap
中获取对应的EventPublisher
,然后发布事件,交与对应的订阅者处理事件
- 根据
本篇主要讲了NotifyCenter
相关的事件代码逻辑,特别对 EventPublisher
做了详细的分析。我们从中看到了一个事件发布与订阅解耦的设计方式,很多框架中都会使用事件驱动去设计发布-订阅模型,日常工作中如果遇到类似的情景,也可以尝试使用这种方式去实现。