【dubbo3】看懂消费者如何发现提供者

news2025/1/15 7:22:52

服务发现机制

 服务发现是RPC框架非常重要的能力。典型的服务发现一般有两种:接口级服务发现、应用级服务发现。
 接口级服务发现典型代表是dubbo2基于zk的服务发现机制。提供者直接向注册中心注册接口信息及地址,消费者通过接口从注册中心拿到对应的地址。
 应用级服务发现典型代表是spring-cloud基于eureka的服务发现机制。提供者向注册中心注册应用信息及对应的地址,消费者从注册中心拿到应用的ip。传统的应用级服务发现消费者需要自己另外维护请求接口与应用直接关系,一般是通过请求路径匹配应用的规则来实现。这样当需要发起请求时,先找到请求对应的应用,然后再查找应用对应的IP。
 dubbo3的应用级服务发现的特别之处在于提供者在注册应用的同时,也向注册中心注册接口与应用的关系。这样消费者就无需另外维护这个关系。
以ZK作为注册中心为例,dubbo3的服务注册与发现过程大致如下图所示
在这里插入图片描述

  • 提供者注册应用信息、接口与应用的关系
  • 消费者通过接口从注册中心获取应用,再获取应用信息、IP
  • 消费者组装服务调用对象

服务注册

 提供者告知注册中心:我是谁、我的地址、我有哪些服务。其中我是谁、我的地址就是注册应用实例;我有哪些服务就是注册接口与应用的映射关系

应用实例

 dubbo应用实例在zk下的节点路径是/services/appx/ipx。appx是对应的应用名,是持久节点;ipx是对应的机器ip,是临时节点。appx下可能有多个ip,代表多个应用部署多台机器。
dubbo启动时向zk创建对应节点。

org.apache.dubbo.config.deploy.DefaultApplicationDeployer#prepareApplicationInstance

public void prepareApplicationInstance() {
        ...

        if (isRegisterConsumerInstance()) {
            exportMetadataService();
            if (hasPreparedApplicationInstance.compareAndSet(false, true)) {
                // 注:此处调用注册应用实例
                registerServiceInstance();
            }
        }
    }


org.apache.dubbo.registry.client.AbstractServiceDiscovery#register

public synchronized void register() throws RuntimeException {
        ... 
        this.serviceInstance = createServiceInstance(this.metadataInfo);
        ...
        //计算当前实例的revision,并注册到注册中心
        boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
        if (revisionUpdated) {
            reportMetadata(this.metadataInfo);
            //注:向注册中心注册应用信息
            doRegister(this.serviceInstance);
        }
    }

注册接口应用映射

 提供者如果只告知注册中心自己的名称、地址,消费者是无法知道接口的地址。所以提供者还需要告知自己有哪些接口,这样消费者就可以根据接口找到应用,在根据应用找到地址。dubbo3接口应用映射关系节点路径:/dubbo/mapping/servicex,servicex是对应的接口类全路径,是持久节点。该节点的值是:appx,appy,appx、appy是对应的应用,多个应用名之间用","隔开。
 dubbo3在接口暴露的时候注册对应的节点:

org.apache.dubbo.config.ServiceConfig#exported

protected void exported() {
        ...
        //注:服务暴露时触发注册接口应用映射关系
        boolean succeeded = serviceNameMapping.map(url);
        ...
    }


org.apache.dubbo.registry.client.metadata.MetadataServiceNameMapping#map

