Nacos 进阶篇---服务发现:服务之间请求调用链路分析(六)

news2024/11/20 12:04:00
一、引言

   前面几个章节把Nacos服务注册从客户端到服务端,整个流程源码都分析了一遍。
   本章节我们来分析,order-service、stock-service 完成Nacos注册后,可以通过Feign的方式,来完成服务之间的调用。那它的底层是如何实现的?做了哪些操作 ?本章就来进行解答!

二、目录

目录

一、引言

二、目录

三、客户端服务发现源码

四、服务端实例查询源码

五、总结


再分析源码前,我们先来了解下Nacos服务之间的调用流程:

  • 首先每个客户端服务都有一个本地缓存列表,这个缓存列表会定时从服务端进行更新
  • 当 order-service 去调用 stock-service 服务时,会根据服务名去本地缓存获取微服务实例,但通过服务名称会获取多个实例,所以需要再根据负载均衡选择其中一个
  • 最终 order-service 服务拿到 ip+port 实例信息,发起HTTP调用,拿到返回结果。

注意:之前很多同学都误以为是 Nacos 服务端去请求 stock-serivce,然后把结果返回给 order-service,这样做 Nacos 服务端的压力就太大了,千万不要搞混淆了。

本章重点:

  • 首先Nacos客户端是怎么调用实例查询接口的,是如何维护好本地缓存的 ?
  • 其次Nacos服务端实例查询接口是如何实现的 ?

三、客户端服务发现源码

主线任务:Nacos客户端是怎么调用实例查询接口的,是如何维护好本地缓存的 ?

  在微服务组件当中有个ribbon依赖,它主要是在我们微服务架构当中发挥 负载均衡 的作用。因为我们在线上生产部署的实例往往都是集群机构的,Ribbon会从集群实例中,根据负载均衡的算法选举出最终被调用的一台机器实例。

在我们Nacos当中,也是整合了Ribbon,来实现负载均衡的,从而可以调用Nacos服务端实例列表接口。

可以看到,在Nacos 注册依赖中,也是整合了Ribbon的依赖。

在Ribbon下面,有个 ServerList 接口,这是个扩展接口,这个接口的作用就是获取 server 列表。Nacos 有对这个接口做实现,从而整合Ribbon

/**
 * Interface that defines the methods sed to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();
    
    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     * 
     */
    public List<T> getUpdatedListOfServers();   

}

在 NacosServerList 当中,继承了抽象类 AbstractServerList,在 AbstractServerLis当中实现了 ServerList 的两个接口。

public class NacosServerList extends AbstractServerList<NacosServer> {

    private NacosDiscoveryProperties discoveryProperties;

    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
       this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
       return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
       return getServers();
    }

    private List<NacosServer> getServers() {
       try {
          String group = discoveryProperties.getGroup();
          List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, group, true);
          return instancesToServerList(instances);
       }
       catch (Exception e) {
          throw new IllegalStateException(
                "Can not get service instances from nacos, serviceId=" + serviceId,
                e);
       }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
       List<NacosServer> result = new ArrayList<>();
       if (CollectionUtils.isEmpty(instances)) {
          return result;
       }
       for (Instance instance : instances) {
          result.add(new NacosServer(instance));
       }

       return result;
    }

    public String getServiceId() {
       return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
       this.serviceId = iClientConfig.getClientName();
    }

}

可以看到,order-service 去调用 stock-service 时,最终会走到 getUpdatedListOfServers 方法。那我们就具体来分析这个方法~

我们就来看 getServers 方法,这里重点看 selectInstances() 方法。

private List<NacosServer> getServers() {
    try {
        // 读取分组
        String group = discoveryProperties.getGroup();
        // 通过服务名称、分组、true表示只需要健康实例,查询列表
        // 我们重点看 seelctInstances方法
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, group, true);
        // 把 Instance 转换成 NacosServer 类型
        return instancesToServerList(instances);
    }
    catch (Exception e) {
        throw new IllegalStateException(
                "Can not get service instances from nacos, serviceId=" + serviceId,
                e);
    }
}

selectInstances 前两个都是重载方法,重点看 hostReactor.getServiceInfo() 方法。

@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
    return selectInstances(serviceName, groupName, healthy, true);
}

@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe)
        throws NacosException {
    return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}

