Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七
历史篇章
🕐Nacos 客户端服务注册源码分析-篇一
🕑Nacos 客户端服务注册源码分析-篇二
🕒Nacos 客户端服务注册源码分析-篇三
🕓Nacos 服务端服务注册源码分析-篇四
🕔Nacos 服务端健康检查-篇五
🕕Nacos 客户端服务发现源码分析-篇六
Nacos 客户服务发现续接
之前,在第六篇的时候我们探究了 Nacos 客户端的服务发现源码的具体实现流程。
最终是调用的 NamingService 的 getAllInstance 方法获取了所有的实例列表,而客户端实例列表是封装在一个 List <Instance> 的集合当中的。
//获取所以的实例信息,这里的实例信息就是客户端的信息
List<Instance> list = namingService.getAllInstances("nacos.test.1");
最终是调用 NamingClientProxyDelegate 类下的 subscribe 方法完成订阅,并返回实体信息的。
if (null == serviceInfo) {
//如果本地的缓存不存在服务信息,则进行订阅
//查找到最新的实例信息
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
由于这一部分的内容在之前的第六篇 Nacos 客户端服务发现源码分析-篇六 已经是分析过了的,所以这里我就不再进行赘述这一块的内容了,感兴趣的可以返回调转到指定的篇章进行浏览即可。
可能有些人好奇,哎。标题为什么称作 Nacos 客户端的服务发现与服务订阅机制的纠缠呢?
哈哈,其实他们两者是有联系的,具体是什么联系,就在我们接下来要探究 Nacos 客户端的服务订阅当中有其答案。
既然如此,我们就今天研究一把, Nacos 客户端服务订阅事件机制的具体实现叭。。。
Nacos 客户端服务订阅机制核心流程
首先,先谈谈什么是订阅?生活中那些那些方面体现着类似于订阅这样的概念?只要真正的理解了订阅这一概念,我们才能更好的进行接下来的内容。
订阅其实简单与生活对比来讲,其实就是预定。当然预定的这个动作有发出者,就必须有动作的承受者,举个栗子,外出旅游我们可以会定酒店,那么酒店的服务者就是动作的承受者,订酒店的对象就是动作的发出者,再比如我们的常常提到的订阅一个期刊,如果这个期刊的周期是一年,而该期刊每月都会推送该期的内容,那么订阅期刊的对象就是动作发出者,发布期刊的对象就是动作承受者。
订阅者订阅,承受者在接受到订阅者的指定命令后,周期性的完成指定的任务,这就是订阅。
所以对于注册中心 Nacos 也是同样提供了这样的服务的。。。
大致的流程就是 客户端 通过一个定时的任务每 6 秒从注册中心获取当前的实例列表,当发现实例发生了变化的时候,发布变更事件。对于订阅者而言,完成业务部分的处理(更新实例,更新本地缓存)。
我们可以通过一个流程图,观察其具体的实现。。。 原图点这里
其实从图中已经大致的清楚了,客户端的这个订阅的整体流程。
我们从源码的角度进分析一波。
进入我们的 NacosNamingService 类当中
//在 NacosNamingService 中暴露了许多的重载的 subscribe 方法
//这里 NacosNamingService 类下的 subscribe 方法 和 NamingService 下的 getAllInstances 发现获取实例列表的方法重载的过程都是一样的
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {
//创建一个空的集群对象集合
subscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
//设置默认的群组 DEFAULT_GROUP 默认群组
subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
//如果事件监听器为空 则返回
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
//注册监听器
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
//对于订阅的本质就是服务的发现的一种方式,也就是服务在发现的时候执行订阅方法,同时触发定时任务去服务端拉去数据
clientProxy.subscribe(serviceName, groupName, clusterString);
}
可以看到的是 NacosNamingService 中提供了大量的 subscribe 的重载方法,这些重载一些默认的参数。
走到 subscribe 方法的尽头,在该方法内可以看到有两个核心的方法 InstanceChangeNotifier 类下的registerListener 注册监听器方法与 NamingClientProxy 类下的 subscribe 订阅方法。我们就探究一下这两个方法具体实现,以及这两个方法的功能作用是什么?
changeNotifier.registerListener 注册监听器
/**
* register listener.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}
/**
* deregister listener.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
return;
}
eventListeners.remove(listener);
if (CollectionUtils.isEmpty(eventListeners)) {
listenerMap.remove(key);
}
}
可以看到在 InstancesChangeNotifier 类下有两个关于监听器的方法,注册监听与取消监听。
注册监听其实就是在监听集合对象 ConcurrentHashSet<EventListener> 中添加一个监听事件,而对于取消监听是通过 key 将需要移除的监听事件从集合当中移除。
那么关于这个监听事件添加都监听集合当中后,这个监听事件是如何触发又如何调用执行的呢?这个。。。哈哈,留一个坑,其实这一块我自己还没有研究的特别清楚。。。
接下来我们看看,另一个重要的方法 clientProxy.subscribe() 服务订阅
clientProxy.subscribe 服务订阅
其实玩到这里呢,也就与我们的标题 Nacos 客户的服务发现与客户端服务订阅机制的纠缠,就关联了起来,为什么这么说呢?那让我们看看 clientProxy.subscribe 方法内部的具体实现咯。。。
//其实走到这里就可以看到,该方法与之前的服务发现调用的是同一个方法,这里其实在做的是服务列表的查询
//查询与订阅都调用了同样的而方法
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
//开启定时任务调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
//获取缓存中的 ServiceInfo
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result) {
//如果缓存中没有数据,则进行订阅逻辑处理,基于 gRPC 协议
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//serviceInfo 本地缓存处理
serviceInfoHolder.processServiceInfo(result);
return result;
}
哈哈 ,看到这一块的代码是不是有一种似曾相识的感觉呢?
对咯,没错在第六篇 Nacos 客户端服务发现源码分析 当中的发现获取实例列表的时候在 NacosNamingService 中的 getAllInstances 方法多次重载之后调用的 clientProxy.sunscribe 调用的是同一个方法。
所以其实到这里是可以得到一个结论的,就是 在 Nacos 客户端的查询与订阅服务都是调用了同样的方法的。
这就解释了为什么标题 Nacos 客户端的服务发现与服务订阅机制是冥冥之中有种联系在一起的呢。
我们还记得流程图中有一个关于 UpdateTask 定时任务调度吗?
让我们接下来看看,这个里面到底在做什么呢???
定时任务执行内容
//开启定时任务调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
/**
* Schedule update if absent.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters
*/
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clu
if (futureMap.get(serviceKey) != null) {
return;
}
//双重检测锁
synchronized (futureMap) {
if (futureMap.get(serviceKey) != null) {
return;
}
//构建一个定时处理的任务,最终这里的 future 就是构建的定时任务,该任务用于在 run 中执行
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
public UpdateTask(String serviceName, String groupName, String clusters) {
this.serviceName = serviceName;
this.groupName = groupName;
this.clusters = clusters;
this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
//执行延时函数,延时时间为 1000L * MICRO_SCALE = 1S
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
可以看到在第二片代码中有这样一个方法,addTask () 对的,没错就是将通过 serviceName、groupName、cliusters 构建一个 UbdateTask 的更新任务对象,然后将其对象构建成一个未来执行的定时任务,添加到执行的集合当中,最终是由 ServiceInfoUpdateService 中的 run 方法去执行。
定时任务 run() 方法的执行
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
//判断更改通知对象 serviceName 是否订阅
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKe
NAMING_LOGGER
.info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
return;
}
//获取缓存中的信息
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//缓存为空
if (serviceObj == null) {
//生成一个服务实例对象
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
//处理更新或添加到本地的缓存当中
serviceInfoHolder.processServiceInfo(serviceObj);
//更新最后一次的时间
lastRefTime = serviceObj.getLastRefTime();
return;
}
//过期服务,如果说,服务的更新时间是小于等于缓存刷新的时间的
//那就说明本地的缓存不是最新的,而当前的服务实例信息也不是客户端最新的,
//这个时候就需要从 注册中心 中重新的进行一次查询,获取最的服务实例信息并更新本地缓存
if (serviceObj.getLastRefTime() <= lastRefTime) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
//更新处理本地的缓存
serviceInfoHolder.processServiceInfo(serviceObj);
}
//刷新更新的当前时间
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
//下次的更新缓存时间设置为缓存中的默认基数 (cacheMillis = 1000) * 6
// TODO multiple time can be configured.
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
// 重置失败数量为 0
// 可能会出现一些异常,比如调用 queryInstancesOfService 方法的时候
// 没有 ServiceInfo 连接不到则会出现异常
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
} finally {
// 下次调度刷新时间,下次执行的时间与failCount 失败的次数有关,failCount=0,则下次调度时间为6秒,最长为1分钟
// 当无异常的情况下 failCount 始终都是 0 则默认的时间一直都 6 s
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}