public boolean map(URL url) {
       ...
       
        boolean result = true;
        for (Map.Entry<String, MetadataReport> entry : metadataReportInstance.getMetadataReports(true).entrySet()) {
            MetadataReport metadataReport = entry.getValue();
            String appName = applicationModel.getApplicationName();
            try {
               ...
                boolean succeeded;
                int currentRetryTimes = 1;
                String newConfigContent = appName;
                do {
                    ConfigItem configItem = metadataReport.getConfigItem(serviceInterface, DEFAULT_MAPPING_GROUP);
                    String oldConfigContent = configItem.getContent();
                    if (StringUtils.isNotEmpty(oldConfigContent)) {
                        boolean contains = StringUtils.isContains(oldConfigContent, appName);
                        if (contains) {
                            succeeded = true;
                            break;
                        }
                        //注:将应用名拼在已有内容后面
                        newConfigContent = oldConfigContent + COMMA_SEPARATOR + appName;
                    }
                    //注:写入注册中心, 节点路径 /dubbo/mapping/接口类全路径,值是appNameX,appNameY
                    succeeded = metadataReport.registerServiceAppMapping(serviceInterface, DEFAULT_MAPPING_GROUP, newConfigContent, configItem.getTicket());
                } while (!succeeded && currentRetryTimes++ <= CAS_RETRY_TIMES);
              ....
        }

        return result;
    }

服务发现

 消费者知道自己要调用哪个接口,它必须从注册拿到接口对应的ip才能将请求发过去。首先消费者需要订阅"接口应用映射"节点,拿到接口对应的应用;其次消费者订阅"应用实例"节点,拿到应用对应的实例ip。

订阅接口应用映射

 dubbo3在服务引用时完成订阅动作,DefaultMappingListener是监听dubbo mapping的核心类

org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#doSubscribe

public void doSubscribe(URL url, NotifyListener listener) {
        url = addRegistryClusterKey(url);

        serviceDiscovery.subscribe(url, listener);

        boolean check = url.getParameter(CHECK_KEY, false);

        String key = ServiceNameMapping.buildMappingKey(url);
        Lock mappingLock = serviceNameMapping.getMappingLock(key);
        try {
            mappingLock.lock();
            Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
            try {
            //注:创建新的MapplingListener
                MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
                // 注:首次获取接口对应的映射 并且 监听zk节点变化
                subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
                mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
            } catch (Exception e) {
                logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
            }
           ...
        } finally {
            mappingLock.unlock();
        }
    }


org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.DefaultMappingListener#onEvent

public synchronized void onEvent(MappingChangedEvent event) {
          
           ...
            
            Set<String> newApps = event.getApps();
            Set<String> tempOldApps = oldApps;

           ...

            try {
                mappingLock.lock();
                if (CollectionUtils.isEmpty(tempOldApps) && newApps.size() > 0) {
                    serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
                    subscribeURLs(url, listener, newApps);
                    oldApps = newApps;
                    return;
                }

                for (String newAppName : newApps) {
                    if (!tempOldApps.contains(newAppName)) {
                        serviceNameMapping.removeCachedMapping(ServiceNameMapping.buildMappingKey(url));
                        serviceNameMapping.putCachedMapping(ServiceNameMapping.buildMappingKey(url), newApps);
                        // old instance listener related to old app list that needs to be destroyed after subscribe refresh.
                        ServiceInstancesChangedListener oldListener = listener.getServiceListener();
                        if (oldListener != null) {
                            String appKey = toStringKeys(toTreeSet(tempOldApps));
                            Lock appSubscriptionLock = getAppSubscription(appKey);
                            try {
                                appSubscriptionLock.lock();
                                //注:从老的listener中移除当前接口,如果老的listener没有被任何接口监听则destroy。
                                oldListener.removeListener(url.getServiceKey(), listener);
                                if (!oldListener.hasListeners()) {
                                    oldListener.destroy();
                                    removeAppSubscriptionLock(appKey);
                                }
                            } finally {
                                appSubscriptionLock.unlock();
                            }
                        }

                        subscribeURLs(url, listener, newApps);
                        oldApps = newApps;
                        return;
                    }
                }
            } finally {
                mappingLock.unlock();
            }
        }


org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribeURLs

protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
        serviceNames = toTreeSet(serviceNames);
        String serviceNamesKey = toStringKeys(serviceNames);
        String serviceKey = url.getServiceKey();
        logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));

        // register ServiceInstancesChangedListener
        Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
        try {
            appSubscriptionLock.lock();
            ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
            if (serviceInstancesChangedListener == null) {
                serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
                serviceInstancesChangedListener.setUrl(url);
                for (String serviceName : serviceNames) {
                //注:首次获取应用实例,触发实例变化事件,组装接口invoker
                    List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
                    if (CollectionUtils.isNotEmpty(serviceInstances)) {
                        serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
                    }
                }
                serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
            }
			//注:将监听器挂到zk节点上
            if (!serviceInstancesChangedListener.isDestroyed()) {
                serviceInstancesChangedListener.setUrl(url);
                listener.addServiceListener(serviceInstancesChangedListener);
                serviceInstancesChangedListener.addListenerAndNotify(url, listener);
                serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
            } else {
               // 注:如果该监听器之前destroy了,则从本地移除。 这里有一点不严谨:如果某个接口的mapping值变化顺序是appNameX -> appNameY -> appNameX,则第一次从appNameX变化到appNameY时,appNameX的ServiceInstanceChangedListener会destroy但是没有从本地移除。那么第二次从appNameY变回到appNameX时,接口会走到这段逻辑,导致接口的服务地址不会得到更新。当然,这种场景很少出现。
                logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
                serviceListeners.remove(serviceNamesKey);
            }
        } finally {
            appSubscriptionLock.unlock();
        }
    }