// 上面都是方法重载,最终调用到这个方法!!
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
                                      boolean subscribe) throws NacosException {

    // 默认为 true
    ServiceInfo serviceInfo;
    if (subscribe) {
        // 重点看这个方法
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}

重点看 getServiceInfo() 方法。在这个方法中,如果本地缓存为空,就会去查询Nacos实例列表接口,然后写入到本地缓存当中

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    // 这里去查询本地缓存
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    // 本地缓存为空,去调用 Nacos实例列表接口,查询Nacos内存注册表数据
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // 这里的锁就有必要说一下了,这个锁的作用说白了就是为了防止避免 "HTTP重复调用的"!
            // 假设当一条线程进来 serviceObj 为空,这时就会走上面 调用 Nacos实例列表接口的步骤方法。当调用Nacos实例列表接口的方法
            // 还没执行完的时候,又进来了一条线程发现 因为还在等待 Nacos实例列表接口数据的返回,所以还没来得及往本地缓存列表写入数据,
            // 这时本地缓存数据还是空的,所以这里会让该线程等待 5s。等查询实例列表的线程执行完之后,在 finnaly 最后:
            // 如果oldService 不等于 null,它需要去唤醒其他正在等待的线程。
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }

    // 定时任务重复执行,维护本地缓存
    scheduleUpdateIfAbsent(serviceName, clusters);

    // 最终是从本地缓存中 获取实例列表数据
    return serviceInfoMap.get(serviceObj.getKey());
}

如果本地缓存查不到对应的服务数据,那么就会向 Nacos 发起实例列表查询接口。

private void updateServiceNow(String serviceName, String clusters) {
    try {
        // 本地缓存没有数据,最终会调用到这个方法
        updateService(serviceName, clusters);
    } catch (NacosException e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    }
}

