nacos 服务发现获取列表源码分析

news2024/9/29 13:28:17

nacos 服务发现获取列表源码是注册中心最重要的技术点之一,其获取服务列表理论上是在首次接口调用时获取,有时候配置饥饿加载,即服务启动时就获取服务列表:今天我们从一个入口解析获取配置列表;

一、客户端源码

1、自动装配:

commons 中

 nacos 中

 

2、以nacos包为例 com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnAvailableEndpoint//初始化点击此类
	public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosDiscoveryEndpoint(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnEnabledHealthIndicator("nacos-discovery")
	public HealthIndicator nacosDiscoveryHealthIndicator(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
		return new NacosDiscoveryHealthIndicator(
				nacosServiceManager.getNamingService(nacosProperties));
	}

}

3、点击 com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpoint

    @ReadOperation
	public Map<String, Object> nacosDiscovery() {
		Map<String, Object> result = new HashMap<>();
		result.put("NacosDiscoveryProperties", nacosDiscoveryProperties);

		NamingService namingService = nacosServiceManager
				.getNamingService(nacosDiscoveryProperties.getNacosProperties());
		List<ServiceInfo> subscribe = Collections.emptyList();

		try {
			subscribe = namingService.getSubscribeServices();
			for (ServiceInfo serviceInfo : subscribe) {
                //获取应用实例,点击
				List<Instance> instances = namingService.getAllInstances(
						serviceInfo.getName(), serviceInfo.getGroupName());
				serviceInfo.setHosts(instances);
			}
		}
		catch (Exception e) {
			log.error("get subscribe services from nacos fail,", e);
		}
		result.put("subscribe", subscribe);
		return result;
	}

4、点击namingService.getAllInstances方法进入

com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String)

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException {//点击进入
        return getAllInstances(serviceName, groupName, new ArrayList<String>());
    }

来到

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters) throws NacosException {
        //点击
        return getAllInstances(serviceName, groupName, clusters, true);
    }

来到

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) {
            //点击获取实例
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }

 5、进入com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }

        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);//点击进入
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {

            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //延时执行的定时任务,更新客户端缓存
        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

来到 updateServiceNow(serviceName, clusters) 方法

com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow

    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            //正式获取服务列表,点击
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

 6、来到com.alibaba.nacos.client.naming.net.NamingProxy#queryList

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        //此处开始去nacos服务端获取列表
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }

7、点击 scheduleUpdateIfAbsent(serviceName, clusters);

 public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //定时获取服务端最新数据,并更新到本地的服务,点击UpdateTask
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

 8、点击UpdateTask 来到

com.alibaba.nacos.client.naming.core.HostReactor.UpdateTask#run

        @Override
        public void run() {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);//获取更新
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }

                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);//获取更新
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }

                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }

        }

二、服务端源码

com.alibaba.nacos.naming.controllers.InstanceController#list

1、入口

    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {

        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        //核心入口
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
    }

点击

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {

                pushService
                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                                pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }

        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }

        checkIfDisabled(service);

        List<Instance> srvedIPs;
        //获取服务的所有ip
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        // filter ips using selector:
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }

        if (CollectionUtils.isEmpty(srvedIPs)) {

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }

            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }

            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }

        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        //循环放入服务ip
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }

        if (isCheck) {
            result.put("reachProtectThreshold", false);
        }

        double threshold = service.getProtectThreshold();

        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                result.put("reachProtectThreshold", true);
            }

            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }

        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);

            return JacksonUtils.createEmptyJsonNode();
        }

        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
        //循环封装数据
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();

            if (healthyOnly && !entry.getKey()) {
                continue;
            }

            for (Instance instance : ips) {

                // remove disabled instance:
                if (!instance.isEnabled()) {
                    continue;
                }

                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }

                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);

            }
        }
        //放到返回值里面
        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

 

到此,服务发现的源码流程分析完毕,下篇我们nacos配置中心源码,敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/186283.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推荐系统之推荐缓存服务