订阅应用实例

 消费者通过接口应用映射拿到了接口对应的应用,还要知道应用对应的实例ip才能发请求

org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener#onEvent

public void onEvent(ServiceInstancesChangedEvent event) {
       if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
           return;
       }
       doOnEvent(event);
   }

   /**
    * @param event
    */
   private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
       if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
           return;
       }

       refreshInstance(event);

       if (logger.isDebugEnabled()) {
           logger.debug(event.getServiceInstances().toString());
       }
       //注:这里一大段是从新的实例分别获取各个实例提供的接口,从而组装消费者接口引用的invokers
       Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
       Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();

       // grouping all instances of this app(service name) by revision
       for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
           List<ServiceInstance> instances = entry.getValue();
           for (ServiceInstance instance : instances) {
               String revision = getExportedServicesRevision(instance);
               if (revision == null || EMPTY_REVISION.equals(revision)) {
                   if (logger.isDebugEnabled()) {
                       logger.debug("Find instance without valid service metadata: " + instance.getAddress());
                   }
                   continue;
               }
               List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
               subInstances.add(instance);
           }
       }

       // get MetadataInfo with revision
       for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) {
           String revision = entry.getKey();
           List<ServiceInstance> subInstances = entry.getValue();

           MetadataInfo metadata = subInstances.stream()
               .map(ServiceInstance::getServiceMetadata)
               .filter(Objects::nonNull)
               .filter(m -> revision.equals(m.getRevision()))
               .findFirst()
               .orElseGet(() -> serviceDiscovery.getRemoteMetadata(revision, subInstances));

           parseMetadata(revision, metadata, localServiceToRevisions);
           // update metadata into each instance, in case new instance created.
           for (ServiceInstance tmpInstance : subInstances) {
               MetadataInfo originMetadata = tmpInstance.getServiceMetadata();
               if (originMetadata == null || !Objects.equals(originMetadata.getRevision(), metadata.getRevision())) {
                   tmpInstance.setServiceMetadata(metadata);
               }
           }
       }

       int emptyNum = hasEmptyMetadata(revisionToInstances);
       if (emptyNum != 0) {// retry every 10 seconds
           hasEmptyMetadata = true;
           if (retryPermission.tryAcquire()) {
               if (retryFuture != null && !retryFuture.isDone()) {
                   // cancel last retryFuture because only one retryFuture will be canceled at destroy().
                   retryFuture.cancel(true);
               }
               try {
                   retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10_000L, TimeUnit.MILLISECONDS);
               } catch (Exception e) {
                   logger.error("Error submitting async retry task.");
               }
               logger.warn("Address refresh try task submitted");
           }

           // return if all metadata is empty, this notification will not take effect.
           if (emptyNum == revisionToInstances.size()) {
               // 1-17 - Address refresh failed.
               logger.error(REGISTRY_FAILED_REFRESH_ADDRESS, "metadata Server failure", "",
                   "Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");

               return;
           }
       }
       hasEmptyMetadata = false;

       Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
       Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
       for (Map.Entry<ServiceInfo, Set<String>> entry : localServiceToRevisions.entrySet()) {
           ServiceInfo serviceInfo = entry.getKey();
           Set<String> revisions = entry.getValue();

           Map<Integer, Map<Set<String>, Object>> portToRevisions = protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new HashMap<>());
           Map<Set<String>, Object> revisionsToUrls = portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>());
           Object urls = revisionsToUrls.get(revisions);
           if (urls == null) {
               urls = getServiceUrlsCache(revisionToInstances, revisions, serviceInfo.getProtocol(), serviceInfo.getPort());
               revisionsToUrls.put(revisions, urls);
           }

           List<ProtocolServiceKeyWithUrls> list = newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>());
           list.add(new ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>) urls));
       }

       this.serviceUrls = newServiceUrls;
       //注:触发服务地址变化,组装新的服务invoker
       this.notifyAddressChanged();
   }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/992713.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1 Linux输入子系统

