[Nacos] Nacos Server处理订阅请求 (九)

news2024/12/23 9:55:57

文章目录

      • 1.InstanceController#list()
      • 2.InstanceController#doSrvIpxt()
      • 3.总结

1.InstanceController#list()

Nacos Server处理订阅请求

在这里插入图片描述

主要还是从请求中获取参数, 比如namespceId、serviceName、agent(指定提交请求的客户端是哪种类型)、clusters、clusterIP、udpPort(后续UDP通信会使用)、app、tenant, 最后调用方法对参数进行处理

2.InstanceController#doSrvIpxt()

对请求进行详细处理

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        // 不同agent,生成不同的clientInfo
        ClientInfo clientInfo = new ClientInfo(agent);
        // 创建一个JSON Node,其就是当前方法返回的结果。后续代码就是对这个Node的各种初始化
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        // 从注册表中获取当前服务
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {

                // 创建当前发出订阅请求的Nacos client的UDP Client, PushClient
                // 注意,在Nacos的UDP通信中,Nacos Server充当的是UDP Client,Nacos Client充当的是UDP Server
                pushService
                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                                pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }

        // 若注册表中没有该服务,则直接结束
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            // 注意,hosts为空
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }

        // 代码直到这里,说明注册表中存在该服务
        // 检测该服务是否被禁。若是被禁的服务,直接抛出异常
        checkIfDisabled(service);

        List<Instance> srvedIPs;

        // 获取到当前服务的所有实例,包含所有持久/临时实例
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        // filter ips using selector:
        // 若选择器不空,则根据选择算法选择可用的intance列表,默认情况下,选择器不做任何过滤
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }

        // 若最终选择的结果为空,则直接结束
        if (CollectionUtils.isEmpty(srvedIPs)) {

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }

            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }

            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            // 注意,hosts为空
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }

        // 代码走到这里,说明具有可用的instance
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        // 这个map只有两个key,True与False
        // key为true的value中存放的是所有健康的instance
        // key为false的value存放的是所有不健康的instance
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());

        // 根据instance的健康状态,将所有instance分流放入map的不同key的value中
        for (Instance ip : srvedIPs) {
            // 这个语句写的非常好
            // 健康加入健康的列表, 不健康的加入不健康的列表
            ipMap.get(ip.isHealthy()).add(ip);
        }

        // isCheck为true,表示需要检测instance的保护阈值
        if (isCheck) {
            // reachProtectThreshold 是否达到了保护阈值, false 为没有达到
            result.put("reachProtectThreshold", false);
        }

        // 获取服务的保护阈值
        double threshold = service.getProtectThreshold();

        // 若  "健康instance数量/instance总数" <= 保护阈值,则说明需要启动保护机制了
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                // true表示启动保护机制
                result.put("reachProtectThreshold", true);
            }

            // 健康数量小于阈值, 则从所有实例中调用, 可能会有不健康实例, 可以保证健康实例不被压崩溃
            // 将所有不健康的instance添加到的key为true的instance列表,
            // 即key为true的value中(instance列表)存放的是所有instance实例
            // 包含所有健康的与不健康的instance
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            // 清空key为false的value(不健康的instance列表)
            ipMap.get(Boolean.FALSE).clear();
        }

        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);

            return JacksonUtils.createEmptyJsonNode();
        }

        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

        // 注意,这个ipMap中存放着所有健康与不健康的instance列表
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();

            // 若客户端只要健康的instance,且当前遍历的map的key为false,则跳过
            if (healthyOnly && !entry.getKey()) {
                continue;
            }

            // 遍历的这个ips可能是所有不健康的instance列表,
            // 也可能是所有健康的instance列表,
            // 也可能是所有健康与不健康的instance列表总和
            for (Instance instance : ips) {

                // 跳过禁用的instance
                if (!instance.isEnabled()) {
                    continue;
                }

                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

                // 将当前遍历的instance转换为JSON
                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }

                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);

            }  // end-for
        } // end-for

        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
  1. 不同agent,生成不同的clientInfo, java、c、c++、go、nginx、dnsf
    在这里插入图片描述

  2. pushService.addClient(): 创建当前发出订阅请求的Nacos client的UDP Client, PushClient, Nacos Server充当的是UDP Client,Nacos Client充当的是UDP Server
    在这里插入图片描述
    在这里插入图片描述
    获取到了UDP通信客户端PushClient, 并写入到一个缓存map中

    public void addClient(PushClient client) {
        // client is stored by key 'serviceName' because notify event is driven by serviceName change
        String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
        // clientMap是一个缓存map,用于存放当前Nacos Server中所有instance对应的UDP Client
        // 其是一个双层map,外层map的key为  namespaceId##groupId@@微服务名称,value为内层map
        // 内层map的key为代表一个instance的字符串,value为该instance对应的UDP Client,即PushClient
        ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
        // 若当前服务的内层map为null,则创建一个并放入到缓存map
        if (clients == null) {
            clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
            clients = clientMap.get(serviceKey);
        }
        PushClient oldClient = clients.get(client.toString());
        // 从内层map中获取当前instance对应的的PushClient,
        // 若该PushClient不为null,则更新一个最后引用时间戳;
        // 若该PushClient为null,则将当前这个PushClient作为PushClient
        // 写入到内层map,即写入到了缓存map
        if (oldClient != null) {
            // 更新最后引用时间戳
            oldClient.refresh();
        } else {
            PushClient res = clients.putIfAbsent(client.toString(), client);
            if (res != null) {
                Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());
            }
            Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
        }
    }
  1. 获取当前服务的所有实例, 包括持久和临时
    在这里插入图片描述
    public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        // 获取到当前服务的所有cluster中的所有instance
        return allIPs(clusters);
    }

    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        for (String cluster : clusters) {
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            // 将当前遍历cluster的所有instance添加到result集合
            // 包含所有持久实例与临时实例
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }

    public List<Instance> allIPs() {
        List<Instance> allInstances = new ArrayList<>();
        // 持久实例
        allInstances.addAll(persistentInstances);
        // 临时实例
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }

3.总结

Nacos Server处理订阅请求的主要任务:

  1. 创建了Nacos Client对应的UDP通信客户端PushClient, 并写入一个缓存map
  2. 从注册表中获取到指定服务的所有可用的instance, 并封装为Json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/579930.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023全国酒店数据

数据内容字段结构 hotel_id int(11) NOT NULL, name varchar(100) DEFAULT NULL, name_en varchar(100) DEFAULT NULL, short_name varchar(100) DEFAULT NULL, province varchar(20) DEFAULT NULL, city_id int(11) DEFAULT NULL, city varchar(20…

R语言实践——使用rWCVP在WCVP中匹配名称

使用rWCVP在WCVP中匹配名称 加载库工作流1. 示例数据&#xff1a;IUCN红色名录2. 将匹配的名称解析为接受名2.1 模糊匹配2.2 多项匹配2.3 将评估与接受的名称相关联 3. 可视化匹配过程4. 得到最终数据集 加载库 世界维管植物名录提供了所有已知维管植物物种的全球共识观点&…

LabView中条件结构的使用方法1

LabView中的条件结构包含一个或多个子程序框图&#xff0c;即分支&#xff0c;当满足某个条件时&#xff0c;相应的分支会被执行。也就是说&#xff0c;在条件结构执行时&#xff0c;仅有一个分支被执行。当程序存在两种或多种可能性时&#xff0c;可以使用条件结构。 1 创建条…

攻防世界安卓逆向练习

一.easy-so jadx分析程序逻辑 可以看到关键在于cyberpeace.CheckString()函数 双击跟进之后可以发现是native层函数 ida查看so文件 程序逻辑: 将字符串保存到新的空间buffer中第一个判断是将buffer的前16个字符和后16个字符进行交换第二个判断是将buffer的2个相邻的字符互换位…

算法|10.从暴力递归到动态规划3

算法|10.从暴力递归到动态规划3 1.纸牌游戏 题意&#xff1a;给定一个整型数组arr&#xff08;都是正数&#xff09;&#xff0c;代表数值不同的纸牌排成一条线。玩家A和玩家B依次拿走每张纸牌&#xff0c;规定玩家A先拿&#xff0c;玩家B后拿。但是每个玩家每次只能拿走最左…

(10) 朴素贝叶斯

文章目录 1 概述2 不同分布下的贝叶斯2.1 高斯朴素贝叶斯GaussianNB2.1.1 认识高斯朴素贝叶斯2.1.2 探索贝叶斯&#xff1a;高斯朴素贝叶斯擅长的数据集2.1.3 探索贝叶斯&#xff1a;高斯朴素贝叶斯的拟合效果与运算速度 2.2 概率类模型的评估指标2.2.1 布里尔分数Brier Score2…

PCIE知识点-022:PCIe 时钟结构

图1&#xff1a;参考时钟结构示意图[4] 1. Common Refclk Architecture Common Refclk Architecture&#xff0c;即同源参考时钟架构&#xff0c;PCIe收发设备共用一个时钟源&#xff0c;是目前是使用最为广泛的方案。 缺点&#xff1a; 对于适用同一 Common Clock 作为参考时…

大数据入门(十三)- HDFS的Shell操作

零.HDFS的Shell操作 一.进程启停管理 1.一键启停脚本 Hadoop HDFS组件内置了HDFS集群的一键启停脚本。 1&#xff09;$HADOOP_HOME/sbin/start-dfs.sh&#xff0c;一键启动HDFS集群 执行原理&#xff1a; &#xff08;1&#xff09;在执行此脚本的机器上&#xff0c;启动Secon…

动态规划2:题目

目录 第1题 Fibonacci 第2题 字符串分割(Word Break) .第3题 三角矩阵(Triangle) 第4题 路径总数(Unique Paths) 第5题 最小路径和(Minimum Path Sum) 第6题 背包问题 第7题 回文串分割(Palindrome Partitioning) 第8题 编辑距离(Edit Distance) 第9题 不同子序列(Dist…

Dubbo入门案例

1.基于以下图实现服务 提供者、消费者 2.前期工作 父POM <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLo…

如何在本地安装多个nodejs版本,方便前端开发

目录 &#x1f4dd;一&#xff0c;使用 nvm&#xff08;Node Version Manager&#xff09;: &#x1f4dd;二&#xff0c;使用 n&#xff08;Node.js 版本管理器&#xff09;: &#x1f4e2;要在本地安装多个 Node.js 版本以便于前端开发&#xff0c;你可以使用工具如 nvm&am…

js常用事件

js常用事件如下&#xff1a; onmouseover&#xff1a;鼠标被移到某元素之上&#xff1b; onmouseout&#xff1a;鼠标从某元素移开&#xff1b; onfocus&#xff1a;元素获得焦点&#xff1b; onblur&#xff1a;元素失去焦点&#xff1b; onclick&#xff1a;鼠标单击事件…

蓝桥杯嵌入式STM32G431RBT6竞赛指南与模板——最后的绝唱

谨以此文和我去年前的一篇蓝桥杯单片机的教程构成电子类的青铜双壁. 国信长天单片机竞赛训练之原理图讲解及常用外设原理&#xff08;遗失的章节-零&#xff09;_昊月光华的博客-CSDN博客 目录 时钟树 串口重定向&#xff1a;printf输出 动态点灯(点灯大师) 按键(常用状态…

RabbitMQ学习-发布确认高级

发布确认springboot版本 确认机制方案&#xff1a; 代码架构图&#xff1a; 配置文件&#xff1a; 在application.properties全局配置文件中添加spring.rabbitmq.publish-confirm-type属性&#xff0c;这个属性有以下几种值 none:禁用发布确认模式(默认)0 correlated:发布消…

Redis的SDS+IntSet+Dict

一)SDS 在redis中&#xff0c;保存key的是字符串&#xff0c;value往往是字符串或者是字符串的集合&#xff0c;可见字符串是redis中最常用的一种数据结构: 但是在redis中并没有直接使用C语言的字符串&#xff0c;因为C语言的字符串存在很多问题 1)获取字符串的长度需要通过运算…

算法12.从暴力递归到动态规划5

算法|12.从暴力递归到动态规划5 1.机器人行进问题 题意&#xff1a;假设有排成一行的N个位置记为1~N&#xff0c;N一定大于或等于2 开始时机器人在其中的M位置上(M一定是1~N中的一个) 如果机器人来到1位置&#xff0c;那么下一步只能往右来到2位置&#xff1b; 如果机器人来到…

stc15w404as使用keil做库,提供头文件,供调用

背景 有个项目使用需要使用库&#xff0c;将代码封装起来&#xff0c;仅仅留下调试接口&#xff0c;给用户使用&#xff0c;调试一些参数。这样工程看起来更简单&#xff0c;也方便客户维护。 也有一些使用场景&#xff0c;需要把自己的代码封装起来&#xff0c;这个是怕被别…

电脑msvcp120.dll缺失怎么办?由于找不到msvcp120.dll的解决方案

MSVCP120.dll文件是Windows操作系统中的一种动态链接库文件。它是由Microsoft C软件包提供的重要组件。当系统提示“MSVCP120.dll文件缺失”时&#xff0c;可能会导致某些应用程序无法正常运行。 以下是修复MSVCP120.dll缺失问题的几种方法&#xff1a; 方法一&#xff1a;修复…

如何在华为OD机试中获得满分?Java实现【公共子串计算】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1、题目描述2、输入描述3、输出描述…

<学习笔记>从零开始自学Python-之-web应用框架Django( 十四)上线部署(阿里云+Nginx+uwsgi+MySQL)

好了&#xff0c;我们现在有了一个完整的网站&#xff0c;在自己电脑上跑起来没问题了&#xff0c;但是我们做网站肯定不只是为了在本机上自己欣赏&#xff0c;总要放到网上去让别人来浏览。这一章我们就完整跑一遍Django项目的生产环境部署。 1、基本原理 想让你的网站在公网…