Nacos 进阶篇---集群:选举心跳健康检查劳动者(九)

news2025/1/11 7:07:06
一、引言

   本章将是我们第二阶段,开始学习集群模式下,Nacos 是怎么去操作的 ?

本章重点:

  • 在Nacos服务端当中,会去开启健康心跳检查定时任务。如果是在Nacos集群下,大家思考一下,有没有必要所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?大家可以思考一下~
  • 既然Nacos有健康心跳检查定时任务,如果微服务健康实例状态发生了改变,这个时候Nacos是怎么把健康实例同步给其他Nacos 集群节点的 ?代码怎么实现的 ?

带着这些问题我们一起往下看吧 ~

二、目录 

目录

          一、引言

二、目录 

三、集群心跳健康检查架构分析

四、集群心跳健康检查选举源码分析

五、集群实例健康状态同步源码分析

六、本章总结


三、集群心跳健康检查架构分析

我们先来分析第一问题。 在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?

  • 如果是在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务。那么就会出现跑出来结果不一致的问题,那么以哪个集群实例结果为准呢 ?很明显这种方式很不合理。
  • 那么就是第二种方式的了,只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 。

第二种方式明显更加靠谱点,逻辑也更加简洁。在Nacos集群当中也是这么做的,所有集群实例都会开启健康心跳检查任务,但是真正执行健康心跳任务检查逻辑的只有一个实例,在执行完成后。会有一个定时任务,把结果同步给其他集群节点

那我们接下来看看源码当中,Nacos 是怎么去实现的~

四、集群心跳健康检查选举源码分析

既然是 ” 心跳健康检查 “ ,我们还是要看服务端实例注册接口中的 ClientBeatCheckTask 任务:

那我们直接看 ClientBeatCheckTask 当中的 run 方法,一开始有两个 if 判断方法:

// 集群下,判断自身节点是否需要执行心跳健康检查任务,如果不需要,直接 return
if (!getDistroMapper().responsible(service.getName())) {
    return;
}

// 判断是否需要开启健康任务检查,默认为: true
if (!getSwitchDomain().isHealthCheckEnabled()) {
    return;
}

那么集群下是如何保证只有一台节点去执行定时任务的,关键点就在于第一个判断当中 responsible方法,那我们具体来看下代码:

public boolean responsible(String serviceName) {

    // 获取集群节点的数量
    final List<String> servers = healthyList;

    // 如果为单机模式,就直接返回为 true
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }

    // 没有可用健康集群的节点,就直接返回 false
    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }

    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }

    // 把 serviceName 的进行 hash操作,然后和 servers.size() 取模,最终只有一个集群节点能够返回 true
    int target = distroHash(serviceName) % servers.size();
    return target >= index && target <= lastIndex;
}

通过这个方法我们可以得知,在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?

我们一起来往下接着看~

五、集群实例健康状态同步源码分析

本节重点:在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?

在 ServiceManager 类中,init() 方法被 @PostConstruct 注解修饰,在Spring 创建 Bean的时候,会去执行 init()方法。在这个方法当中,会去开启心跳健康检查同步的定时任务,我们一起来看下~

@PostConstruct
public void init() {
    // 同步心跳健康检查异结果异步任务
    GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

    // 处理 同步心跳健康检查异结果异步任务  内存队列 + 异步任务
    GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());

    // 省略部分代码
}

那我们先来看下 同步心跳健康检查异结果异步任务代码,ServiceReporter 当中的 run() 方法:

我们可以把这块代码分成三个部分,这样看更容易理解:

第一部分:获取当前所有服务,key:命名空间 value:服务名称

// 获取全部服务,key:命名空间   value:服务名称
Map<String, Set<String>> allServiceNames = getAllServiceNames();

if (allServiceNames.size() <= 0) {
    //ignore
    return;
}

第二部分:遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果

// 遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果
for (String namespaceId : allServiceNames.keySet()) {
    ServiceChecksum checksum = new ServiceChecksum(namespaceId);

    // 遍历每一个命名空间对应 serviceName 服务名称
    for (String serviceName : allServiceNames.get(namespaceId)) {
        if (!distroMapper.responsible(serviceName)) {
            continue;
        }

        Service service = getService(namespaceId, serviceName);

        if (service == null || service.isEmpty()) {
            continue;
        }

        service.recalculateChecksum();

        // 添加请求参数
        checksum.addItem(serviceName, service.getChecksum());
    }

    // 封装 Message 对象数据,把请求对象转换成JSON
    Message msg = new Message();
    msg.setData(JacksonUtils.toJson(checksum));

    Collection<Member> sameSiteServers = memberManager.allMembers();
    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
        return;
    }