5.6 推荐缓存服务 学习目标 目标 无应用 无 5.6.1 待推荐结果的redis缓存 目的&#xff1a;对待推荐结果进行二级缓存&#xff0c;多级缓存减少数据库读取压力步骤&#xff1a; 1、获取redis结果&#xff0c;进行判断 如果redis有&#xff0c;读取需要推荐的文章数量放回&am…

(深度学习快速入门)第四章第一节:基础图像处理知识

文章目录一&#xff1a;位图和矢量图二&#xff1a;图像分辨率三&#xff1a;颜色模式&#xff08;1&#xff09;RGB&#xff08;2&#xff09;HSB&#xff08;2&#xff09;灰度图四&#xff1a;通道五&#xff1a;亮度、对比度和饱和度六&#xff1a;图像平滑和锐化&#xff…

D3股权穿透图

前言&#xff1a;最近做了一个项目&#xff0c;主要就是实现各种类似企查查的各种图谱&#xff0c;欢迎交流。后期将完成的谱图全部链接上&#xff0c;目前已大致实现了&#xff1a; 【企业关系图谱】、【企业构成图谱】、【股权穿透图】、【股权结构图】、【关联方认定图】 准…

【蓝桥杯基础题】2018年省赛—日志统计

&#x1f451;专栏内容&#xff1a;蓝桥杯刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录一、题目描述1.问题描述2.输入格式3.输出格式4.一个例子二、题目分析1、暴力法2、双指针三、代码汇总1、暴力代码汇总2、双…

【Mysql第一期 数据库概述】

文章目录1. 为什么要使用数据库2. 数据库与数据库管理系统2.1 数据库的相关概念2.2 数据库特点2.3SQL优点3.常见的数据库介绍1.Oracle2.SQL Server3.MySQL4.Access5.DB26.PostgreSQL7.SQLite8.informix4. MySQL介绍4.1Mysql重大历史事件4.2 关于MySQL 8.04.3 Why choose MySQL?…

linux内核读文件代码分析

linux下“一切皆文件”,所有设备都可以被抽象成文件,用户态可以通过open、read、write、llseek等api操作一个文件,通过系统调用进入内核态,最终访问到pagecache/磁盘上的数据,然后返回给用户态。 kernel version:v6.2-rc4 社区master主干 用户态应用程序调用read接口,通…

【转载】车载传感器与云端数据交换标准SensorIS的理解与使用

原文 https://zhuanlan.zhihu.com/p/386277784 1、什么是SensorIS?SensorIS全称是Sensor Interface Specification&#xff0c;翻译为中文就是传感器接口规范&#xff0c;是由来自全球汽车行业的主机厂、地图和数据提供商、传感器制造商和电信运营商共同组成的开放团体发布的一…

JavaEE day10 初识SpringMVC

JSON简介 JSON &#xff1a;JavaScript Object Notation JS对象表示法 是轻量级的文本数据交换格式&#xff0c;但是JSON仍然独立于语言和平台。其解析器和库支持许多不同的编程语言。目前非常多的动态编程语言&#xff08;java&#xff0c;PHP&#xff09;都支持JSON。JSON…

禅道好用吗?优缺点及类似10大项目管理系统介绍

类似禅道的十大项目管理软件&#xff1a;1、一站式研发项目管理软件PingCode&#xff1b;2、通用型项目协作工具Worktile&#xff1b;3、开源项目管理软件Redmine&#xff1b;4、免费项目管理软件Trello&#xff1b;5、无代码项目管理软件Monday&#xff1b;6、IT项目追踪管理工…

面试宝典-数据库基础

数据库基础前言一、数据库1.1 sql练习题1.2 sql语句执行顺序1.3 sql语句编写前言 本文主要记录B站视频视频链接的内容&#xff0c;做到知识梳理和总结的作用&#xff0c;项目git地址。 一、数据库 1.1 sql练习题 user表数据: idusername1张三2李四3王五4小刘 user_role表数…

