前言
在Nacos源码中,你是否也经常看到NotifyCenter.publishEvent
这样的代码块?
这个事件发布出去后,有哪些类接收到通知并进行了逻辑处理呢?
这里面的实现逻辑是什么呢?
如果你不太清楚,那我们一起来梳理下。Let’s go!
NotifyCenter
如果我们不靠其它资料,从代码层面去解读的话,还是得先从这个类入手,因为都是调用其静态方法完成通知的。
这个类的静态代码块里面有这样的逻辑:
static {
// 从系统环境中取"nacos.core.notify.ring-buffer-size",如果没有设置,则取默认值16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
// 从系统环境中取"nacos.core.notify.share-buffer-size",如果没有设置,则取默认值1024
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
// 通过SPI的方式加载EventPublisher的实现类
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = publishers.iterator();
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
// 创建一个默认的EventPublisherFactory,用来产生EventPublisher
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
try {
// 创建一个默认共享的EventPublisher
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
}
// 添加一个ShutdownHook
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
从这里我们可以知道,有EventPublisherFactory
和EventPublisher
的概念,在EventPublisher
中,还专门创建了一个DefaultSharePublisher
,还知道了一个事件叫做SlowEvent
。
那我们就看一下这几个模型:
模型
EventPublisherFactory
public interface EventPublisherFactory extends BiFunction<Class<? extends Event>, Integer, EventPublisher> {
/**
* Build an new {@link EventPublisher}.
*
* @param eventType eventType for {@link EventPublisher}
* @param maxQueueSize max queue size for {@link EventPublisher}
* @return new {@link EventPublisher}
*/
@Override
EventPublisher apply(Class<? extends Event> eventType, Integer maxQueueSize);
}
可以看到,接口中就一个方法,就是通过事件类型和最大队列大小来得到一个EventPublisher
。
其中有NamingEventPublisherFactory
和TraceEventPublisherFactory
两个实现类。
EventPublisher
public interface EventPublisher extends Closeable {
// 初始化EventPublisher
void init(Class<? extends Event> type, int bufferSize);
// 获取当前事件的数量
long currentEventSize();
// 添加一个订阅者
void addSubscriber(Subscriber subscriber);
// 删除一个订阅者
void removeSubscriber(Subscriber subscriber);
// 发布事件
boolean publish(Event event);
// 通知订阅者
void notifySubscriber(Subscriber subscriber, Event event);
}
其类图如下:
这里SharedEventPublisher
看起来是做了更多的扩展,来看看呢:
public interface ShardedEventPublisher extends EventPublisher {
// 给指定的一个事件添加订阅者
void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
// 给指定的一个事件删除订阅者
void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}
这里新加入了个Subscriber,那我们来看看:
Subscriber
public abstract class Subscriber<T extends Event> {
// 接收事件
public abstract void onEvent(T event);
// 获取当前订阅的事件类型
public abstract Class<? extends Event> subscribeType();
// 获取异步处理器
public Executor executor() {
return null;
}
// 忽略失效的事件
public boolean ignoreExpireEvent() {
return false;
}
// 获取事件是否匹配作用域
public boolean scopeMatches(T event) {
return true;
}
}
有个特殊的SmartSubscriber
,它是Subscriber
的子类,是一个抽象类:
public abstract class SmartSubscriber extends Subscriber {
// 返回订阅的事件列表
public abstract List<Class<? extends Event>> subscribeTypes();
@Override
public final Class<? extends Event> subscribeType() {
return null;
}
// 默认不忽略过期的事件
@Override
public final boolean ignoreExpireEvent() {
return false;
}
}
Event
public abstract class Event implements Serializable {
private static final long serialVersionUID = -3731383194964997493L;
private static final AtomicLong SEQUENCE = new AtomicLong(0);
private final long sequence = SEQUENCE.getAndIncrement();
// 获取事件序号
public long sequence() {
return sequence;
}
// 获取事件作用域
public String scope() {
return null;
}
// 判断是否是一个插件的事件
public boolean isPluginEvent() {
return false;
}
}
在SlowEvent
中,仅仅是将sequence统一默认为0。
public abstract class SlowEvent extends Event {
@Override
public long sequence() {
return 0;
}
}
综上,我们通过接口关系就可以大概得出一个结论:
EventPublisher
可通过EventPublisherFactory
创建得来,EventPublisher
可以给Event
添加Subscriber
,并发布事件让对应的Subscriber
执行。这不妥妥的观察者模式嘛!
我们来画张图:
了解了NotifyCenter
大概的模型之后,我们来看看内部的实现。
实现方法
registerToPublisher
给时间注册一个EventPublisher。
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
final EventPublisherFactory factory, final int queueMaxSize) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}
诶,如果事件是SlowEvent
,就会统一返回一个sharePublisher
,这个就是静态代码块中初始化的DefaultSharePublisher
。
好,(敲黑板),所有SlowEvent
的事件共用一个DefaultSharePublisher
。
接着给NotifyCenter
上锁,如果publisherMap
中没有当前事件的EventPublisher
,那就用EventPublisherFactory
创建一个EventPublisher
,并存入publisherMap
中,再返回。
registerSubscriber
给事件注册一个订阅者。
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
// 如果订阅者是一个SmartSubscriber,可以订阅多个事件
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// 如果事件是SlowEvent,则添加到sharePublisher中
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// 其他事件则添加到对应的EventPublisher中
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
// 其他类型的Subscriber
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
// 如果事件是SlowEvent,则添加到sharePublisher中
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
// 其他事件则添加到对应的EventPublisher中
addSubscriber(consumer, subscribeType, factory);
}
publishEvent
发送事件。
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {\
// 如果事件是SlowEvent,则用sharePublisher发送
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
final String topic = ClassUtils.getCanonicalName(eventType);
// 根据事件找到对应的EventPublisher进行发送
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
总结
Nacos的事件发布是基于观察者模式进行设计的。
如果一个处理器需要接收到某个特定事件的通知,那么就要先通过NotifyCenter
来订阅自己感兴趣的Event
,当显示调用NotifyCenter.publishEvent
时,会通过EventPublisher
进行通知到订阅者。