[Nacos] Nacos Server之间的操作 (十一)

news2025/1/11 11:08:32

文章目录

      • 1.ServiceManager#init()
        • 1.1 定时发送任务
        • 1.2 定时更新状态任务
        • 1.3 定时清除空service任务

1.ServiceManager#init()

在这里插入图片描述

    @PostConstruct
    public void init() {
        // 启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表
        // 本机注册表是以各个服务的checksum(字串拼接)形式被发送的
        GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        // 从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表
        GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());

        if (emptyServiceAutoClean) {

            Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                    cleanEmptyServiceDelay, cleanEmptyServicePeriod);

            // delay 60s, period 20s;

            // This task is not recommended to be performed frequently in order to avoid
            // the possibility that the service cache information may just be deleted
            // and then created due to the heartbeat mechanism

            // 启动了一个定时任务:每30s清理一次注册表中的空service
            // 空service,即没有任何instance的service
            GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
                    cleanEmptyServicePeriod);
        }

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }
  1. 启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表
  2. 从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表
  3. 启动了一个定时任务:每30s清理一次注册表中的空service

1.1 定时发送任务

启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表

ServiceReporter#run()

        @Override
        public void run() {
            try {

                // map的key为namespaceId,value为一个Set集合,集合中存放的是当前
                // namespace中所有service的名称
                // 这个map中存放的是当前注册表中所有服务的名称
                Map<String, Set<String>> allServiceNames = getAllServiceNames();

                if (allServiceNames.size() <= 0) {
                    //ignore
                    return;
                }

                // 遍历所有的namespace
                for (String namespaceId : allServiceNames.keySet()) {

                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                    // 遍历当前namespace中的所有服务名称
                    for (String serviceName : allServiceNames.get(namespaceId)) {
                        // 若当前服务不归当前Server负责,则直接跳过
                        if (!distroMapper.responsible(serviceName)) {
                            continue;
                        }

                        // 从注册表中获取到当前遍历的服务
                        Service service = getService(namespaceId, serviceName);

                        if (service == null || service.isEmpty()) {
                            continue;
                        }
                        // 重新计算当前service的checksum
                        service.recalculateChecksum();
                        // 将计算好的checksum写入到map
                        checksum.addItem(serviceName, service.getChecksum());
                    }  // end-内层for

                    Message msg = new Message();

                    // 将当前namespace中的所有服务的checksum写入到msg中,
                    // 将来将msg发送给其它nacos
                    msg.setData(JacksonUtils.toJson(checksum));

                    // 获取到所有nacos
                    Collection<Member> sameSiteServers = memberManager.allMembers();

                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                        return;
                    }

                    // 遍历所有nacos,要将msg发送出去
                    for (Member server : sameSiteServers) {
                        // 若当前遍历的server是当前server,则直接跳过
                        if (server.getAddress().equals(NetUtils.localServer())) {
                            continue;
                        }
                        // 将msg发送给当前遍历的server
                        synchronizer.send(server.getAddress(), msg);
                    }
                }  // end-外层for
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
                // 开启下一次定时执行
                GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
    }
  1. 获取当前注册表中所有服务的名称allServiceNames
    在这里插入图片描述

  2. 遍历所有的namespace, 遍历当前namespace中的所有服务名称

    • 从注册表中获取到当前遍历的服务, 重新计算当前service的checksum, 保持至map中
    • 获取并遍历所有nacos,要将msg发送出去, msg就是上面的checksum的封装

1.2 定时更新状态任务

从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表

UpdatedServiceProcessor#run() -> ServiceUpdater#run() -> ServiceUpdater#updatedHealthStatus()

        @Override
        public void run() {
            ServiceKey serviceKey = null;

            try {
                // 运行一个无限循环
                while (true) {
                    try {
                        // 从队列中取出一个元素
                        // toBeUpdatedServicesQueue 中存放的是来自于其它Server的服务状态发生变更的服务
                        serviceKey = toBeUpdatedServicesQueue.take();
                    } catch (Exception e) {
                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                    }

                    if (serviceKey == null) {
                        continue;
                    }
                    // 另外启用一个线程来完成ServiceUpdater任务
                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
                }
            } catch (Exception e) {
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
            }
        }
    }

        @Override
        public void run() {
            try {
                updatedHealthStatus(namespaceId, serviceName, serverIP);
            } catch (Exception e) {
                Loggers.SRV_LOG
                        .warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
                                serverIP, e);
            }
        }
    }

