目录
一、Nacos客户端服务订阅的事件机制
1、监听事件的注册
2、ServiceInfo处理
serviceInfoHolder.processServiceInfo
一、Nacos客户端服务订阅的事件机制
Nacos客户端订阅的核心流程:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理,然后更新内存中和本地的缓存中的实例。
在第一步调用subscribe方法时,会订阅一个EventListener事件。而在定时任务UpdateTask定时获取实例列表之后,会调用ServiceInfoHolder.processServiceInfo方法对ServiceInfo进行本地处理,这其中就包括和事件处理。
1、监听事件的注册
在subscribe方法中,通过了下面的源码进行了监听事件的注册:
public class NacosNamingService implements NamingService {
...
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
在这其中我们主要要关注的就是changeNotifier.registerListener,此监听就是进行具体事件注册逻辑的,我们来看一下源码:
可以看出,事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。同时这里的数据结构为ConcurrentHashMap,key为服务实例的信息的拼接,value为监听事件的集合。
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
private final String eventScope;
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<>();
...
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.computeIfAbsent(key, keyInner -> new ConcurrentHashSet<>());
eventListeners.add(listener);
}
2、ServiceInfo处理
上面的源码中已经完成了事件的注册,现在就来追溯触发事件的来源,UpdateTask中获取到最新的实例会进行本地化处理,部分源码如下:
// ServiceInfoUpdateService>UpdateTask>run()
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
// 本地缓存处理
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}
serviceInfoHolder.processServiceInfo
这个逻辑简单来说:判断新的ServiceInfo数据是否正确,是否发生了变化。如果数据格式正确,且发生变化,那就发布一个InstancesChangeEvent事件,同时将ServiceInfo写入本地缓存。
public class ServiceInfoHolder implements Closeable {
...
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
// 缓存服务信息
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 判断注册的实例信息是否已变更
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
// 监控服务监控缓存Map的大小
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
// 服务实例已更变
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
// 添加实例变更事件,会被订阅者执行
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
// 记录Service本地文件
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
分析到这里我们发现其实这个重点应该在服务信息变更之后,发布的InstancesChangeEvent事件,这个事件是NotifyCenter进行发布的,我们来追踪一下源码
Spring Cloud Alibaba - Nacos
干我们这行,啥时候懈怠,就意味着长进的停止,长进的停止就意味着被淘汰,只能往前冲,直到凤凰涅槃的一天!