CrackQL:一款功能强大的图形化密码爆破和模糊测试工具

关于CrackQL CrackQL是一款功能强大的图形化密码爆破和模糊测试工具&#xff0c;在该工具的帮助下&#xff0c;广大研究人员可以针对密码安全和应用程序安全进行渗透测试。 除此之外&#xff0c;CrackQL同时也是一款通用的GraphQL渗透测试工具&#xff0c;它可以控制速率限制…

垃圾分类智能分析系统 yolov7

垃圾分类智能分析系统应用pythonyolov7网络模型深度学习识别技术&#xff0c;自动识别违规投放行为并现场进行语音提示实时预警。如垃圾满溢抓拍预警、人脸识别、工服识别、厨余垃圾混投未破袋识别预警、垃圾落地识别预警、人来扔垃圾语音提醒等。我们选择当下YOLO最新的卷积神…

数组去重的七种方法

数组去重的七种方法1. 双重for循环2. forindexOf3.es6 set4.filter5.includes6.创建一个新的object7.new Map()1. 双重for循环 第1种是定义一个新的空数组&#xff0c;再执行嵌套双循环&#xff0c;监测空数组中如果没有的元素&#xff0c;push进空数组中。这个方法考察了conti…

AcWing - 寒假每日一题2023(DAY 16——DAY 20)

文章目录一、AcWing 4455. 出行计划&#xff08;简单&#xff09;1. 实现思路2. 实现代码二、AcWing 4510. 寻宝&#xff01;大冒险&#xff01;&#xff08;简单&#xff09;1. 实现思路2. 实现代码三、AcWing 3422. 左孩子右兄弟&#xff08;中等&#xff09;1. 实现思路2. 实…

【MySQL】过年没有回老家,在出租屋里整理了一些思维导图

Xmind导图知识点Mysql知识点SQL知识点Mybatis知识点面试题分享MySQL部分Mybatis部分Mysql知识点 通过下面的图片可以看出&#xff0c;MySQL基础语法分为四部分&#xff1a;连接数据库&#xff0c;对数据库的操作&#xff0c;对表中的数据操作&#xff0c;对表操作等等。 SQL…

python exe程序注册为window系统服务

1、使用pyinstaller将py打包成exe 1、安装 pip install pyinstaller2、打包成exe可执行文件 pyinstaller -F packTest.py #packTest.py为待打包的py文件打包成功后会在同级目录中生成两个文件夹和一个文件&#xff0c;分别为dist和build文件夹&#xff0c;以及一份与.py文件同…

Java——最大子数组和

题目链接 leetcode在线oj题——最大子数组和 题目描述 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 题目示例 输入&#xff1a;…

Vulnhub DC-4靶机渗透

环境准备DC-4靶机 ip&#xff1a;&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f;kali攻击机 ip&#xff1a;192.168.153.128一、信息收集kali攻击机中&#xff0c;使用 arp-scan -l 扫描c段&#xff08;-l为扫描c段&#xff09;确定靶…

自动驾驶——智能配电

一、汽车配电 汽车配电&#xff08;Power Distrubition Unit&#xff0c;PDU&#xff09;分为低压配电与高压配电&#xff0c;即低压PDU与高压PDU。 二、传统控制方式——PCB式电器盒 传统配电盒&#xff08;机电器件&#xff09;&#xff1a; &#xff08;1&#xff09;继…

为什么要做黑盒测试?黑盒测试有什么作用?

对于软件测试的从业者来说&#xff0c;黑盒测试是十分重要的测试方式&#xff0c;它可以弥补白盒测试检查不到的部分。可能刚刚入门的测试小白&#xff0c;对于为什么要做黑盒测试&#xff1f;黑盒测试有什么作用&#xff1f;仍然抱有很大的疑问。下面小编就来从黑盒测试的概念…