1.

在这里插入图片描述

    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
        // 从其它server获取指定服务的数据
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JsonNode serviceJson = JacksonUtils.toObj(msg.getData());

        ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
        // 这个map中存放的是来自于其它Nacos中的、当前服务所包含的所有instance的健康状态
        // map的key为ip:port,value为healthy
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        // 遍历ipList
        for (int i = 0; i < ipList.size(); i++) {
            // 这个ip字符串的格式是:  ip:port_healthy
            String ip = ipList.get(i).asText();
            String[] strings = ip.split("_");
            // 将当前遍历instance的地址及健康状态写入到map
            ipsMap.put(strings[0], strings[1]);
        }

        // 从注册表中获取当前服务
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            return;
        }

        boolean changed = false;

        // 获取到注册表中当前服务的所有instance
        List<Instance> instances = service.allIPs();
        // 遍历注册表中当前服务的所有instance
        for (Instance instance : instances) {
            // 获取来自于其它nacos的当前遍历instance的健康状态
            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
            // 若当前instance在注册表中记录的状态与外来的状态不一致,则以外来的为准
            if (valid != instance.isHealthy()) {
                changed = true;
                // 将注册表中的instance状态修改为外来的状态
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
                        (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
                        instance.getClusterName());
            }
        }

        // 只要有一个instance的状态发生了变更,那么这个changed的值就为true
        if (changed) {
            // 发布状态变更事件
            pushService.serviceChanged(service);
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                StringBuilder stringBuilder = new StringBuilder();
                List<Instance> allIps = service.allIPs();
                for (Instance instance : allIps) {
                    stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
                }
                Loggers.EVT_LOG
                        .debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
                                service.getName(), stringBuilder.toString());
            }
        }

    }
  1. 从其它server获取指定服务的数据msg
  2. 获取到注册表中当前服务的所有instance, 遍历, 获取来自于其它nacos的当前遍历instance的健康状态, 如果当前instance在注册表中记录的状态与外来的状态不一致,则以外来的为准, 将注册表中的instance状态修改为外来的状态
  3. 只要有一个instance的状态发生了变更, 发布状态变更事件, 在上一节的Nacos Server与Client的UDP通信中有这个方法的详情分析

1.3 定时清除空service任务

启动了一个定时任务:每30s清理一次注册表中的空service, 即没有任何instance的service

EmptyServiceAutoClean#run()

        @Override
        public void run() {

            // Parallel flow opening threshold

            // 这是一个并行流开启阈值:当一个namespace中包含的service的数量超过100时,
            // 会将注册创建为一个并行流,否则就是一个串行流
            int parallelSize = 100;

            // 遍历注册表
            // stringServiceMap 就是注册表的内层map
            serviceMap.forEach((namespace, stringServiceMap) -> {
                Stream<Map.Entry<String, Service>> stream = null;
                // 若当前遍历的元素(namespace)中包含的服务的数量超出了阈值,
                // 则生成一个并行流
                if (stringServiceMap.size() > parallelSize) {
                    // 并行流
                    stream = stringServiceMap.entrySet().parallelStream();
                } else {
                    // 串行流
                    stream = stringServiceMap.entrySet().stream();
                }
                stream.filter(entry -> {
                    final String serviceName = entry.getKey();
                    // 只要当前遍历的服务需要当前server负责,则通过过滤
                    return distroMapper.responsible(serviceName);

                    // 这里的forEach遍历的元素一定是最终需要由当前server处理的服务
                }).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
                    // 空的service
                    if (service.isEmpty()) {

                        // To avoid violent Service removal, the number of times the Service
                        // experiences Empty is determined by finalizeCnt, and if the specified
                        // value is reached, it is removed

                        // 若当前服务为空的次数超出了最大允许值,则删除这个服务,防止暴力删除
                        if (service.getFinalizeCount() > maxFinalizeCount) {
                            Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
                                    serviceName);
                            try {
                                // 删除服务
                                easyRemoveService(namespace, serviceName);
                            } catch (Exception e) {
                                Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
                                        + "error : {}", namespace, serviceName, e);
                            }
                        }

                        // 计数器 + 1
                        service.setFinalizeCount(service.getFinalizeCount() + 1);

                        Loggers.SRV_LOG
                                .debug("namespace : {}, [{}] The number of times the current service experiences "
                                                + "an empty instance is : {}", namespace, serviceName,
                                        service.getFinalizeCount());
                    } else {
                        // 如果当前service不空instance的话
                        // 将计数器归零
                        service.setFinalizeCount(0);
                    }
                    return service;
                }));
            });
        }
  1. 遍历注册表, 如果遍历的元素中的服务数量超过阈值则注册并行流, 如果没有则生成串行流, 去解决清除任务
  2. 先判断是否是空的service, 如果是的话则若当前服务为空的次数超出了最大允许值,则删除这个服务,防止暴力删除, 并删除服务
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/590572.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

