死磕Nacos系列:Nacos是如何更新服务信息的?

news2024/11/26 10:18:04

前言

说到服务信息,我们还是得回到NamingService,因为这是和NacosServer进行服务注册的核心组件,内部提供了注册、获取Nacos实例的能力。至于其他组件,如Ribbon,在调用时需要所有实例信息来进行负载,那肯定就是通过NamingService的能力来获取到所有的实例。

NamingService

NamingService中获取实例主要有两类方法,一类是getAllInstances、另一类是selectInstances,它们最主要的区别就是selectInstances增加了对实例是否健康的过滤的支持。

既然如此,那我们直接来就看看selectInstances的逻辑:

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    
    ServiceInfo serviceInfo;
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}

上述代码有两个逻辑:

1、传入的subscribe,如果是true,就代表是订阅模式,就走hostReactor.getServiceInfo查询,反之走hostReactor.getServiceInfoDirectlyFromServer查询。
2、根据实例的健康状态进行过滤返回。

那我们就先走进HostReactor,对它提供的两个能力进行分析:

HostReactor

这个类是在实例化NamingService时在构造函数中实例化的一个对象。

而在HostReactor实例化时,其构造函数会创建一个定时线程池,核心线程数量可以通过spring.cloud.nacos.discovery.namingPollingThreadCount进行控制,默认是当前机器核心数的一半,最少为1。

也会实例化一个FailoverReactor,是一个容灾备份反应器,其内部实例化了一个单线程的定时线程池,内部由两个延迟定时任务组成:

  • SwitchRefresher

    每隔5秒去检查文件系统中是否有cacheDir + "/failover00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00"文件,如果有,那么就标记为容灾模式。

    cacheDir的取值如下:

    private void initCacheDir() {
        cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
        if (StringUtils.isEmpty(cacheDir)) {
            cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
        }
    }
    
  • FailoverFileReader

    如果是容灾模式,就从文件中读取Service信息。

  • DiskFileWriter

    初始延迟30分钟,后每隔1天serviceInfoMap(服务信息)写入到cacheDir + "/failover文件夹下。

整个FailoverReactor的示意图如下:

image-20231126131554348

在了解了HostReactor的基本情况后,我们来对上面调用的两个方法进行分析。

getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    // 如果是容灾模式,就从failoverReactor中获取文件系统中缓存的ServiceInfo信息
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    
    // 从HostReactor维护的serviceInfoMap中取ServiceInfo
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    
    // 如果HostReactor中没有
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        // 存入serviceInfoMap
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        
        updatingMap.put(serviceName, new Object());
        // 查询服务端的这个Service的信息,如果有,则使用和接收到服务端Service推送一致的更新处理
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
        
    } else if (updatingMap.containsKey(serviceName)) {
        // 如果原来的serviceInfoMap有数据,但updatingMap又存在,说明可能存在了并发问题,则需要锁住serviceObj一段时间,等待执行完成
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    
    // 用一个定时线程池去执行Servcie的主动更新
    scheduleUpdateIfAbsent(serviceName, clusters);
    
    // 从本地serviceInfoMap中获取ServiceInfo
    return serviceInfoMap.get(serviceObj.getKey());
}

在现在就去更新服务信息的方法updateServiceNow中,最终会调用的processServiceJson方法,方法太长,直接说逻辑:

public ServiceInfo processServiceJson(String json) {
  ....   
}

1、如果是以前就存在于本地serviceInfoMap中的数据,就分别计算出其中新增,修改,删除的实例,如果是修改的实例,需要更新心跳信息。

2、更新本地serviceInfoMap

3、使用EventDispatcher发布通知事件。

4、ServiceInfo写入本地磁盘。

在定时更新服务信息的方法scheduleUpdateIfAbsent中:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

如果futureMap中不存在这个Service的任务,才会使用addTask进行添加。addTask中就是将UpdateTask延迟1s执行一次。具体后续调用是在UpdateTask中实现的。

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        
        if (serviceObj == null) {
            updateService(serviceName, clusters);
            return;
        }
        
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }
        
        lastRefTime = serviceObj.getLastRefTime();
        
        if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            return;
        }
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        delayTime = serviceObj.getCacheMillis();
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

上述代码简化一下,就是以下几个点:

1、如果本地serviceInfoMap中没有这个Service,就去服务端查询并更新。

2、如果通过这样的拉模式下最后修改的时间是大于这个Service本身的修改时间的,才进行更新。

3、下一次执行的时间是根据失败次数来定的,比如第一次失败,那就是delayTime左移一位,失败几次就左移几次,最多左移6次,且最大延迟60s执行。

getServiceInfoDirectlyFromServer

public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
        throws NacosException {
    String result = serverProxy.queryList(serviceName, clusters, 0, false);
    if (StringUtils.isNotEmpty(result)) {
        return JacksonUtils.toObj(result, ServiceInfo.class);
    }
    return null;
}

