Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

news2024/9/23 3:30:32

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

历史篇章

🕐Nacos 客户端服务注册源码分析-篇一

🕑Nacos 客户端服务注册源码分析-篇二

🕒Nacos 客户端服务注册源码分析-篇三

🕓Nacos 服务端服务注册源码分析-篇四

🕔Nacos 服务端健康检查-篇五

🕕Nacos 客户端服务发现源码分析-篇六

Nacos 客户服务发现续接

之前,在第六篇的时候我们探究了 Nacos 客户端的服务发现源码的具体实现流程。

image-20211022150934273

最终是调用的 NamingService 的 getAllInstance 方法获取了所有的实例列表,而客户端实例列表是封装在一个 List <Instance> 的集合当中的。

//获取所以的实例信息,这里的实例信息就是客户端的信息
List<Instance> list = namingService.getAllInstances("nacos.test.1");

最终是调用 NamingClientProxyDelegate 类下的 subscribe 方法完成订阅,并返回实体信息的。

if (null == serviceInfo) {
    //如果本地的缓存不存在服务信息,则进行订阅
    //查找到最新的实例信息
    serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}

由于这一部分的内容在之前的第六篇 Nacos 客户端服务发现源码分析-篇六 已经是分析过了的,所以这里我就不再进行赘述这一块的内容了,感兴趣的可以返回调转到指定的篇章进行浏览即可。

可能有些人好奇,哎。标题为什么称作 Nacos 客户端的服务发现与服务订阅机制的纠缠呢?

哈哈,其实他们两者是有联系的,具体是什么联系,就在我们接下来要探究 Nacos 客户端的服务订阅当中有其答案。

既然如此,我们就今天研究一把, Nacos 客户端服务订阅事件机制的具体实现叭。。。


Nacos 客户端服务订阅机制核心流程

首先,先谈谈什么是订阅?生活中那些那些方面体现着类似于订阅这样的概念?只要真正的理解了订阅这一概念,我们才能更好的进行接下来的内容。

订阅其实简单与生活对比来讲,其实就是预定。当然预定的这个动作有发出者,就必须有动作的承受者,举个栗子,外出旅游我们可以会定酒店,那么酒店的服务者就是动作的承受者,订酒店的对象就是动作的发出者,再比如我们的常常提到的订阅一个期刊,如果这个期刊的周期是一年,而该期刊每月都会推送该期的内容,那么订阅期刊的对象就是动作发出者,发布期刊的对象就是动作承受者。

订阅者订阅,承受者在接受到订阅者的指定命令后,周期性的完成指定的任务,这就是订阅。

所以对于注册中心 Nacos 也是同样提供了这样的服务的。。。

大致的流程就是 客户端 通过一个定时的任务每 6 秒从注册中心获取当前的实例列表,当发现实例发生了变化的时候,发布变更事件。对于订阅者而言,完成业务部分的处理(更新实例,更新本地缓存)。

我们可以通过一个流程图,观察其具体的实现。。。 原图点这里

image-20230421231128284

其实从图中已经大致的清楚了,客户端的这个订阅的整体流程。

我们从源码的角度进分析一波。

进入我们的 NacosNamingService 类当中