水声声波频率如何划分?水声功率放大器可将频率放大到20MHz吗?

水声声波频率如何划分&#xff1f;水声功率放大器可将频率放大到20MHz吗&#xff1f; 现如今我们可以在地球任意地区实现通信&#xff0c;是因为电磁波的作用。但是我们都知道海洋占了全球十分之七面积&#xff0c;电磁波在水下衰减速度太快&#xff0c;无法做到远距离传输&am…

linux内核内存管理slab

一、概述 linux内存管理核心是伙伴系统&#xff0c;slab&#xff0c;slub&#xff0c;slob是基于伙伴系统之上提供api&#xff0c;用于内核内存分配释放管理&#xff0c;适用于小内存&#xff08;小于&#xff11;页&#xff09;分配与释放&#xff0c;当然大于&#xff11;页…

Ext JS嵌套分组表格的实现

这里的嵌套分组表格指的是这样一种表格 表格的每一行可以展开下一层的Grid展开的嵌套表格是一个分组的表格显示的效果如下图: 这种显示的方式可以显示 3个层级的数据,比如这里的国家 、 将军等级、将军信息。 如果最外层再使用分组的表格, 则可以显示 4个层级的信息, 这种…

Ethercat学习-从站FOE固件更新(QT上位机)

文章目录 简介1、源码简介1、ec_FOEread2、ec_FOEwrite3、ec_FOEdefinehook 2、程序思路3、修改实现1、ecx_FOEwrite_gxf2、ecx_FOEread_gxf 4、其他5、结果6、源码连接 简介 FOE协议与下位机程序实现过程之前文章有提到&#xff0c;这里不做介绍了。这里主要介绍1、QT上位机通…

Java开发 - 让你少走弯路的Redis的主从复制

前言 大家举举手&#xff0c;让我看看还有多少人不会配置Redis的主从&#xff0c;主主这些的。故事发生在前段时间&#xff0c;小伙伴看到了博主的MySQL主从&#xff0c;就问博主有没有Redis的主从配置教程&#xff0c;本以为网上到处都是教程的博主打开网页一搜&#xff0c;好…

SpringCloud:分布式缓存之Redis主从

1.搭建主从架构 单节点Redis的并发能力是有上限的&#xff0c;要进一步提高Redis的并发能力&#xff0c;就需要搭建主从集群&#xff0c;实现读写分离。 2.主从数据同步原理 2.1.全量同步 主从第一次建立连接时&#xff0c;会执行全量同步&#xff0c;将master节点的所有数据…

VSCode+Git+TortoiseGit+Gitee

目录 一、VSCode 1、VSCode(visual studio code)下载安装 2、VSCode使用技巧和经验 2.1、设置字体: 2.2、快捷方式 2.3、安装插件 二、Git下载安装 三、TortoiseGit 1、TortoiseGit 简介 2、下载安装Git及Tortoisegit 3、Tortoisegit拉取gitee仓库到本地 4、Git拉取…

Linux 终端安装并使用tmux管理远程会话 tmux使用教程

文章目录 1 Tmux简介1.1 会话与窗口1.2 tmux功能 2 tmux安装2.1 源码安装2.2 命令行安装 3 基本用法&#xff08;命令行&#xff09;3.1 创建窗口3.2 分离会话 切换会话3.3 连接会话3.4 关闭会话并杀死进行对会话进行重命名 4 Tmux 的快捷键5 窗口操作与窗格操作参考 1 Tmux简介…

Ctfshow基础二刷(1)

前言&#xff1a; 前两天的信安给我整emo了&#xff0c;头一回打正经比赛&#xff0c;结果发现基础太差&#xff0c;代码审计烂得一踏糊涂。 寻思寻思&#xff0c;从头整一遍基础。又买了安恒出的新书。争取7号去吉林打省队选拔不给导儿丢脸吧呜呜 文件包含 web78: 这题一…