这个方法就是直接请求服务端拿到ServiceInfo的数据。

思考

看到这里,我们会不会有个疑问?那就是HostReactor是通过定时执行去更新服务信息的,那如果在时间间隔内有其他Servcie信息的更新呢?那我们岂不是得等到下一次任务执行时才能得到更新后的信息?

Nacos是考虑到了的,通过用定时任务通过HTTP去拉数据,和接收服务端通过UDP推送的数据,一拉一推来保证数据的实时性。

HostReactor中的PushReceiver就是客户端侧对服务端侧推数据的处理器。

PushReceiver

HostReactor在实例化时,其构造方法中也会实例化一个PushReceiver,其内部是一个单线程的定时线程池,死循环,用来接收来自服务端的信息,以及向服务端发送ACK确认信息。

image-20231126122032248

核心代码如下:

@Override
public void run() {
    while (!closed) {
        try {
            
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            
            udpSocket.receive(packet);
            
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJson(pushPacket.data);
                
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

上述代码主要有以下几个逻辑:

1、死循环接收udp的数据。

2、如果是"dom"或者"service"类型的消息,会交由HostReactor进行处理。

3、向服务端发送ack确认信息。

总结

Nacos是通过定时任务使用HTTP拉数据,和接收服务端通过UDP推送的数据来实现更新服务信息的目的。

今天的内容中还涉及到了Nacos的容灾处理,可以通过在磁盘中配置达到开启本地容灾的模式。在获取实例时,就会去本地磁盘中的备份文件中去找服务实例的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1257666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

性价比高的学生台灯什么牌子好?备考用眼必备护眼台灯推荐

近期&#xff0c;浙江温州市场监督管理局开展了流通领域学生用品质量监督抽查&#xff0c;本期抽检4批LED读写台灯&#xff0c;检出1批产品不合格&#xff0c;不合格检出率为25%。广东久量股份有限公司生产的1款LED时尚无级调光台灯&#xff0c;检出“标记、爬电距离和电气间隙…

Unity 打印每次代码保存耗时

unity每次编辑代码的时候&#xff0c;都需要保存&#xff0c;unity右下角的小圆圈总是转个不停&#xff0c;那么每次编辑代码后&#xff0c;unity到底需要多久时间呢&#xff0c;下面就有代码可以获取 保存时间。 using UnityEngine; using UnityEditor; using UnityEditor.Com…

高功率工业RFID读写器怎么用?

工业读写器相比于通用型读写器&#xff0c;IP防护等级更高&#xff0c;抗干扰性能也更强&#xff0c;能够适应工业应用中的各种恶劣环境。在具体应用中&#xff0c;工业读写器根据功率大小也可分为大功率、中功率&#xff0c;功率大小直接影响读距远近&#xff0c;因此在实际应…

轻松配置PPPoE连接:路由器设置和步骤详解

在家庭网络环境中&#xff0c;我们经常使用PPPoE&#xff08;点对点协议过夜&#xff09;连接来接入宽带互联网。然而&#xff0c;对于一些没有网络专业知识的人来说&#xff0c;配置PPPoE连接可能会有些困难。在本文中&#xff0c;我将详细介绍如何轻松配置PPPoE连接&#xff…

Java代码生成器,一键在线生成,支持自定义模板

【Java代码生成神器】自动化生成Java实体类、代码、增删改查功能&#xff01;点击访问 推荐一个自己每天都在用的Java代码生成器&#xff01;这个网站支持在线生成Java代码&#xff0c;包含完整的Controller\Service\Entity\Dao代码&#xff0c;完整的增删改查功能&#xff01…

Mysql使用周期性计划任务定时备份,发现备份的文件都是空的?为什么?如何解决?

&#x1f468;‍&#x1f393;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; &#x1f40b; 希望大家多多支…

请停止在简历上写: 精通Python, 会害了你

离了个大谱&#xff01; 瑞银暑期实习生都要求精通Python? 你以为能用Python演示“hello world" 就是精通Python了么&#xff1f; too yang too天真 一、不会Python的我们不要 1、瑞士银行 瑞士银行的暑期实习岗位JD中要求应聘者精通编程语言&#xff0c;特别是C或…

苹果cms搭建教程附带免费模板

准备工作: 一台服务器域名源码安装好NGINX+PHP7.0+MYSQL5.5 安装php7.0的扩展,fileinfo和 sg11,不安装网站会搭建失败。 两个扩展都全部安装好了之后 点击-服务-重载配置 这样我们的网站环境就配置完成啦 下载苹果cms 苹果cms程序github链接:选择mac10!下载即可 http…

数字技术,为企业全面预算管理贡献数智力量

近年来&#xff0c;我国数字技术的急速发展使得企业预算管理方式产生了诸多变化。先进的技术是全面预算管理系统被广泛应用的保障&#xff0c;企业管理也逐渐从传统的独立信息化系统朝着数智化、自动化主导的集群方向转变。以数据为核心、技术为支撑的全面预算管理系统&#xf…

什么是包装生产ERP?可以带给企业哪些优势

包装生产是比较常见的加工业务&#xff0c;不同的包装生产有不同的业务流程和管理模式&#xff0c;随着原材料成本、人工成本和水电等成本的上涨&#xff0c;企业也面临较大的经营压力。 如何整合各项资源&#xff0c;优化生产过程&#xff0c;降低运作成本&#xff0c;提升商…

MIT_线性代数笔记:第 07 讲 求解 Ax=0:主变量,特解

目录 前言计算零空间 Nullspace特解 Special solutions行最简阶梯矩阵 Reduced row echelon form &#xff08;rref&#xff09; 前言 我们定义了矩阵的列空间和零空间&#xff0c;那么如何求得这些子空间呢&#xff1f;本节课的内容即从定义转到算法。 计算零空间 Nullspace…

某生物科技巨头:引入安全工具,推动基因科技领域智能化发展

某生物科技巨头是生物科技领域的领导者&#xff0c;业务覆盖行业全产业链、全应用领域&#xff0c;是全球领先的科学技术服务提供商和精准医疗服务运营商。一直以来&#xff0c;该生物科技机构都致力于加速推动以基因科技为支撑的生命数字化建设&#xff0c;实现批量短基因快速…

redis Redis::geoAdd 无效,phpstudy 如何升级redis版本

redis 查看当前版本命令 INFO SERVERwindows 版redis 进入下载 geoadd 功能在3.2之后才有的&#xff0c;但是phpstudy提供的最新的版本也是在3.0&#xff0c;所以需要升级下 所以想出一个 挂狗头&#xff0c;卖羊肉的方法&#xff0c;下载windows 的程序&#xff0c;直接替…

第二证券:煤炭板块震荡走高 潞安环能、晋控煤业涨超5%

证券时报网讯&#xff0c;煤炭板块27日盘中发力走高&#xff0c;到发稿&#xff0c;潞安环能、晋控煤业涨超5%&#xff0c;平煤股份、山西焦煤涨逾3%&#xff0c;恒源煤电、开滦股份等上扬。 职业方面&#xff0c;近期寒潮来袭&#xff0c;气温下降带动居民用电需求增加&#…

海外Leads Generation产业:中国出海群体的行业大机会

Leads Generation&#xff08;简称LeadsGen&#xff09;指的是集中精力吸引和开发潜在客户的营销策略。通过引导式的营销策略&#xff0c;企业分发内容吸引潜在客户&#xff0c;引导客户留下电话/邮件/姓名等信息。基于这些信息&#xff0c;企业可建立潜在客户数据库&#xff0…

iOS移动应用程序的备案与SHA-1值查看

​ 目录 &#x1f4dd;iOS移动应用程序的备案与SHA-1值查看 引言 第一部分&#xff1a;App备案 第二部分&#xff1a;查看SHA-1值 引言 在开发和发布移动应用程序时&#xff0c;进行App备案是非常重要的一步&#xff0c;它是确保您的应用在合规性方面符合相关法规的过程。…

Redis分布式锁实现Redisson 15问

在一个分布式系统中&#xff0c;由于涉及到多个实例同时对同一个资源加锁的问题&#xff0c;像传统的synchronized、ReentrantLock等单进程情况加锁的api就不再适用&#xff0c;需要使用分布式锁来保证多服务实例之间加锁的安全性。常见的分布式锁的实现方式有zookeeper和redis…

C#工程中Form_xx.cs不能在设计器中查看

环境&#xff1a;VS2022 直接上图&#xff1a; 原因&#xff1a; 写了个类在Form_xx.cs中从For继承的部分类之前&#xff0c;移动到之后&#xff0c;保证窗体类是代码中的首个类即可&#xff0c;如图&#xff1a;

使用Pytorch从零开始构建Energy-based Model

知识回顾: [1] 生成式建模概述 [2] Transformer I&#xff0c;Transformer II [3] 变分自编码器 [4] 生成对抗网络&#xff0c;高级生成对抗网络 I&#xff0c;高级生成对抗网络 II [5] 自回归模型 [6] 归一化流模型 [7] 基于能量的模型 [8] 扩散模型 I, 扩散模型 II 在本教程中…

EtherCAT从站XML文件组成元素详解(1):制造商信息

0 工具准备 1.EtherCAT从站XML文件(本文使用GL20-RTU-ECT) 2.ETG.2000 S (R) V1.0.71 前言 EtherCAT从站的设备描述文件ESI(EtherCAT Slave Information)是联系主站和从站的桥梁,主站可以通过xml格式的从站设备描述文件识别从站的特征信息、获取对象字典信息、进行组态等。…