//在 NacosNamingService 中暴露了许多的重载的 subscribe 方法
//这里 NacosNamingService 类下的 subscribe 方法 和 NamingService 下的 getAllInstances 发现获取实例列表的方法重载的过程都是一样的
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {
    //创建一个空的集群对象集合
    subscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
   //设置默认的群组 DEFAULT_GROUP 默认群组
   subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    //如果事件监听器为空 则返回
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    //注册监听器
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    //对于订阅的本质就是服务的发现的一种方式,也就是服务在发现的时候执行订阅方法,同时触发定时任务去服务端拉去数据
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

可以看到的是 NacosNamingService 中提供了大量的 subscribe 的重载方法,这些重载一些默认的参数。

走到 subscribe 方法的尽头,在该方法内可以看到有两个核心的方法 InstanceChangeNotifier 类下的registerListener 注册监听器方法与 NamingClientProxy 类下的 subscribe 订阅方法。我们就探究一下这两个方法具体实现,以及这两个方法的功能作用是什么?

changeNotifier.registerListener 注册监听器

/**
 * register listener.
 *
 * @param groupName   group name
 * @param serviceName serviceName
 * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}
/**
 * deregister listener.
 *
 * @param groupName   group name
 * @param serviceName serviceName
 * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        return;
    }
    eventListeners.remove(listener);
    if (CollectionUtils.isEmpty(eventListeners)) {
        listenerMap.remove(key);
    }
}

可以看到在 InstancesChangeNotifier 类下有两个关于监听器的方法,注册监听与取消监听。

注册监听其实就是在监听集合对象 ConcurrentHashSet<EventListener> 中添加一个监听事件,而对于取消监听是通过 key 将需要移除的监听事件从集合当中移除。

那么关于这个监听事件添加都监听集合当中后,这个监听事件是如何触发又如何调用执行的呢?这个。。。哈哈,留一个坑,其实这一块我自己还没有研究的特别清楚。。。

接下来我们看看,另一个重要的方法 clientProxy.subscribe() 服务订阅

clientProxy.subscribe 服务订阅

其实玩到这里呢,也就与我们的标题 Nacos 客户的服务发现与客户端服务订阅机制的纠缠,就关联了起来,为什么这么说呢?那让我们看看 clientProxy.subscribe 方法内部的具体实现咯。。。

//其实走到这里就可以看到,该方法与之前的服务发现调用的是同一个方法,这里其实在做的是服务列表的查询
//查询与订阅都调用了同样的而方法
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    //开启定时任务调度 UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    //获取缓存中的 ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        //如果缓存中没有数据,则进行订阅逻辑处理,基于 gRPC 协议
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    //serviceInfo 本地缓存处理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

哈哈 ,看到这一块的代码是不是有一种似曾相识的感觉呢?

对咯,没错在第六篇 Nacos 客户端服务发现源码分析 当中的发现获取实例列表的时候在 NacosNamingService 中的 getAllInstances 方法多次重载之后调用的 clientProxy.sunscribe 调用的是同一个方法。

所以其实到这里是可以得到一个结论的,就是 在 Nacos 客户端的查询与订阅服务都是调用了同样的方法的

这就解释了为什么标题 Nacos 客户端的服务发现与服务订阅机制是冥冥之中有种联系在一起的呢。

我们还记得流程图中有一个关于 UpdateTask 定时任务调度吗?

让我们接下来看看,这个里面到底在做什么呢???

定时任务执行内容

//开启定时任务调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
/**
 * Schedule update if absent.
 *
 * @param serviceName service name
 * @param groupName   group name
 * @param clusters    clusters
 */
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clu
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    //双重检测锁
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //构建一个定时处理的任务,最终这里的 future 就是构建的定时任务,该任务用于在 run 中执行
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}
public UpdateTask(String serviceName, String groupName, String clusters) {
    this.serviceName = serviceName;
    this.groupName = groupName;
    this.clusters = clusters;
    this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    //执行延时函数,延时时间为 1000L * MICRO_SCALE = 1S
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到在第二片代码中有这样一个方法,addTask () 对的,没错就是将通过 serviceName、groupName、cliusters 构建一个 UbdateTask 的更新任务对象,然后将其对象构建成一个未来执行的定时任务,添加到执行的集合当中,最终是由 ServiceInfoUpdateService 中的 run 方法去执行。

定时任务 run() 方法的执行

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
        //判断更改通知对象 serviceName 是否订阅
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKe
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }
        //获取缓存中的信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        //缓存为空
        if (serviceObj == null) {
            //生成一个服务实例对象
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
            //处理更新或添加到本地的缓存当中
            serviceInfoHolder.processServiceInfo(serviceObj);
            //更新最后一次的时间
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
        //过期服务,如果说,服务的更新时间是小于等于缓存刷新的时间的
        //那就说明本地的缓存不是最新的,而当前的服务实例信息也不是客户端最新的,
        //这个时候就需要从 注册中心 中重新的进行一次查询,获取最的服务实例信息并更新本地缓存
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
            //更新处理本地的缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新更新的当前时间
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        //下次的更新缓存时间设置为缓存中的默认基数 (cacheMillis = 1000) * 6
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为 0
        // 可能会出现一些异常,比如调用 queryInstancesOfService 方法的时候
        // 没有 ServiceInfo 连接不到则会出现异常
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
    } finally {
        // 下次调度刷新时间,下次执行的时间与failCount 失败的次数有关,failCount=0,则下次调度时间为6秒,最长为1分钟
        // 当无异常的情况下 failCount 始终都是 0 则默认的时间一直都 6 s
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

未完待续。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/447312.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最新入河排污口设置论证、水质影响预测与模拟、污水处理工艺分析及建设项目入河排污口方案报告书

随着水资源开发利用量不断增大&#xff0c;全国废污水排放量与日俱增&#xff0c;部分河段已远远超出水域纳污能力。近年来,部分沿岸入河排污口设置不合理&#xff0c;超标排污、未经同意私设排污口等问题逐步显现&#xff0c;已威胁到供水安全、水环境安全和水生态安全&#x…

ChatGPT探索系列之五:讨论人工智能伦理问题及ChatGPT的责任

文章目录 前言一、安全二、隐私和道德三、我们应该做什么总结 前言 ChatGPT发展到目前&#xff0c;其实网上已经有大量资料了&#xff0c;博主做个收口&#xff0c;会出一个ChatGPT探索系列的文章&#xff0c;帮助大家深入了解ChatGPT的。整个系列文章会按照一下目标来完成&am…

STM32(十六)正交编码器

一、简介 增量式编码器 增量式编码器也称为正交编码器&#xff0c;是通过两个信号线的脉冲输出来进行数据处理&#xff0c;一个输出脉冲信号就对应于一个增量位移&#xff0c;编码器每转动固定的位移&#xff0c;就会产生一个脉冲信号 通过读取单位时间脉冲信号的数量&#xff…

自动修改文章的软件-自动修改文案原创软件

有没有自动修改文章的软件 修改文章可能是很多人日常工作中必须完成的任务&#xff0c;但一般情况下&#xff0c;这需要人工完成。幸运的是&#xff0c;现在有很多文章修改软件可以帮助我们节省时间和精力。本文将向您介绍一款优秀的修改文章软件&#xff0c;名为147SEO&#…

SLAM论文速递【SLAM—— TwistSLAM:动态环境下的约束SLAM】—4.17(2)

论文信息 题目&#xff1a; Optimization RGB-D 3-D Reconstruction Algorithm Based on Dynamic SLAM 基于动态SLAM的RGB-D三维重建算法优化论文地址&#xff1a; https://ieeexplore.ieee.org/abstract/document/10050782发表期刊&#xff1a; IEEE Transactions on Instru…

Apache POI 实现用Java操作Excel完成读写操作

简介 Apache POI是一个用于操作Microsoft Office格式文件&#xff08;包括xls、docx、xlsx、pptx等&#xff09;的Java API库。POI全称为Poor Obfuscation Implementation&#xff0c;是Apache Software Foundation的一个开源项目。它提供了一组Java API&#xff0c;使得Java程…

LLM_StableDiffusion_studio发布

背景&#xff1a; 从chatgpt发布到现在已经超过半年时间&#xff0c;AGI的势头越来越猛。大家都在做各种的尝试和组合&#xff0c;把chatgpt通用的强大的知识表达和理解能力尝试应用在自己的业务场景。前期也是出现非常多的业务应用&#xff0c;但是主要还是围绕chatgpt本身已…

循环神经网络(RNN)简单介绍—包括TF和PyTorch源码,并给出详细注释

文章目录 循环神经网络&#xff08;RNN&#xff09;入门教程1. 循环神经网络的原理2. 循环神经网络的应用3. 使用keras框架实现循环神经网络3.1导入对应的库及加载数据集3.2.数据预处理3.3定义RNN模型3.4训练模型3.5测试模型 4.使用PyTorch框架实现上述功能—注释详细5.结论 循…

动静态库的制作和使用

动静态库 一&#xff0c;什么是库二&#xff0c;静态库的制作静态库原理 三&#xff0c;动态库的制作四&#xff0c;动态库的配置五&#xff0c;动态库的加载 一&#xff0c;什么是库 &#x1f680;库这个东西我们一直在使用&#xff0c;举个简单了例子&#xff0c;无论你是用…

netplan, NetworkManager, systemd-networkd简介

1、systemd-networkd简介 systemd-networkd是systemd 的一部分 &#xff0c;负责 systemd 生态中的网络配置部分(systemd-networkd.service, systemd-resolved.service)。使用 systemd-networkd&#xff0c;你可以为网络设备配置基础的 DHCP/静态IP网络等&#xff0c;还可以配…

U8W/U8W-Mini使用与常见问题解决

U8W/U8W-Mini使用与常见问题解决 U8WU8W/U8W-mini简介准备工作U8W/U8W-mini在线联机下载U8W/U8W-mini脱机下载第一步&#xff0c;把程序下载到U8W/U8W-mini烧录器中&#xff1a;第二步&#xff0c;用U8W/U8W-mini烧录器给目标单片机脱机下载 U8W/U8W-mini烧录器使用中常见的问题…

初识Linux运维

一.初识Linux 1.Linux系统内核 内核提供了Linux系统的主要功能&#xff0c;如硬件调度管理的能力。 Linux内核是免费开源的&#xff0c;任何人都可以查看内核的源代码&#xff0c;甚至是贡献源代码。 2.Linux系统发行版 内核无法被用户直接使用&#xff0c;需要配合应用程…

淘宝iOS拍立淘微距能力探索与实现

画面模糊问题的源头也是来自用户的微距体验不佳&#xff0c;我们对问题深入分析&#xff0c;适当拆解。通过 Apple Development Doc 的查阅及实践&#xff0c;一步步抽丝剥茧&#xff0c;最终完美解决用户的体验痛点&#xff0c;也为我们自身沉淀了展示微距的能力。 前言 在最近…

Unix和Linux

UNIX 诞生于 20 世纪 60 年代末 Windows 诞生于 20 世纪 80 年代中期 Linux 诞生于 20 世纪 90 年代初 1965 年&#xff0c;贝尔实验室、美国麻省理工学院和通用电气公司联合发起了Multics 工程计划&#xff0c;目标是开发一种交互式的、具有多道程序处理能力的分时操作系统&a…

NTP服务与SSH服务

NTP&#xff1a;时间同步服务&#xff0c;采用UDP协议&#xff0c;端口号为123。 配置NTP时间服务器&#xff0c;确保客户端主机能和服务主机同步时间 首先&#xff0c;我们必须确保服务端与客户端在同一时区。 更改时区&#xff1a;timedatectl set-timezone asia/shanghai …

隋唐洛阳“西宫”:上阳宫的GIS视角

隋唐洛阳城简介 营建 隋大业元年&#xff08;605年&#xff09;&#xff0c;在隋炀帝的授意下&#xff0c;隋代著名城市设计师宇文恺&#xff0c;在汉魏故城以西重新选址&#xff0c;历时8个月&#xff0c;日役劳工200万&#xff0c;兴建新都洛阳城。 城和苑 隋唐洛阳城采用…

页面注册案例

效果图&#xff1a; 分析业务模块&#xff1a; 发送验证码模块各个表单验证模块勾选已经阅读同意模块下一步验证全部模块&#xff1a;只要上面有一个input验证不通过就不同意提交 业务 1 &#xff1a;发送验证码 用户点击之后&#xff0c;显示05秒后重新获取时间到了&…

大国护眼学习笔记01

第一天&#xff08;23.4.17&#xff09; 2—11节什么是近视&#xff1f; 1、“近视离焦”是指成像点落在视网膜的哪里&#xff1f; 前面 2、“远视离焦”是指成像点落在视网膜的哪里&#xff1f; 后面 3、眼轴变长时&#xff0c;成像点会往前移还是往后移&#xff1f; 前移 4、…

毛灵栋 : 以兴趣为壤,育能力之实 | 提升之路系列(一)

导读 为了发挥清华大学多学科优势&#xff0c;搭建跨学科交叉融合平台&#xff0c;创新跨学科交叉培养模式&#xff0c;培养具有大数据思维和应用创新的“π”型人才&#xff0c;由清华大学研究生院、清华大学大数据研究中心及相关院系共同设计组织的“清华大学大数据能力提升项…

【RP-RV1126】Ubuntu上配置Buildroot Qt 开发板远程开发调试环境(SSH)

文章目录 一、前提二、基础设置建设Buildroot编译Qt5配置SSHBuildroot文件系统添加账号密码开发板联网Buildroot文件系统构建时打开rsync功能 三、QtCreator配置3.1 配置Qt交叉编译套件(Kits)配置Kits里面的交叉编译器配置Kits里面的qmake工具最后配置Kits 3.2 配置远程部署设备…