Nacos 进阶篇---Nacos服务端怎么维护不健康的微服务实例 ?(七)

news2025/1/18 11:41:10
一、引言

  在 Nacos 后台管理服务列表中,我们可以看到微服务列表,其中有一栏叫“健康实例数”    (如下图),表示对应的客户端实例信息是否可用状态。

 

那Nacos服务端是怎么感知客户端的状态是否可用呢 ?

本章重点:

  • 实例心跳接口做了哪些事情 ?
  • 服务端是怎么维护不健康的实例的,怎么下线不健康实例的,做了哪些操作 ?

二、目录     

目录

一、引言

二、目录        

三、服务端实例心跳接口源码分析

四、服务端实例心跳健康检查定时任务源码分析

五、总结


   

三、服务端实例心跳接口源码分析

主线任务:实例心跳接口做了哪些事情 ?

 在客户端服务发起注册的时候 (在第二章节),会开启一个心跳任务,每5s发送一次健康心跳检查,告诉服务端我这个服务还活着。(前面已经讲过

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    
    // 组装请求参数
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    
    // 发送实例心跳接口请求
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

服务端接受到实例心跳接口,会现在内存注册表中找 Instance,如果找不到会重新注册。然后提交一个 clientBeatProcessor 异步任务,更改 lastBeat 属性

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    // 省略部分代码

    // 获取请求参数namespaceId、serviceName
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);

    // 通过namespaceId、serviceName、ip、port、clusterName 从内存注册表当中获取对应的 Instance 实例对象
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    // 如果 instance 为空,那么会重新注册
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        // 这里调用重新注册的方法
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    // 通过namespaceId、serviceName获取对应的 Service
    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }

    // 重点:开启异步任务,更改 lastBeat 属性
    service.processClientBeat(clientBeat);
    
    // 省略部分代码
    return result;
}

接着往下看重点 service.processClientBeat() 任务,这个方法会开启一个异步任务,异步任务的话肯定会有run 方法,那我们直接看 clientBeatProcessor 对象中的 run 方法

public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // 立即执行
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

在异步任务当中,首先会获取当前节点下所有的临时实例,然后通过 ip+port 找到当前 instance,然后把 instance 中的 lastBeat属性更改为当前时间,并且如果 该 instance 为不健康状态,更改为健康状态

public class ClientBeatProcessor implements Runnable {

    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

    private RsInfo rsInfo;

    private Service service;

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    public RsInfo getRsInfo() {
        return rsInfo;
    }

    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }

    public Service getService() {
        return service;
    }

    public void setService(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }

        // 本小节重点方法
        // 获取当前 ip、clusterName
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        // 获取当前 cluster 下所有的临时实例
        List<Instance> instances = cluster.allIPs(true);

        // 遍历临时实例
        for (Instance instance : instances) {
            // 通过判断ip、port,确认是否是当前 instance 的实例
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // 把 lastBeat属性更改为当前时间
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    // 如果 instance 为不健康状态,更改为健康状态
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG
                                .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                        cluster.getService().getName(), ip, port, cluster.getName(),
                                        UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
}

小结

     首先在 客户端服务发起注册的时候 (在第二章节),会开启一个心跳任务,每5s发送一次健康心跳检查,告诉服务端我这个服务还活着。(前面已经讲过)

    那么服务端接受到了 实例心跳接口的请求,会现在内存注册表中找 Instance,如果找不到会重新注册。然后提交一个 clientBeatProcessor 异步任务,在异步任务当中,首先会找到当前集群下的所有临时实例,然后通过 ip +port 找到当前instance 实例,把当前instance 中的 lastBeat属性更改为当前时间,如果 instance 为不健康状态,更改为健康状态,到此实例心跳接口就结束了。

四、服务端实例心跳健康检查定时任务源码分析

主线任务:服务端是怎么维护不健康的实例的,怎么下线不健康实例的,做了哪些操作 ?

     这块代码是在服务端 register(注册)接口当中的,之前分析过 register 注册逻辑,因为这块是分支代码,前面没细看。

   我们来看下 createEmptyService 这个方法了,里面有个异步任务,作用就是:检查有哪些客户端是不健康的状态,如果不健康就需要对它进行处理

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

    // 不知道是创建了一个什么服务
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // 根据namespaceId、serviceName获取 Service服务
    Service service = getService(namespaceId, serviceName);

    // service为空就抛出异常
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // 上面都是分支代码
    // 主线任务:添加服务实例
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

我们直接看重点代码,直接跳到开启异步任务这里。上面的代码流程:createEmptyService()-> createServiceIfAbsent()-> putServiceAndInit(service) -> service.init();