public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // 发起Http请求调用 Nacos实例接口,获取数据
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

        // 获取到数据,写入到本地缓存当中
        if (StringUtils.isNotEmpty(result)) {
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

    // 组装请求参数
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    // 对 /nacos/v1/ns/instance/list 接口,发起Http请求调用
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

/instance/list 请求地址对应 Nacos 服务端查询实例列表接口,如下图:

在 getServiceInfo 方法当中,获取完实例数据,就会去执行 UpdateTask 定时任务,在定时任务当中,如果本地缓存为空,就会再去调用Nacos实例接口,更新本地缓存。并且这个定时任务是会重复执行的。

最终是从本地缓存中 serviceInfoMap 直接获取实例数据,从这里也可以看出本地缓存其实是一个 Map 结构。

    // 定时任务重复执行,维护本地缓存
    scheduleUpdateIfAbsent(serviceName, clusters);

    // 最终是从本地缓存中 获取实例列表数据
    return serviceInfoMap.get(serviceObj.getKey());
四、服务端实例查询源码

主线任务:Nacos服务端实例查询接口是如何实现的 ?

刚才通过客户端源码的分析,我们知道最终请求到 Nacos 服务端 /nacos/v1/ns/instance/list 接口,我们来看下这个接口怎么实现的。在list方法当中,首先去组装请求参数,然后去调用 doSrvIpxt 方法。

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    
    // 组装请求参数
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    // 查询实例列表
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

在 doSrvIpxt 方法当中很多都是分支代码逻辑,我们主要看下面这个几个方法就行。

最终还是从Nacos注册表当中获取了实例数据进行返回的。

// 获取实例列表
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

public List<Instance> srvIPs(List<String> clusters) {
    if (CollectionUtils.isEmpty(clusters)) {
        clusters = new ArrayList<>();
        clusters.addAll(clusterMap.keySet());
    }
    // 拿到需要查询的 集群对象
    return allIPs(clusters);
}

public List<Instance> allIPs(List<String> clusters) {
    List<Instance> result = new ArrayList<>();
    // 遍历集群对象
    for (String cluster : clusters) {
        Cluster clusterObj = clusterMap.get(cluster);
        if (clusterObj == null) {
            continue;
        }
        // 获取 cluster 对象中所有的 Instance 实例
        result.addAll(clusterObj.allIPs());
    }

    return result;
}

public List<Instance> allIPs() {
    // 返回持久化实例、临时实例
    List<Instance> allInstances = new ArrayList<>();
    allInstances.addAll(persistentInstances);
    allInstances.addAll(ephemeralInstances);
    return allInstances;
}
五、总结

    每一个客户端服务都有一个本地缓存列表,先根据服务名去本地缓存列表当中找,如果没有就去调用Nacos实例查询接口查询注册表当中数据,查到的话进行返回,同步更新本地缓存列表,同时也会开启定时任务来维护本地缓存列表。

如果根据服务名在本地缓存列表查到多个服务实例,最终会根据负载均衡的策略选择其中一个, 进行 HTTP 调用。

    最后我们还看了Nacos实例查询接口,实例数据是从Nacos注册表当中获取,进行返回的。

最后的最后,别忘了把源码分析图补充完整:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1686788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux下的docker使用

docker是什么&#xff0c;docker翻译过来的意思就是码头工人&#xff0c;顾名思义&#xff0c;docker本质上就是一个搬运工&#xff0c;只不过从搬运货物改成了搬运程序&#xff0c;使搬运的不同的程序能够独立的运行在码头上的不同容器内&#xff0c;互不干扰&#xff0c;而他…

不使用ScrollRect 和 HorizontalLayoutGroup做的横向循环列表

一、 版本一 1.前情提要 因为需要展示300多个相同的物体&#xff0c;但是如果全部放在场景内&#xff0c;运行起来会很卡&#xff0c;所以想到了用无限循环&#xff0c;然后动态填充不同的数据。 做的这个没有用HorizontalLayoutGroup 和 ScrollRect 。 1.没有使用Horizontal…

Git原理及常用命令小结——实用版(ing......)、Git设置用户名邮箱

Git基本认识 Git把数据看作是对小型文件系统的一组快照&#xff0c;每次提交更新&#xff0c;或在Git中保存项目状态时&#xff0c;Git主要对当时的全部文件制作一个快照并保存这个快照的索引。同时&#xff0c;为了提高效率&#xff0c;如果文件没有被修改&#xff0c;Git不再…

JSON的序列化与反序列化以及VSCode执行Run Code 报错

JSON JSON: JavaScript Object Notation JS对象简谱 , 是一种轻量级的数据交换格式。 JSON格式 { "name":"金苹果", "info":"种苹果" } 一个对象&#xff1a;由一个大括号表示.括号中通过键值对来描述对象的属性 (可以理解为, 大…

操作系统总结(2)

目录 2.1 进程的概念、组成、特征 &#xff08;1&#xff09;知识总览 &#xff08;2&#xff09;进程的概念 &#xff08;3&#xff09;进程的组成—PCB &#xff08;4&#xff09;进程的组成---程序段和数据段 &#xff08;5&#xff09;程序是如何运行的呢&#xff1f…

Android和flutter交互,maven库的形式导入aar包

记录遇到的问题&#xff0c;在网上找了很多资料&#xff0c;都是太泛泛了&#xff0c;使用后&#xff0c;还不能生效&#xff0c;缺少详细的说明&#xff0c;或者关键代码缺失&#xff0c;我遇到的问题用红色的标注了 导入aar包有两种模式 1.比较繁琐的&#xff0c;手动将aar…

Java8-HashMap实现原理

目录 HashMap原理 hashmap的put流程&#xff1a; HashMap扩容机制&#xff1a; HashMap的寻址算法&#xff1a; HashMap原理 HashMap的底层数据结构是由&#xff0c;数组&#xff0c;链表和红黑树组成的。 当我们往HashMap中put元素的时候&#xff0c;利用key的hashCode重…

HC32F103BCB使用SPI获取AS5040编码器数据

1.AS5040介绍 2.硬件电路 硬件上使用SSI通信方式连接。 3.配置硬件SPI 查看手册&#xff0c;AS5040时序 可以看到在空闲阶段不发生数据传输的时候时钟(CLK)和数据(DO)都保持高电位(tCLKFE阶段)&#xff0c;在第一个脉冲的下降沿触发编码器载入发送数据&#xff0c;然后每一个…

【Unity Shader入门精要 第9章】更复杂的光照(四)

1. 透明度测试物体的阴影 对于物体有片元丢弃的情况&#xff0c;比如透明度测试或者后边会讲到的消融效果&#xff0c;使用默认的 ShadowCaster Pass 会产生问题&#xff0c;这是因为该Pass在生成阴影映射纹理时&#xff0c;没有考虑被丢弃的片元&#xff0c;而是使用完整的模…

FTP文件传输议

FTP是一种文件传输协议&#xff1a;用来上传和下载&#xff0c;实现远程共享文件&#xff0c;和统一管理文件 工作原理&#xff1a;用于互联网上的控制文件的双向传输是一个应用程序。工作在TCP/IP协议簇的&#xff0c;其传输协议是TCP协议提高文件传输的共享性和可靠性&#…

阅读笔记——《AFLNeTrans:状态间关系感知的网络协议模糊测试》

【参考文献】洪玄泉,贾鹏,刘嘉勇.AFLNeTrans&#xff1a;状态间关系感知的网络协议模糊测试[J].信息网络安全,2024,24(01):121-132.【注】本文仅为作者个人学习笔记&#xff0c;如有冒犯&#xff0c;请联系作者删除。 目录 摘要 1、引言 2、背景及动机 2.1、网络协议实现程…

正点原子LWIP学习笔记(二)MAC简介

MAC简介 一、MAC简介&#xff08;了解&#xff09;二级目录三级目录 二、ST的ETH框架&#xff08;了解&#xff09;三、SMI站管理接口&#xff08;熟悉&#xff09;四、介质接口MII、RMII&#xff08;熟悉&#xff09; 一、MAC简介&#xff08;了解&#xff09; STM32 的 MAC …

Ubuntu24.04设置静态IP地址

Ubuntu24.04设置静态IP地址 前言&#xff1a;vm17.5的动态IP问题 第一个是设置的静态IP我们可以看到是forever&#xff0c;第二个则是动态IP则是一天的时间。 如果我们不设置静态IP的话&#xff0c;那么可能在本地测试项目的时候&#xff0c;第二天发现一些服务不能用了&#…

13.js对象

定义 一种复杂数据类型&#xff0c;是无序的&#xff08;不保留键的插入顺序&#xff09;&#xff0c;以键值对&#xff08;{key:value})形式存放的数据集合 对象的创建 &#xff08;1&#xff09;字面量创建 var 对象名{ } &#xff08;2&#xff09;内部构造函数创建 v…

VirtualBox安装ubuntu22.04记录

一,VirtualBox 软件安装 虚拟机&#xff08;Virtual Machine&#xff09;指通过软件模拟的具有完整硬件系统功能的、运行在一个完全隔离环境中的完整计算机系统。在实体计算机中能够完成的工作在虚拟机中都能够实现。 常见的虚拟机软件主要有两款 VMware 和 VirtualBox 。VMwar…

争议湖北消费金融2023年业绩,营收下滑or财务内控重大缺陷?

近日&#xff0c;湖北消费金融股份有限公司&#xff08;下称“湖北消费金融”&#xff09;披露了2023年度相关信息&#xff0c;包括股权结构、关联方、董事会、分支机构、资产负债情况等信息。 据介绍&#xff0c;湖北消费金融的注册资本为10.058亿元&#xff0c;法定代表人为…

linux---线程控制

线程和进程 以前我们要同时跑多个程序&#xff0c;可以通过fork()多个子进程&#xff0c;然后通过系统函数进行程序的替换&#xff0c;但是创建进程代价大&#xff0c;不仅要拷贝一份父进程的地址空间&#xff0c;页表&#xff0c;文件表述符表等。但是线程不需要因为是进程的…

使用JavaScript日历小部件和DHTMLX Gantt的应用场景(三)

DHTMLX Suite UI 组件库允许您更快地构建跨平台、跨浏览器 Web 和移动应用程序。它包括一组丰富的即用式 HTML5 组件&#xff0c;这些组件可以轻松组合到单个应用程序界面中。 DHTMLX Gantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表&#xff0c;可满足项目管理应用…

【调试笔记-20240520-Linux-在 WSL2 / Ubuntu 20.04 中编译 QEMU 可运行的 OVMF 固件】

调试笔记-系列文章目录 调试笔记-20240520-Linux-在 WSL2 / Ubuntu 20.04 中编译 QEMU 可运行的 OVMF 固件 文章目录 调试笔记-系列文章目录调试笔记-20240520-Linux-在 WSL2 / Ubuntu 20.04 中编译 QEMU 可运行的 OVMF 固件 前言一、调试环境操作系统&#xff1a;Windows 10 …

作业-day-240523

思维导图 知识点问答 1、IO多路复用的原理 1、创建一个检测文件描述符的容器 fd_set fds; 2、将需要检测的文件描述符放入容器中 FD_SET(文件描述符&#xff0c;&fds); 3、通过一个阻塞函数阻塞等待容器中是否有事件产生&#xff0c;如果有一个或多个事件产生&#xff0c…