1 Linux输入子系统 https://www.cnblogs.com/beijiqie1104/p/11418082.html Linux input 子系统详解 https://www.cnblogs.com/yikoulinux/p/15208238.html

2023-9-8 求组合数(四)

题目链接&#xff1a;求组合数 IV #include <iostream> #include <algorithm>using namespace std;const int N 5010;int primes[N], cnt; bool st[N]; // 每个质数的次数 int sum[N];void get_primes(int n) {for(int i 2; i < n; i){if(!st[i]) primes[cnt]…

5.删除链表元素问题

1.删除特点节点 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,6,3,4,5,6], val 6 输出&#xff1a;[1,2,3,4,5]示例 2&#xff1a; 输…

【LeetCode: 207.课程表:拓扑排序+图】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

【C/C++】BMP格式32位转24位

问题 如题 解决方法 bmp文件格式参考:【C/C++】BITMAP格式分析_vc++ bitmap头文件_sunriver2000的博客-CSDN博客BITMAP文件大体上分成四个部分,如下表所示。文件部分长度(字节)位图文件头 Bitmap File Header14位图信息数据头 Bitmap Info Header40调色板 Palette4*n (n≥…

Linux 安装elasticsearch-7.5.1

相关链接 官⽹&#xff1a; https://www.elastic.co/cn/downloads/elasticsearch 下载&#xff1a; wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-linux-x86_64.tar.gz 分词器&#xff1a; https://github.com/medcl/elasticsearch-an…

MJDK 如何实现压缩速率的 5 倍提升?

MJDK 是基于 OpenJDK 构建的美团 JDK 发行版。本文主要介绍 MJDK 是如何在保障 java.util.zip.* API 及压缩格式兼容性的前提下&#xff0c;实现压缩/解压缩速率提升 5-10 倍的效果。希望相关的经验能够帮助到更多的技术同学。 1 前言 数据压缩技术[1]因可有效降低数据存储及…

appium环境搭建

一.appium环境搭建 1.python3 python3的下载安装这里就不多做介绍了&#xff0c;当然你也可以选择自己喜欢的语音&#xff0c;比如java… 2.jdk 1&#xff09;下载地址 官网(需登录账号)&#xff1a; https://www.oracle.com/java/technologies/downloads/ 百度网盘&…

2022护网行动经验分享(2023护网招人)

今年的护网又开始摇人了&#xff0c;不知道大家有想法没&#xff1f; 去年的护网结束之后&#xff0c;朋友圈感觉是在过年&#xff0c;到处是倒计时和庆祝声。 看得出来防守方们7*24小时的看监控还是比较无奈的。 本次复盘基于我对整个护网行动的观察总结而来&#xff0c;仅…

Spring Security OAuth2 远程命令执行漏洞

文章目录 一、搭建环境二、漏洞验证三、准备payload四、执行payload五、变形payload 一、搭建环境 cd vulhub/spring/CVE-2016-4977/ docker-compose up -d 二、漏洞验证 访问 http://192.168.10.171:8080/oauth/authorize?response_type${233*233}&client_idacme&s…

早期传言和升级:Apple Watch Ultra 2,我们的期待!

Apple Watch Ultra 2可能正在研制中,为去年的Apple Watch Ultras带来升级。现在,该公司提供了一款适合跑步者和户外运动爱好者的智能手表,我们迫切希望看到第二代型号将如何改进。 作为Apple Watch Series 8和Apple Watch SE(2022)的替代品,Apple Watch Ultra具有所有苹…

MPP 与 SMP 的区别,终于有人讲明白了【文末送书】

文章目录 导读01 SMP1. SMP 的典型特征2. SMP的优缺点 02 分布式MPP计算架构1. MPP 架构核心原理2. MPP 典型特征3. MPP优缺点 写作末尾 导读 当今数据计算领域主要的应用程序和模型可大致分为在线事务处理&#xff08;On-line Transaction Processing &#xff0c;OLTP&#…

笔试记录-扔鸡蛋问题

写目录 一个鸡蛋两个鸡蛋K个鸡蛋 今天面试官问了我这个扔鸡蛋问题&#xff0c;以前学过&#xff0c;但是面试的时候想不起来了&#xff0c;应该是直接寄了&#xff0c;接下来总结一下这个问题的动态规划做法. 问题&#xff1a;有一个N层高的楼&#xff0c;现在给你若干个鸡蛋&a…

实现在一张图片中寻找另一张图片的目标

OpenCV库中的SIFT特征检测算法和FLANN&#xff08;快速最近邻搜索库&#xff09;匹配算法来找到一个图片中的元素在另一个图片中的位置&#xff0c;并在源图片中标出它们的位置。 以下是一个简单的例子&#xff0c;使用OpenCV库&#xff0c;利用SIFT特征检测算法&#xff0c;在…

Kafka中Producer源码解读

Producer源码解读 在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢&a…

如何使用Google Compute Engine入门指南快速创建和配置您的云虚拟机实例

文章目录 步骤1&#xff1a;创建 Google Cloud Platform&#xff08;GCP&#xff09;账户步骤2&#xff1a;设置 GCP 项目步骤3&#xff1a;启用 Google Compute Engine API步骤4&#xff1a;安装 Google Cloud SDK步骤5&#xff1a;创建虚拟机实例步骤6&#xff1a;连接到虚拟…

ADL200N单相逆流监测多功能仪表在光伏中的应用-安科瑞黄安南

安科瑞电气-黄安南 ,18/*76-150-/6237 随着光伏行业的发展&#xff0c;部分地区村级变压器及工业用电变压器容量与光伏项目的装机容量处于饱和。电网公司要求对后建的光伏并网系统为不可逆流发电系统&#xff0c;指光伏并网系统所发生的电由本地负载消耗&#xff0c;多余的电不…

100万级连接,爱奇艺WebSocket网关如何架构

说在前面 在40岁老架构师 尼恩的读者社区(50)中&#xff0c;很多小伙伴拿到一线互联网企业如阿里、网易、有赞、希音、百度、滴滴的面试资格。 最近&#xff0c;尼恩指导一个小伙伴简历&#xff0c;写了一个《高并发网关项目》&#xff0c;此项目帮这个小伙拿到 字节/阿里/微…

IDEFICS 简介: 最先进视觉语言模型的开源复现

我们很高兴发布 IDEFICS ( Image-aware Decoder Enhanced la Flamingo with Ininterleaved Cross-attention S ) 这一开放视觉语言模型。IDEFICS 基于 Flamingo&#xff0c;Flamingo 作为最先进的视觉语言模型&#xff0c;最初由 DeepMind 开发&#xff0c;但目前尚未公开发布…

【广州华锐互动】元宇宙技术如何赋能传统工业企业?

随着科技的飞速发展&#xff0c;我们正处于工业革命4.0的时代&#xff0c;数字化、网络化和智能化正在深刻地改变着我们的生活和工作方式。在这个变革的大潮中&#xff0c;工业元宇宙平台应运而生&#xff0c;为企业带来了前所未有的机遇和挑战。 广州华锐互动开发的工业元宇宙…