第三部分:同步结果到其他集群节点

for (Member server : sameSiteServers) {
    // 判断是否是当前集群的节点,如果是就跳过
    if (server.getAddress().equals(NetUtils.localServer())) {
        continue;
    }

    // 重点:同步其他集群节点
    synchronizer.send(server.getAddress(), msg);
}

在 synchronizer.send(server.getAddress(), msg); 这个方法当中,会通过HTTP 方式给其他集群节点同步心跳任务健康检查结果:

@Override
public void send(final String serverIP, Message msg) {
    if (serverIP == null) {
        return;
    }

    // 创建请求参数
    Map<String, String> params = new HashMap<String, String>(10);
    params.put("statuses", msg.getData());
    params.put("clientIP", NetUtils.localServer());

    // 拼接 url 地址
    String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

    if (IPUtil.containsPort(serverIP)) {
        url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                + "/service/status";
    }

    try {
        // 异步发送 http 请求,请求地址:http://ip/v1/ns/service/status , 同步心跳健康检查结果
        HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {

            // 代码省略
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
    }

}

通过代码可以得知,最终也是通过 HTTP 的方式来进行数据同步的,也能够看出请求地址是v1/ns/service/status。接下来我们一起来看下请求地址对应的接口代码逻辑,其实代码很好找,看下图:

这块代码就不细讲了,主要逻辑就是 判断服务状态是否有变动 ,有变动的话就 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中。

代码如下:

public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {
    lock.lock();
    try {
        // 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中
        toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        toBeUpdatedServicesQueue.poll();
        toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
        Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);
    } finally {
        lock.unlock();
    }
}

我们刚刚分析的在 ServiceManager类中的 init 方法(代码如下),第一个线程任务就是同步心跳健康检查结果的异步任务,那么我们接下来分析第二个线程任务。

@PostConstruct
public void init() {
    // 同步心跳健康检查异结果异步任务
    GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

    // 处理 同步心跳健康检查异结果异步任务  内存队列 + 异步任务
    GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());

    // 省略部分代码
}

第二个线程任务类是:UpdatedServiceProcessor,我们从run 方法中(代码如下),能够看出是一个 while 循环,并且是没有结束条件的。在循环的逻辑当中,会从toBeUpdatedServicesQueue阻塞队列中一直取任务,取到任务之后,又是提交了一个线程池任务。

@Override
public void run() {
    ServiceKey serviceKey = null;

    try {
        while (true) {
            try {
                // 从阻塞队列当中一直获取任务
                serviceKey = toBeUpdatedServicesQueue.take();
            } catch (Exception e) {
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
            }

            if (serviceKey == null) {
                continue;
            }
            // 把任务提交到线程池执行
            GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
        }
    } catch (Exception e) {
        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
    }
}

那我们接着看 ServiceUpdater 当中的 run 方法,代码如下:

@Override
public void run() {
    try {
        // 调用更改健康状态方法
        updatedHealthStatus(namespaceId, serviceName, serverIP);
    } catch (Exception e) {
        Loggers.SRV_LOG
                .warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
                        serverIP, e);
    }
}

public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
    Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));

    // 解析参数
    JsonNode serviceJson = JacksonUtils.toObj(msg.getData());
    ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
    Map<String, String> ipsMap = new HashMap<>(ipList.size());
    for (int i = 0; i < ipList.size(); i++) {

        String ip = ipList.get(i).asText();
        String[] strings = ip.split("_");
        ipsMap.put(strings[0], strings[1]);
    }

    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        return;
    }

    // 是否改变标识
    boolean changed = false;

    // 遍历全部实例信息,更新健康状态
    List<Instance> instances = service.allIPs();
    for (Instance instance : instances) {
        boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
        if (valid != instance.isHealthy()) {
            changed = true;
            instance.setHealthy(valid);
            Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
                    (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
                    instance.getClusterName());
        }
    }

    // 如果实例健康状态改变了,那么就发布 服务改变事件,使用 upd 的方式通知客户端
    if (changed) {
        pushService.serviceChanged(service);
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            StringBuilder stringBuilder = new StringBuilder();
            List<Instance> allIps = service.allIPs();
            for (Instance instance : allIps) {
                stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
            }
            Loggers.EVT_LOG
                    .debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
                            service.getName(), stringBuilder.toString());
        }
    }

}