public void init() {
    // 开启异步延时任务 clientBeatCheckTask ,每5s执行一次
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

本章重点,开启了一个 clientBeatCheckTask 异步任务。

@Override
public void run() {
    try {
        // 本章重点
        // 获取全部临时实例
        List<Instance> instances = service.allIPs(true);

        for (Instance instance : instances) {
            // 当前时间 - instance中 lastBeat属性时间  > 15s
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        // 如果这个 instance 实例还是健康状态,就更改为 "不健康状态"!
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // 事件发布监听事件,通过 upd 协议发送通知
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // 这里还是遍历 临时实例
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            // 当前时间 - instance中 lastBeat属性时间  > 30s
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                // 直接从注册表中删除当前 instance
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}

小结:

  • 第一个循环的作用,为了筛选出不健康的 Instance 实例,并且把 Instance 中的 healthy  属性改为 false。那么怎么筛选出不健康的实例的 ?利用的就是 Instance 中的 lastBeat 属性。如果是健康的实例,那么客户端就会每5s调一次实例心跳接口,更新 lastBeat 属性为当前时间。如果是不健康的实例,那么 Instance 实例 中的 lastBeat 属性是不会变化的,一旦 lastBeat 跟当前时间比超过 15s,就会被认定为不健康的实例。
  • 第二个循环的作用,找出那些 Instance 是需要删除的,如果 lastBeat 跟当前时间比超过 30s,Nacos 会把该 Instance 从注册表当中进行删除。
五、总结

总结:

     本章讲了Nacos怎么维护整个微服务实例健康状态的流程,在客户端发起注册服务时会有心跳任务,每5s给服务端发送一次心态,服务端会把该 Instance 实例中的lastBeat 属性更新为当前时间。并且在服务端实例注册的时候,会开启心跳健康检查任务,把 lastBeat 跟当前时间比超过 15s,就会被标识为不健康的实例,把lastBeat 跟当前时间比超过 30s,Nacos 会把该 Instance 从注册表当中进行删除

最后的最后,别忘了把源码分析图补充完整: 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1682477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DataBinding viewBinding(视图绑定与数据双向绑定)简单案例 (kotlin)

先上效果&#xff1a; 4个view的文字都是通过DataBinding填充的。交互事件&#xff1a;点击图片&#xff0c;切换图片 创建项目&#xff08;android Studio 2023.3.1&#xff09; Build.gradle(:app) 引入依赖库&#xff08;完整源码&#xff09; buildFeatures { vie…

常见算法(1)

1.基本查找/顺序查找 核心&#xff1a;从0索引之后挨个查找 实现代码&#xff1a; public class test {public static void main(String [] arg) throws ParseException {int[] arr {121,85,46,15,55,77,63,49};int number55;System.out.println(bashi(arr,number));}publi…

el-upload上传图片,视频可获取视频时长。

对element-ui组件的upload组件再一次封装&#xff0c;简单记录。下面是效果图。 注意点&#xff1a;该组件现在仅支持单图和单个视频上传。 <template><div :style"myStyle"><divclass"uploads":style"{width: upWith px,height: up…

多商户消费券系统源码(ThinkPHP+FastAdmin+微信公众号)

打造智能促销新体验 一、引言&#xff1a;消费券系统的时代意义 在当今这个数字化高速发展的时代&#xff0c;电子商务和移动支付已经成为人们日常生活的重要组成部分。随着市场竞争的加剧&#xff0c;多商户消费券系统作为一种创新的促销手段&#xff0c;正逐渐受到商家和消…

容器监控方案

1、docker部署prometheus Prometheus是一套开源的系统监控报警框架&#xff0c;它基于时序数据库&#xff0c;并通过HTTP协议周期性地从被监控的组件中抓取指标数据。以下是一些关于Prometheus的详细介绍&#xff1a; 基本概念&#xff1a;Prometheus所有采集的监控数据均以指…

Swift使用JSONDecoder处理json数据,实现json序列化和反序列化

Json数据处理是开发中不可获取的一项技能&#xff0c;如果你不会处理json数据&#xff0c;那你离失业就不远了&#xff0c;所以学完了swift基础教程&#xff0c;还是先老老实实学习一下json处理吧&#xff0c;有了这项技能&#xff0c;你才可以继续下一个网络请求阶段的开发&am…

深度学习之基于YoloV5车型识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景与意义 在智能交通、安全监控等领域&#xff0c;车型识别技术具有重要的应用价值。传统的车型识别方法…

栈(基于动态顺序表实现的栈)

栈的简单介绍 关于栈的性质咳咳 栈&#xff1a;栈是一种特殊的线性表,其中只让在一端插入和删除元素。 后进先出 进行插入删除的那一端叫栈顶&#xff0c;另一端叫栈底 我们实现的栈是基于一个动态顺序表的的栈&#xff0c;会实现栈的 入栈&#xff0c;出栈&#xff0c;获取…

【NumPy】关于numpy.eye()函数,看这一篇文章就够了

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

15:00面试,15:08就出来了,问的问题有点变态。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到8月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40%…

【重学C++】02 脱离指针陷阱:深入浅出 C++ 智能指针

前言 大家好&#xff0c;今天是【重学C】系列的第二讲&#xff0c;我们来聊聊C的智能指针。 为什么需要智能指针 在上一讲《01 C如何进行内存资源管理》中&#xff0c;提到了对于堆上的内存资源&#xff0c;需要我们手动分配和释放。管理这些资源是个技术活&#xff0c;一不…

Thinkphp内核开发盲盒商城源码v2.0 对接易支付/阿里云短信/七牛云存储

源码简介 这套系统是我从以前客户手里拿到的,100完整可用,今天测试防红链接失效了,需要修改防红API即可!前端页面展示我就不放了,懂的都懂 优点是Thinkphp开发的&#xff0c;二开容易。 源码图片 资源获取&#xff1a;Thinkphp内核开发盲盒商城源码v2.0 对接易支付/阿里云短…

浅谈hook下的免杀研究

文章目录 前记实现过程后记reference 前记 原理 CS在高版本中推出了sleep mask功能&#xff0c;即在beacon sleep时对堆进行加密混淆&#xff0c;绕过内存扫描&#xff0c;在恢复运行前还原&#xff0c;防止进程崩溃。beacon每次运行的时间远短于sleep时间&#xff0c;内存扫描…

python实现520表白图案

今天是520哦&#xff0c;作为程序员有必要通过自己的专业知识来向你的爱人表达下你的爱意。那么python中怎么实现绘制520表白图案呢&#xff1f;这里给出方法&#xff1a; 1、使用图形库&#xff08;如turtle&#xff09; 使用turtle模块&#xff0c;你可以绘制各种形状和图案…

GPT‑4o普通账户也可以免费用

网址 https://chatgpt.com/ 试了一下&#xff0c;免费的确实显示GPT‑4o的模型&#xff0c;问了一下可以联网&#xff0c;不知道能不能通过插件出图 有兴趣的可以试试

海山数据库(He3DB)代理ProxySQL使用详解:(一)架构说明与安装

一、ProxySQL介绍 1.1 简介 业界比较知名的MySQL代理&#xff0c;由ProxySQL LLC公司开发并提供专业的服务支持&#xff0c;基于GPLv3开源协议进行发布,大部分配置项可动态变更。后端的MySQL实例可根据用途配置到不同的hostgroup中&#xff0c;由ProxySQL基于7层网络协议,将来…

第二证券股市资讯:突传重磅!高盛最新发声,事关中国股票!

外资猛买我国财物。 高盛在最新发布的陈述中称&#xff0c;海外对冲基金已连续第四周增持我国股票。另据彭博社的数据显现&#xff0c;上星期&#xff0c;我国是新式商场国家中录得最大资金流入的商场&#xff0c;达4.88亿美元&#xff08;约合人民币35亿元&#xff09;。 北…

滴滴三面 | Go后端研发

狠狠的被鞭打了快两个小时… 注意我写的题解不一定是对的&#xff0c;如果你认为有其他答案欢迎评论区留言 bg&#xff1a;23届 211本 社招 1. 自我介绍 2. 讲一个项目的点&#xff0c;因为用到了中间件平台的数据同步&#xff0c;于是开始鞭打数据同步。。 3. 如果同步的时候…

OpenFeign高级用法:缓存、QueryMap、MatrixVariable、CollectionFormat优雅地远程调用

码到三十五 &#xff1a; 个人主页 微服务架构中&#xff0c;服务之间的通信变得尤为关键。OpenFeign&#xff0c;一个声明式的Web服务客户端&#xff0c;使得REST API的调用变得更加简单和优雅。OpenFeign集成了Ribbon和Hystrix&#xff0c;具有负载均衡和容错的能力&#xff…

LInux实验二--进程间通信--信号

一、实验原理&#xff1a; 信号类似 windows 下的消息,用于通知进程有某种事件发生。只要知道进程的进 程号,就可以向进程发送信号。而进程可以自行定义对信号的处理方法。 二、实验内容&#xff1a; 实例一&#xff1a;编写实例&#xff0c;让子进程在启动2s后杀死父进程 /…