前端gojs中禁用指定节点的选中效果

代码思路 适用于禁用某些节点的选中状态&#xff0c;选中节点时判断该节点要不要禁用 点击节点的时候&#xff0c;判断节点要不要禁用选中效果 如果禁用&#xff0c;就在选中时&#xff0c;把选中节点重置为最近一次非禁用的节点 diagram.select&#xff1a;选中节点 diagram.…

INCA使用记录(一):INCA新建工程及观测标定

目录 1、概述 2、INCA实用方法 2.1、新建工程-添加A2L 2.2、添加工作空间 2.3、添加实验选项 ​2.4、添加硬件配置 2.5、添加工程elf 2.6、初始化工程 2.7、测量与观测参数 2.8、更换A2L之后如何更新工程 1、概述 INCA作为汽车行业常用的一种XCP处理工具&#xff0c;对…

javascript基础十一:JavaScript中执行上下文和执行栈是什么?

一、执行上下文 简单的来说&#xff0c;执行上下文是对Javascript代码执行环境的一种抽象概念&#xff0c;只要有Javascript代码运行&#xff0c;那么它就一定是运行在执行上下文中 执行上下文的类型分为三种&#xff1a; 全局执行上下文&#xff1a;只有一个&#xff0c;浏…

基于MPC的自适应巡航控制(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Kyligence x 明道云|低代码平台助力中小企业实现存量背景下的创新增长

国内大部分制造企业在经历疫情后&#xff0c;终于迎来了市场端的消费需求的恢复和增长&#xff0c;但如何在激烈的竞争中以更少投入&#xff0c;获得更高回报&#xff0c;在市场上获得一席生存之地&#xff0c;成为了悬在众多企业头上的达摩克利斯之剑。在市场野蛮生长阶段时&a…

使用PYQT5和VTK实现一个六轴跟随的电路板转动动画效果

实现过程&#xff1a; 关于六轴&#xff1a; 线下有一个带有六轴姿态传感器的硬件设备&#xff0c;将采集到的三轴加速度和角速度的值每隔1秒通过串口发送给电脑&#xff0c;电脑上位机使用的是pyqt5&#xff0c;在python中调用serial模块进行串口数据的接收&#xff0c;接收…

专业是要选软工还是人工智能?

大家好&#xff0c;我是帅地。 在帅地的训练营里&#xff0c;也有不少 26 届的学员&#xff0c;不过大一即将过去&#xff0c;部分学校是到了大一后面或者大二才开始细分专业方向的&#xff0c;包括一些想要转专业的同学&#xff0c;也需要选择一个细分的方向&#xff0c;而且…

10:mysql----存储引擎--进阶篇

目录 1:MySQL体系结构 2:存储引擎简介 3:存储引擎特点 4:存储引擎选择 1:MySQL体系结构 连接层 : 最上层是一些客户端和链接服务&#xff0c;主要完成一些类似于连接处理、授权认证、及相关的安全方案。服务器也会为安全接入的每个客户端验证它所具有的操作权限。 服务层 :…

抽象轻松JavaScript

想象一样&#xff0c;现在有一个苹果&#xff0c;两个苹果&#xff0c;一箱苹果在你面前 看&#xff0c;上面的三种苹果&#xff0c;&#xff08;我写的是苹果就是苹果&#xff09; 语境1 例如你现在要搬运苹果&#xff01; 那么现在上面有苹果&#xff0c;一个&#xff0c;两…

阿里云的数据库架构如何设计,以实现高可用性和容灾性?

阿里云的数据库架构如何设计&#xff0c;以实现高可用性和容灾性&#xff1f;   在当今的数字化时代&#xff0c;数据库作为应用程序的核心组件之一&#xff0c;对于企业的正常运行至关重要。这篇文章将为您解析阿里云如何设计其数据库架构&#xff0c;以实现高可用性和容灾性…

加法器种类介绍

二进制加法器 二进制加法器接收加数A和B,以及进位Ci,输出和S,以及进位输出Co.二进制加法器的真值表如下&#xff1a; 逻辑表达式&#xff1a; S A ⊕ B ⊕ C i SA⊕B⊕C_i SA⊕B⊕Ci​ C o A B B C i A C i C_oABBC_iAC_i Co​ABBCi​ACi​ 从实现的角度&#xff0c;可以…