19 客户端服务订阅机制的核心流程

news2025/1/23 4:03:20

Nacos客户端服务订阅机制的核心流程

说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起

Nacos订阅概述

Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。
在这里插入图片描述

定时任务开启

其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。

​ NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
    throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法.

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 定时调度UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 获取缓存中的ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        // 如果为null,则进行订阅逻辑处理,基于gRPC协议
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // ServiceInfo本地缓存处理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
		//构建UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

定时任务延迟一秒执行:

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。

定时任务执行内容

在这里插入图片描述
当我们知道了整体流程以后,我们再来看对应源码:

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        // 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            NAMING_LOGGER
                .info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            return;
        }
		
        // 获取缓存的service信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        // 如果为空
        if (serviceObj == null) {
            // 根据serviceName从注册中心服务端获取Service信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 处理本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
		
        // 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 处理本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新更新时间
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // 下次更新缓存时间设置,默认6秒
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
    } finally {
        // 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟
        // 即当无异常情况下缓存实例的刷新时间是6秒
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

总结:

  1. 订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;

  2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

  3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

  4. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

  5. 重新计算定时任务时间,循环执行流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386640.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue自定义指令以及angular自定义指令(以禁止输入空格为例)

哈喽&#xff0c;小伙伴们&#xff0c;大家好啊&#xff0c;最近要实现一个vue自定义指令&#xff0c;就是让input输入框禁止输入空格建立一个directives的指令文件&#xff0c;里面专门用来建立各个指令的官方文档&#xff1a;自定义指令 | Vue.js (vuejs.org)我们都知道vue中…

小白学Pytorch 系列--Torch API

小白学Pytorch 系列–Torch API Torch version 1.13 Tensors TORCH.IS_TENSOR 如果obj是PyTorch张量&#xff0c;则返回True。 注意&#xff0c;这个函数只是简单地执行isinstance(obj, Tensor)。使用isinstance 更适合用mypy进行类型检查&#xff0c;而且更显式-所以建议使…

开发手册——一、编程规约_5.集合处理

这篇文章主要梳理了在java的实际开发过程中的编程规范问题。本篇文章主要借鉴于《阿里巴巴java开发手册终极版》 下面我们一起来看一下吧。 1. 【强制】关于 hashCode 和 equals 的处理&#xff0c;遵循如下规则&#xff1a; 只要重写 equals&#xff0c;就必须重写 hashCod…

I.MX6ULL_Linux_系统篇(21) kernel启动流程

链接脚本 vmlinux.lds 要分析 Linux 启动流程&#xff0c;同样需要先编译一下 Linux 源码&#xff0c;因为有很多文件是需要编译才会生成的。首先分析 Linux 内核的连接脚本文件 arch/arm/kernel/vmlinux.lds&#xff0c;通过链接脚本可以 找到 Linux 内核的第一行程序是从哪里…

计算机网络安全基础知识3:网站漏洞,安装phpstudy,安装靶场漏洞DVWA,搭建一个网站

计算机网络安全基础知识3&#xff1a;网站漏洞&#xff0c;安装phpstudy&#xff0c;安装靶场漏洞DVWA&#xff0c;搭建一个网站 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;可能很多算法学生都得去找开发&#xff0c;测…

7.桥接模式

目录 简介 定义 特点 结构 示例 1. 新建 Brand.interface 接口类&#xff0c;定义不同品牌手机共有的基本功能 2. 新建 Xiaomi.class 类&#xff0c;实现 Brand.interface 接口&#xff0c;实现具体功能 3. 新建 Vivo.class 类&#xff0c;实现 Brand.interface 接口&…

Mybatis源码学习笔记(六)之Mybatis中集成日志框架原理解析

1 Mybatis中集成日志框架示例 1.1 Mybatis使用log4j示例&#xff08;推荐方式&#xff09; 第一步&#xff1a;pom.xml引入log4j依赖 <!-- slf4j日志门面 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId&…

Thinkphp6使用RabbitMQ消息队列

Thinkphp6连接使用RabbitMQ&#xff08;不止tp6&#xff0c;其他框架对应改下也一样&#xff09;&#xff0c;如何使用Docker部署RabbitMQ&#xff0c;在上一篇已经讲了->传送门<-。 部署环境 开始前先进入RabbitMQ的web管理界面&#xff0c;选择Queues菜单&#xff0c;点…

深度学习实战20(进阶版)-文件智能搜索系统,可以根据文件内容进行关键词搜索,快速找到文件

大家好&#xff0c;我是微学AI&#xff0c;今天给大家带来深度学习实战项目-文件智能搜索系统&#xff0c;文件智能搜索系统是一种能够帮助用户通过文件的内容快速搜索和定位文件的软件系统。 随着互联网和数字化技术的普及&#xff0c;数据和信息呈现爆炸式增长的趋势&#xf…

ubuntu 将jupyter-lab保存为桌面快捷方式和favourites

ubuntu: 将jupyter-lab保存为桌面快捷方式和favourites desktop shortcut 建立一个新的desktop文件 cd ~/Desktop touch Jupyter-lab.desktop将文件修改成如下&#xff1a; [Desktop Entry] Version1.0 NameJupyterlab CommentBack up your data with one click Exec/home/cjb/…

SpringCloud学习笔记(一)

单体应用架构 在诞⽣之初&#xff0c;拉勾的⽤户量、数据量规模都⽐较⼩&#xff0c;项目所有的功能模块都放在一个工程中编码、编译、打包并且部署在一个Tomcat容器中的架构模式就是单体应用架构。 优点&#xff1a; 高效开发&#xff1a;项⽬前期开发节奏快&#xff0c;团…

02-Oracle数据库的启动与关闭

本文章主要讲解Oracle数据库的启动与关闭方法&#xff0c;详细讲解启动Oracle的命令&#xff0c;三种启动数据库的方法及区别&#xff1b;关闭数据库的4种方法及他们的区别。 启动和关闭数据库 •数据库没启动前&#xff0c;只有拥有DBA权限或者以sysoper或sysdba身份才能连接到…

设计跳表(动态设置节点高度)

最近学习redis的zset时候&#xff0c;又看到跳表的思想&#xff0c;突然对跳表的设置有了新的思考 这是19年设计的跳表&#xff0c;在leetcode的执行时间是200ms 现在我对跳表有了新的想法 1、跳表的设计&#xff0c;类似二分查找&#xff0c;但是不是二分查找&#xff0c;比较…

基于Canal的数据同步

基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架&#xff0c;而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal&#xff0c;可以实现 MySQL 数据库的实时数…

ASGCN之图卷积网络(GCN)

文章目录前言1. 理论部分1.1 为什么会出现图卷积网络&#xff1f;1.2 图卷积网络的推导过程1.3 图卷积网络的公式2. 代码实现参考资料前言 本文从使用图卷积网络的目的出发&#xff0c;先对图卷积网络的来源与公式做简要介绍&#xff0c;之后通过一个例子来代码实现图卷积网络…

Linux命令行安装Oracle19c教程和踩坑经验

安装 下载 从 Oracle官方下载地址 需要的版本&#xff0c;本次安装是在Linux上使用yum安装&#xff0c;因此下载的是RPM。另外&#xff0c;需要说明的是&#xff0c;Oracle加了锁的下载需要登录用户才能安装&#xff0c;而用户是可以免费注册的&#xff0c;这里不做过多说明。 …

最新使用nvm控制node版本步骤

一、完全卸载已经安装的node、和环境变量 ①、打开控制面板的应用与功能&#xff0c;搜索node&#xff0c;点击卸载 ②、打开环境变量&#xff0c;将node相关的所有配置清除 ③、打开命令行工具&#xff0c;输入node-v&#xff0c;没有版本号则卸载成功 二、下载nvm安装包 ①…

SBUS的协议详解

SBUS 1.串口配置&#xff1a; 100k波特率&#xff0c; 8位数据位&#xff08;在stm32中要选择9位&#xff09;&#xff0c; 偶校验&#xff08;EVEN), 2位停止位&#xff0c; 无控流&#xff0c;25个字节&#xff0c; 2.协议格式&#xff1a; [startbyte] [data1][data2]……

单月涨粉超600w,直播销售额破5亿,2月的黑马都是谁?

2月的抖音&#xff0c;黑马多多&#xff0c;处处有爆点。有直播间热度不减&#xff0c;在带货领域持续位列前茅&#xff1b;有达人通过“抓马式”连麦直播&#xff0c;涨粉657w&#xff1b;有0.01元的低价商品&#xff0c;一天热卖超1000w个。那么&#xff0c;2月有哪些主播表现…

微服务实战03-注册数据服务

EurekaServer &#xff0c;它扮演的角色是注册中心&#xff0c;用于注册各种微服务&#xff0c;以便于其他微服务找到和访问。有了EurekaServer&#xff0c;还需要一些微服务&#xff0c;注册到EurekaServer上去。 这一节&#xff0c;我们来写一个注册微服务。为了简单起见&am…