在上面代码中,注意是先解析我们的 msg.getData()参数,然后获取注册表中全部的 Instance 实例列,进行遍历,在健康状态有变动的情况下,会直接更改它的 healthy 属性。在方法的最后,如果有更新 healthy属性的情况下,最终也会发布服务改变事件来通知客户端进行更新。

六、本章总结

在本章节我们首先知道了,在Nacos集群下,是只有一个集群节点去执行心跳健康检查定时任务的,然后把结果同步给其他集群的节点。那么是怎么同步给其他集群节点的呢 ?

在Nacos 服务端是有一个定时任务,来和其他集群节点进行数据同步的。通过源码分析,我们知道最终也是通过 HTTP 的方式进行同步的,采用了 异步任务 + 阻塞队列的方式 的设计架构。这样的好处就是快,先把任务都给接受放入到阻塞队列当中,就立马返回。然后后台会开启一条线程不断从阻塞队列当中获取任务进行处理。

把本章流程图补充完整:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1908023.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[FreeRTOS 基础知识] 任务通知 概念

文章目录 任务通知 定义FreeRTOS 任务通知机制 任务通知 定义 实时操作系统&#xff08;RTOS&#xff09;的任务通知机制是一种用于任务间通信和同步的机制。在FreeRTOS中&#xff0c;任务通知允许一个任务向另一个任务发送通知&#xff0c;表明某个事件已经发生或者某些条件已…

鸿蒙语言基础类库:【@ohos.url (URL字符串解析)】

URL字符串解析 说明&#xff1a; 本模块首批接口从API version 7开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。开发前请熟悉鸿蒙开发指导文档&#xff1a;gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md点击或者复制转到。 导入…

【9-2:RPC设计】

RPC 1. 基础1.1 定义&特点1.2 具体实现框架1.3 应用场景2. RPC的关键技术点&一次调用rpc流程2.1 RPC流程流程两个网络模块如何连接的呢?其它特性RPC优势2.2 序列化技术序列化方式PRC如何选择序列化框架考虑因素2.3 应用层的通信协议-http什么是IO操作系统的IO模型有哪…

【Excel技巧大揭秘】如何轻松绕过Excel工作表保护密码?

在日常工作中&#xff0c;我们时常会遇到设置了工作表保护的Excel文件&#xff0c;本意是为了数据安全&#xff0c;但偶尔在急需编辑文件时却遗忘了密码&#xff0c;这无疑让人感到头疼。面对这样的困境&#xff0c;别担心&#xff0c;本文将为您揭秘两种高效解决策略&#xff…

go语言day12 包 init() time包 file包

包 包中的 结构体 及结构体属性 方法 都可以通过设置首字母大小写来实现权限访问&#xff0c;首字母大写任何包中都可以访问&#xff0c;首字母小写只在同包中可以访问。 再导入包go文件时&#xff0c;可以给.go文件取别名。 在导入的包名前加入 _ 意思是调用该包的初始…

OpenHarmony 入门——单元测试UnitTest快速入门

引言 OpenHarmony 的单元测试&#xff08;UnitTest&#xff09;是一个关键的软件开发过程&#xff0c;它确保代码的各个部分能够按预期工作&#xff0c;OpenHarmony的测试框架中提供了很多种的单元测试&#xff0c;今天简单介绍下UnitTest 类型的TDD测试。 OpenHarmony 的TDD …

尚品汇-(十五)

&#xff08;1&#xff09;快速入门 SpringBoot形式创建 Maven形式创建&#xff1a; 加入依赖&#xff1a; 创建启动类&#xff1a; 设置头文件 就想Jsp的<%Page %>一样 &#xff0c;Thymeleaf的也要引入标签规范。不加这个虽然不影响程序运行&#xff0c;但是你的idea…

【楚怡杯】职业院校技能大赛 “Python程序开发”赛项样题三

Python程序开发实训 &#xff08;时量&#xff1a;240分钟&#xff09; 中国XX 实训说明 注意事项 1. 请根据提供的实训环境&#xff0c;检查所列的硬件设备、软件清单、材料清单是否齐全&#xff0c;计算机设备是否能正常使用。 2. 实训结束前&#xff0c;在实训平台提供的…

vue项目实现堆叠卡片拖动切换效果

