Spring Cloud Nacos源码讲解(七)- Nacos客户端服务订阅机制的核心流程

news2024/9/28 17:28:10

Nacos客户端服务订阅机制的核心流程

​ 说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起

Nacos订阅概述

​ Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。

​ 以下是订阅方法的主线流程,涉及内容比较多,细节比较复杂,所以这里我们主要学习核心部分。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Coadaqu-1677029556577)(image-20211025182722346.png)]

定时任务开启

​ 其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。

​ NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
    throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

​ 这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 定时调度UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 获取缓存中的ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        // 如果为null,则进行订阅逻辑处理,基于gRPC协议
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // ServiceInfo本地缓存处理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

​ 但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
		//构建UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

​ 定时任务延迟一秒执行:

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

​ 所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。

定时任务执行内容

​ UpdateTask封装了订阅机制的核心业务逻辑,我们来看一下流程图:

​ 当我们知道了整体流程以后,我们再来看对应源码:

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        // 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            NAMING_LOGGER
                .info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            return;
        }
		
        // 获取缓存的service信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        // 如果为空
        if (serviceObj == null) {
            // 根据serviceName从注册中心服务端获取Service信息
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 处理本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
		
        // 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 处理本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新更新时间
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // 下次更新缓存时间设置,默认6秒
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
    } finally {
        // 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟
        // 即当无异常情况下缓存实例的刷新时间是6秒
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

​ 业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

总结:

  1. 订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;

  2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

  3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

  4. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

  5. 重新计算定时任务时间,循环执行流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/371238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

十四、项目实战二(CORS、同源策略、cookie跨域)

项目实战二 需求 以前后端分离的方式实现学生的增删改查操作 学生列表接口 url:/students/ 请求方法&#xff1a;get 参数&#xff1a; 格式&#xff1a;查询参数 参数名类型是否必传说明pageint否页码&#xff0c;默认为1sizeinit否每页数据条数默认为10namestr否根据姓…

SpringBoot+HttpClient+JsonPath提取A接口返回值作为参数调用B接口

前言 在做java接口自动化中&#xff0c;我们常常需要依赖多个接口&#xff0c;A接口依赖B&#xff0c;C&#xff0c;D接口的响应作为请求参数&#xff1b;或者URL中的参数是从其他接口中提取返回值作获取参数这是必不可少的。那么怎么实现呢&#xff1f;下面就来介绍多业务依赖…

大数据技术之Hive(三)函数

hive有大量内置函数&#xff0c;大致可分为&#xff1a;单行函数、聚合函数、炸裂函数、窗口函数。查看内置函数show functions;查看内置函数用法desc function upper;查看内置函数详细信息desc function extended upper;一、单行函数单行函数的特点是一进一出&#xff0c;输入…

Kubernetes之服务的基本管理

svc是kubernetes最核心的概念&#xff0c;通过创建Service&#xff0c;可以为一组具有相同功能的容器应用提供一个统一的入口地址&#xff0c;并将请求进行负载分发到后端的各个容器应用上。pod生命周期短不稳定&#xff0c;pod异常后新生成的pod的IP会发生变化&#xff0c;通过…

JSD2212班第二次串讲-面向对象阶段

1. 概述 系统介绍一下接下来阶段要学习的内容&#xff1a; 1.JAVA&#xff1a; 1.1 java语言基础 1.2 java面向对象阶段&#xff08;很重要&#xff01;起的承上启下&#xff01;&#xff01;&#xff09; 1.3 API阶段&#xff08;Application Interface &#xff09;学会看字…

Spring Cloud @RefreshScope 原理分析:代理类的创建

背景 前面我们知道&#xff0c;被 RefreshScope 注解的实例&#xff0c;在扫描生成 BeanDefiniton 时&#xff0c;被偷龙转凤了&#xff1a;注册了两个 Bean 定义&#xff0c;一个 beanName 同名、类型是 LockedScopedProxyFactoryBean.class 代理工厂 Bean&#xff0c;一个 s…

6-Java中新建一个文件、目录、路径

文章目录前言1-文件、目录、路径2-在当前路径下创建一个文件3-在当前路径下创建一个文件夹&#xff08;目录&#xff09;3.1 测试1-路径已经存在3.2 测试2-路径不存在3.2 创建不存在的路径并新建文件3.3 删除已存在的文件并新建4-总结前言 学习Java中如何新建文件、目录、路径…

ASE40N50SH-ASEMI高压MOS管ASE40N50SH

编辑-Z ASE40N50SH在TO-247封装里的静态漏极源导通电阻&#xff08;RDS(ON)&#xff09;为100mΩ&#xff0c;是一款N沟道高压MOS管。ASE40N50SH的最大脉冲正向电流ISM为160A&#xff0c;零栅极电压漏极电流(IDSS)为1uA&#xff0c;其工作时耐温度范围为-55~150摄氏度。ASE40N…

【架构师】零基础到精通——架构发展

博客昵称&#xff1a;架构师Cool 最喜欢的座右铭&#xff1a;一以贯之的努力&#xff0c;不得懈怠的人生。 作者简介&#xff1a;一名Coder&#xff0c;软件设计师/鸿蒙高级工程师认证&#xff0c;在备战高级架构师/系统分析师&#xff0c;欢迎关注小弟&#xff01; 博主小留言…

PHP - ChatGpt API 接入 ,代码,亲测!(最简单!)

由于最近ChatGpt 大火&#xff0c;但是门槛来说是对于大家最头疼的环节&#xff0c; 我自己也先开发了一个个人小程序&#xff01;大家可以访问使用下&#xff0c; 由此ChatGpt 有一个API 可以仅供大伙对接 让我来说下资质&#xff1a; 1&#xff1a;首先要搞得到一个 ChatGp…

HttpRunner接口自动化测试框架--常见问题

本篇文章主要总结在使用httprunner框架做接口自动化测试过程中遇到的问题 官方的问题总结&#xff1a;Issues httprunner/httprunner GitHub 1.在参数化过程中读取CSV文件&#xff0c;不能读取出整型来。 读取下方文件数据&#xff0c;全部是字符串格式 原因&#xff1a;c…

操作系统(复试准备)

操作系统&#xff08;复试准备&#xff09; 第一章知识点 操作系统概述 操作系统的概念 负责协调软硬件等计算机资源的工作 为上层用户&#xff0c;应用程序提供简单易用的接口 是一种系统软件 操作系统的功能与目标 资源的管理者 处理机管理&#xff0c;存储器管理&#x…

Linux 基础知识:指令与shell

目录一、操作系统二、指令三、shell一、操作系统 什么是操作系统&#xff1f; 单纯的操作系统应该是指操作系统内核。内核的作用就是管理计算机的软硬件资源&#xff0c;让计算机在合适的时候干合适的事情。 但是有一个问题&#xff0c;并不是人人都会直接通过内核来操作计算机…

异常信息记录入库

方案介绍 将异常信息放在日志里面&#xff0c;如果磁盘定期清理&#xff0c;会导致很久之前的日志丢失&#xff0c;因此考虑将日志中的异常信息存在表里&#xff0c;方便后期查看定位问题。 由于项目是基于SpringBoot构架的&#xff0c;所以采用AdviceControllerExceptionHand…

【跟着ChatGPT学深度学习】ChatGPT带我入门深度学习

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️&#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

树与二叉树与森林的相关性质

文章目录树的度树的性质二叉树的性质二叉树与森林树的度 树的度指的是树内所有节点的度数的最大值。 节点的度&#xff1a;节点所拥有的子树的数量。简单来说&#xff0c;我们直接数分支即可&#xff0c;例如下图&#xff1a; 在这颗二叉树中&#xff0c;节点2的度为2&#…

【Java】Synchronized锁原理和优化

一、synchronized介绍 synchronized中文意思是同步&#xff0c;也称之为”同步锁“。 synchronized的作用是保证在同一时刻&#xff0c; 被修饰的代码块或方法只会有一个线程执行&#xff0c;以达到保证并发安全的效果。 synchronized是Java中解决并发问题的一种最常用的方法…

K3S系列文章-使用AutoK3s在腾讯云上安装高可用K3S集群

开篇 《K3s 系列文章》《Rancher 系列文章》 方案 在腾讯云上安装 K3S 后续会在这套 K3S 集群上安装 Rancher 方案目标 高可用3 台master 的 k3s 集群 数据备份k3s 数据备份到 腾讯云对象存储 cos 尽量复用公有云的能力Tencent Cloud Controller Manager (❌ 因为腾讯云已…

【LINUX】环境变量以及main函数的参数

文章目录前言环境变量常见环境变量&#xff1a;设置环境变量&#xff1a;和环境变量相关的命令&#xff1a;环境变量的组织方式&#xff1a;获取环境变量环境变量可以被子进程继承环境变量总结main函数的参数前言 大家好久不见&#xff0c;今天分享的内容是环境变量和main函数…

JUC并发编程与源码分析笔记09-原子类操作之十八罗汉增强

基本类型原子类 AtomicInteger、AtomicBoolean、AtomicLong。 常用API&#xff1a; public final int get();// 获取当前的值 public final int getAndSet(int newValue);// 获取当前值&#xff0c;并设置新值 public final int getAndIncrement();// 获取当前的值&#xff0…