目录
1. 介绍
2. Nacos 中的生产者
3. Nacos 中的消费者
4. Nacos 中的事件
5. 统一事件通知中心 NotifyCenter
6. 总结
1. 介绍
Nacos 节点内部使用了大量的事件机制进行通信,这样各种操作进行了解耦,提高了性能。
接下来就介绍。
所谓事件机制也就是 生产者消费者模式。
生产者向队列发送生产一条消息,消费者从队列拿出一个消息进行消费。
结构如下:
事件机制也是 Nacos 高性能的原因之一,事件生产后放到队列后请求就直接返回了。客户端不用等待完成,全都异步处理了,所以Nacos 支持高并发访问。
Nacos 中的事件机制非常庞大,接下来以不同的模块的角度来介绍。
2. Nacos 中的生产者
首先,我们需要有生产者来生产一条消息。
生产者结构如下
EventPublisher
生产者顶层接口,定义规范
-
初始化
-
发布事件
-
获取当前事件数量
-
添加订阅者(消费者)
-
移除订阅者(消费者)
-
通知消费者
DefaultPublisher
默认生产者实现,消息队列对应在了其内部,在 init 方法中完成了队列初始化。
public class DefaultPublisher extends Thread implements EventPublisher {
// 省略非关键代码
private BlockingQueue<Event> queue;
@Override
public void init(Class<? extends Event> type, int bufferSize) {
this.queue = new ArrayBlockingQueue<>(bufferSize);
}
}
订阅者的加入、移除
public class DefaultPublisher {
// 省略非关键代码
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();
/**
* 添加订阅者
*/
@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}
/**
* 移除订阅者
*/
@Override
public void removeSubscriber(Subscriber subscriber) {
subscribers.remove(subscriber);
}
}
可见,就是对消费者数据结构简单的 add 和 remove。
生产消息:当发布消息(消息就是事件 Event 类)时,就往队列加入一个消息.
@Override
public boolean publish(Event event) {
boolean success = this.queue.offer(event);
return true;
// 省略非关键代码
}
消费消息
由于 DefaultPublisher 继承了 Thread 并且在 init 方法中执行了start 方法。
所以它也是一个线程,在线程的 run 方法内最终调用了如下方法:
void openEventHandler() {
// 省略非关键代码
for (; ; ) {
// 从队列中获取消息(事件):如果没有消息该方法则会阻塞,直到有消息了才返回该消息
final Event event = queue.take();
// 处理事件
receiveEvent(event);
}
}
// 处理事件
void receiveEvent(Event event) {
// 省略非关键代码
// 获取注册到当前 生产者 的全部消费者
for (Subscriber subscriber : subscribers) {
// 循环通知消费者
notifySubscriber(subscriber, event);
}
}
总结:DefaultPublisher 是一个线程,在它的 run 方法中,无限循环的从队列中获取数据并通知注册到自身的全部消费者。
ShardedEventPublisher
DefaultPublisher 看起来已经很完备了,那么这个类与上面的 DefaultPublisher 有何不同呢。
ShardedEventPublisher 继承了 DefaultPublisher ,也就是说,是在 DefaultPublisher 基础上增加了一些功能。
增加了哪些功能呢?就是支持多事件类型功能。
而 DefaultPublisher 只能支持一种事件类型。
在其初始化方法中,对事件类型指定。
public class DefaultPublisher {
// 省略非关键代码
private Class<? extends Event> eventType; //(不是集合,支持一种事件类型)
@Override
public void init(Class<? extends Event> type, int bufferSize) {
this.eventType = type;
}
}
在运行的时候,判断订阅者是否支持该事件类型。
// 该方法由 Thread 的 run 方法调用
void receiveEvent(Event event) {
// 省略非关键代码
// 循环全部消费者
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
// 如果订阅者不是该事件类型的 则不处理此次事件。
continue;
}
// 通知订阅者处理事件
notifySubscriber(subscriber, event);
}
}
而在 ShardedEventPublisher 中,消费者管理代码如下
public class DefaultSharePublisher extends DefaultPublisher {
// 省略非关键代码
/**
* 以 Map<事件类型,Set<订阅者>> 结构存放订阅者
*/
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings
= new ConcurrentHashMap<>();
/**
* 添加消费者
*/
@Override
public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// 获取事件类型
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// 调用了 DefaultPublisher 的 add
subscribers.add(subscriber);
// 根据事件类型 获取该事件下的 订阅者 Set
Set<Subscriber> sets = subMappings.get(subSlowEventType);
// 加入此次订阅者
sets.add(subscriber);
}
@Override
public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// 获取事件类型
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// 调用了 DefaultPublisher 的 remove
subscribers.remove(subscriber);
// 根据事件类型 获取该事件下的 订阅者 Set
Set<Subscriber> sets = subMappings.get(subSlowEventType);
sets.remove(subscriber);
}
}
在 ShardedEventPublisher 中另外维护了一个 Map,Map<事件类型,Set<订阅者>>, 支持多种事件类型。
因为在 ShardedEventPublisher 中 以订阅者订阅的事件类型 做了一个分类。
再看 DefaultSharePublisher 的接收事件处理,根据事件类型只通知该事件类型下的订阅者。
// 该方法由 Thread 的 run 方法调用
public void receiveEvent(Event event) {
// 省略非关键代码
// 获取当前事件类型
Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();
// 获取该事件类型下的全部订阅者
Set<Subscriber> subscribers = subMappings.get(slowEventType);
// 循环通知订阅者
for (Subscriber subscriber : subscribers) {
notifySubscriber(subscriber, event);
}
}
3. Nacos 中的消费者
类图如下:
Subscriber
抽象类 ,最顶层接口类。定义规范
public abstract class Subscriber<T extends Event> {
// 接收到事件 回调
public abstract void onEvent(T event);
// 返回当前订阅者监听的事件类型
public abstract Class<? extends Event> subscribeType();
// 订阅者线程池,如果返回 null (默认 null ),则代表直接调用 onEvent 回调。
// 如果返回了线程池对象,则代表通过线程池调用 onEvent 回调。
public Executor executor() {
return null;
}
// 省略非关键代码
}
如何使用呢 使用 NotifyCenter 完成注册
// 示例
NotifyCenter.registerSubscriber(new Subscriber<ClientEvent>() {
@Override
public void onEvent(ClientEvent event) {
// 业务处理
}
@Override
public Class<? extends Event> subscribeType() {
// 返回该订阅者监听的事件类型
return ClientEvent.class;
}
});
这是一个支持单事件类型的订阅者,其生产者便是由同样为支持单事件类型的 DefaultPublisher 来生产消息
SmartSubscriber
SmartSubscriber 是一个抽象类,继承自 Subscriber。
public abstract class SmartSubscriber extends Subscriber {
// 新增抽象方法,用于返回多个事件类型
public abstract List<Class<? extends Event>> subscribeTypes();
// 重写 将父类的获取单个的订阅事件类型 返回 null
@Override
public final Class<? extends Event> subscribeType() {
return null;
}
// 省略非关键代码
}
使用方法
// 示例
public class NamingMetadataManager extends SmartSubscriber {
// 返回多个订阅的事件类型,表示当前订阅者需要订阅以下多个事件
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(MetadataEvent.InstanceMetadataEvent.class);
result.add(MetadataEvent.ServiceMetadataEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
return result;
}
// 事件回调
@Override
public void onEvent(Event event) {
// 根据不同事件类型处理。
if (event instanceof MetadataEvent.InstanceMetadataEvent) {
// 处理事件
} else if (event instanceof MetadataEvent.ServiceMetadataEvent) {
// 处理事件
} else {
// 处理事件
}
}
}
这是一个支持多事件类型的订阅者,其生产者便是由同样为支持多事件类型的 ShardedEventPublisher来生产消息
4. Nacos 中的事件
Event
Event 就是整个消息通知系统中的消息,消息就是 Event。
Event 是一个抽象类,所以消息事件都是继承该类。
SlowEvent
SlowEvent 标识着多事件版本。
全部事件类图
看看 Nacos 中都有哪些事件吧
5. 统一事件通知中心 NotifyCenter
整个消息通知系统都由 NotifyCenter 类来操作完成。
com.alibaba.nacos.common.notify.NotifyCenter
NotifyCenter 是整个消息通知系统中的非常关键的类,门面模式,负责管理整个消息通知系统。
提供快捷方法完成 注册或卸载生产者、注册或卸载消费者等
public class NotifyCenter {
// 注册发布者
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
...
}
// 注册订阅者
public static void registerSubscriber(final Subscriber consumer) {
...
}
// 发布事件
public static boolean publishEvent(final Event event) {
....
}
// 省略部分代码
}
注册发布者
// 第一个参数指定事件类型,表示该发布者处理的事件类型
// 第二个参数指定队列大小
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
// 最终注册方法重载到如下方法
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
final EventPublisherFactory factory, final int queueMaxSize) {
// 如果是 SlowEvent 就返回 对应的多事件的 发布者
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
// 获取当前事件的类名作为 topic 例如:com.alibaba.nacos.naming.core.v2.event.client.ClientEvent
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// 如果发布者map中还没有当前注册者,就加入注册者
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}
/**
* 存放发布者 Map<Topic,事件> 一一对应
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
可以发现,注册发布者的过程就是讲事件类型的类名 和 发布者实例 放入 map 中。
注册订阅者
注册订阅者可以直接传一个订阅者。
NotifyCenter.registerSubscriber(new Subscriber() {...});
// 最终注册方法重载到如下方法
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
// 获取到处理当前事件的发布者
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
// 调用发布者的 添加订阅者方法
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
// 调用发布者的 添加订阅者方法
publisher.addSubscriber(consumer);
}
}
可见,注册订阅者最终是调用发布者的添加订阅者方法。
为了找到发布者,发布者必须先注册到 NotifyCenter 中。才能注册订阅者。
发布事件
NotifyCenter.publishEvent(new RegisterServiceTraceEvent(System.currentTimeMillis(), namespaceId,
NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName)));
// 最终发布重载到如下方法
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
final String topic = ClassUtils.getCanonicalName(eventType);
// 根据事件找到 发布者
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
// 调用发布者的 发布
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
可见,发布事件实现原理是 找到发布者,然后调用发布者的发布方法。
6. 总结
消息通知系统可分为两种系列
单事件:由 DefaultPublisher、Subscriber、Event 组成
多事件:由 ShardedEventPublisher、SmartSubscriber、SlowEvent 组成
整个消息通知系统由 NotifyCenter 类来操作完成