实际效果 实现流程 1. 实现卡片位置堆叠 将父元素的 position 设置成relative &#xff0c;卡片的position 设置成 absolute 即可。 2. 消除图片的移动 如果卡片上有图片&#xff0c;默认拖动的时候就会导致像上图一样变成了选中图片移动&#xff0c;从而没法触发拖动事件。消…

用Vue3和Plotly.js绘制交互式3D小提琴图

本文由ScriptEcho平台提供技术支持 项目地址&#xff1a;传送门 Vue 中使用 Plotly.js 创建小提琴图 应用场景介绍 小提琴图是一种统计图&#xff0c;用于显示数据的分布和中心趋势。它结合了箱线图和密度图的特点&#xff0c;可以直观地展示数据的分散性和形状。 代码基本…

python如何进行pip换源

hello&#xff0c;大家好&#xff0c;我是一名测试开发工程师&#xff0c;至今已在自动化测试领域深耕9个年头&#xff0c;现已将本人实战多年的多终端自动化测试框架【wyTest】开源啦&#xff0c;请大家快来体验并关注我吧。 Python的包管理工具pip是开发者必备的利器之一。然…

YOLOv9:一个关注信息丢失问题的目标检测

本文来自公众号“AI大道理” 当前的深度学习方法关注的是如何设计最合适的目标函数&#xff0c;使模型的预测结果最接近地面的真实情况。同时&#xff0c;必须设计一个适当的体系结构&#xff0c;以方便获取足够的预测信息。 现有方法忽略了一个事实&#xff0c;即输入数据在逐…

理解JS与多线程

理解JS与多线程 什么是四核四线程&#xff1f; 一个CPU有几个核它就可以跑多少个线程&#xff0c;四核四线程就说明这个CPU同一时间最多能够运行四个线程&#xff0c;四核八线程是使用了超线程技术&#xff0c;使得单个核像有两个核一样&#xff0c;速度比四核四线程有多提升。…

el-scrollbar实现自动滚动到底部(AI聊天)

目录 项目背景 实现步骤 实现代码 完整示例代码 项目背景 chatGPT聊天消息展示滚动面板&#xff0c;每次用户输入提问内容或者ai进行流式回答时需要不断的滚动到底部确保展示最新的消息。 实现步骤 采用element ui 的el-scrollbar作为聊天消息展示组件。 通过操作dom来实…

Linux学习看这一篇就够了,超超超牛的Linux基础入门

引言 小伙伴们&#xff0c;不管是学习c还是学习其他语言在我们学的路上都绕不过操作系统&#xff0c;而且&#xff0c;老生常谈的Linux更是每个计算机人的必修&#xff0c;那么我们对Linux的了解可能只是从别人那听到的简单的这个系统很牛&#xff0c;巴拉巴拉的&#xff0c;但…

挑战全网最清晰解决文本文件乱码方案

标题 文本文件出现乱码之全网最清晰解决方案乱码出现的原因解决方案第一步&#xff1a;获取文件的原始编码格式。第二步&#xff0c;获取当前系统的格式第三步&#xff0c;将文件的内容以当前系统编码格式进行译码并且输出到新的文件中第四步&#xff0c;删除原文件&#xff0c…

韦东山嵌入式linux系列-LED驱动程序

之前学习STM32F103C8T6的时候&#xff0c;学习过对应GPIO的输出&#xff1a; 操作STM32的GPIO需要3个步骤&#xff1a; 使用RCC开启GPIO的时钟、使用GPIO_Init函数初始化GPIO、使用输入/输出函数控制GPIO口。 【STM32】GPIO输出-CSDN博客 这里再看看STM32MP157的GPIO引脚使用…

【智能算法改进】多策略改进的蜣螂优化算法

目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 【智能算法】蜣螂优化算法&#xff08;DBO&#xff09;原理及实现 2.改进点 混沌反向学习初始化 采用 Pwlcm 分段混沌映射&#xff0c;由于 Pwlcm 在其定义区间上具有均匀的密度函数&#xff0c;在特定的…

windows电脑桌面便签在哪里找?

在忙碌的工作中&#xff0c;我们经常会有很多事情需要记住。这时&#xff0c;电脑桌面便签就成为了我们的好帮手。那么&#xff0c;在Windows电脑上&#xff0c;我们该如何找到桌面便签呢&#xff1f;下面&#xff0c;就让我来为大家详细介绍一下。 其实&#xff0c;Windows电…