七、Nacos源码系列:Nacos服务发现

news2025/1/12 7:39:57

目录

一、服务发现

二、getServices():获取服务列表

2.1、获取服务列表

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

3.1、从缓存中获取服务信息

3.2、缓存为空,执行订阅服务

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

3.2.2、订阅服务 

 3.2.3、处理服务信息

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

3.4、筛选满足条件的实例 

3.5、总结图


一、服务发现

在discovery-provider项目的pom.xml中,我们引入了如下依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2.2.9.RELEASE</version>
</dependency>

SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:

如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。

相关源码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {

    // 创建DiscoveryClient bean对象
	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
		return new NacosDiscoveryClient(nacosServiceDiscovery);
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
			matchIfMissing = true)
	public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
	}

}


@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public NacosDiscoveryProperties nacosProperties() {
        // 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,
        // 如namespace、username、password、serverAddr等属性
		return new NacosDiscoveryProperties();
	}

    // 创建NacosServiceDiscovery bean对象
	@Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
	}

}

在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。

重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:

public class NacosDiscoveryClient implements DiscoveryClient {

	private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);

	/**
	 * Nacos Discovery Client Description.
	 */
	public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";

	private NacosServiceDiscovery serviceDiscovery;

	public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
		this.serviceDiscovery = nacosServiceDiscovery;
	}

	@Override
	public String description() {
		return DESCRIPTION;
	}

	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
		try {
			return serviceDiscovery.getInstances(serviceId);
		}
		catch (Exception e) {
			throw new RuntimeException(
					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
		}
	}

	@Override
	public List<String> getServices() {
		try {
			return serviceDiscovery.getServices();
		}
		catch (Exception e) {
			log.error("get service name from nacos server fail,", e);
			return Collections.emptyList();
		}
	}

}

可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。

public class NacosServiceDiscovery {

    // 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上
	private NacosDiscoveryProperties discoveryProperties;

    // nacos服务管理器对象
	private NacosServiceManager nacosServiceManager;

	public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		this.discoveryProperties = discoveryProperties;
		this.nacosServiceManager = nacosServiceManager;
	}

	/**
	 * 返回指定group和servic的所有实例
	 */
	public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
		// 配置文件中配置的group组名
        String group = discoveryProperties.getGroup();
		// namingService(): 通过反射创建一个NacosNamingService对象
        // 最终会调用NacosNamingService#selectInstances()方法
        List<Instance> instances = namingService().selectInstances(serviceId, group,
				true);
        // 将Instance包装成NacosServiceInstance对象返回
		return hostToServiceInstanceList(instances, serviceId);
	}

	/**
	 * 返回指定group的所有服务名称
	 */
	public List<String> getServices() throws NacosException {
        // 配置文件中配置的group组名
		String group = discoveryProperties.getGroup();
        // namingService(): 通过反射创建一个NacosNamingService对象
        // 最终会调用NamingGrpcClientProxy#getServiceList()方法
		ListView<String> services = namingService().getServicesOfServer(1,
				Integer.MAX_VALUE, group);
    	// 返回所有服务名称
		return services.getData();
	}

	public static List<ServiceInstance> hostToServiceInstanceList(
			List<Instance> instances, String serviceId) {
		List<ServiceInstance> result = new ArrayList<>(instances.size());
		for (Instance instance : instances) {
			ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);
			if (serviceInstance != null) {
				result.add(serviceInstance);
			}
		}
		return result;
	}

	public static ServiceInstance hostToServiceInstance(Instance instance,
			String serviceId) {
		if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {
			return null;
		}
		NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();
		nacosServiceInstance.setHost(instance.getIp());
		nacosServiceInstance.setPort(instance.getPort());
		nacosServiceInstance.setServiceId(serviceId);

		Map<String, String> metadata = new HashMap<>();
		metadata.put("nacos.instanceId", instance.getInstanceId());
		metadata.put("nacos.weight", instance.getWeight() + "");
		metadata.put("nacos.healthy", instance.isHealthy() + "");
		metadata.put("nacos.cluster", instance.getClusterName() + "");
		if (instance.getMetadata() != null) {
			metadata.putAll(instance.getMetadata());
		}
		metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));
		nacosServiceInstance.setMetadata(metadata);

		if (metadata.containsKey("secure")) {
			boolean secure = Boolean.parseBoolean(metadata.get("secure"));
			nacosServiceInstance.setSecure(secure);
		}
		return nacosServiceInstance;
	}

	private NamingService namingService() {
		return nacosServiceManager.getNamingService();
	}

}

接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。

二、getServices():获取服务列表

2.1、获取服务列表

// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
//getServices()调用栈大体如下:
 namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);
    NacosNamingService#getServicesOfServer
        clientProxy.getServiceList(pageNo, pageSize, groupName, selector)
            NamingClientProxyDelegate#getServiceList
                grpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);

// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
    // 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名
    ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);
    if (selector != null) {
        if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {
            request.setSelector(JacksonUtils.toJson(selector));
        }
    }
    // 发送服务列表请求给Nacos服务端,接下来由服务端处理
    ServiceListResponse response = requestToServer(request, ServiceListResponse.class);
    // 组装返回值出去
    ListView<String> result = new ListView<>();
    result.setCount(response.getCount());
    result.setData(response.getServiceNames());
    return result;
}

接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。

// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {
    // ServiceManager.getInstance()通过单例返回一个ServiceManager对象
   
    /**
     * 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。
     * ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)
     * key: namespaceId
     * value: Set<Service>
     * 注册实例的时候,就往这个map写入了数据
     *
     * ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:
     * namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))
     */
    Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());

    // 构建响应结果
    ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());
    if (!serviceSet.isEmpty()) {
        // 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceA
        Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());
        // 按分页裁剪serviceNameSet
        List<String> serviceNameList = ServiceUtil
                .pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);
        result.setCount(serviceNameSet.size());
        result.setServiceNames(serviceNameList);
    }
    return result;
}

从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);
	// NamingService#selectInstances(serviceName, groupName, healthy, true)
		// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    
    ServiceInfo serviceInfo;
    // 集群名称,使用逗号分隔
    String clusterString = StringUtils.join(clusters, ",");

    // 是否订阅,默认是订阅的
    if (subscribe) {
        /**
         * 1.从缓存中获取ServiceInfo
         * ConcurrentMap<String, ServiceInfo> serviceInfoMap
         * key:  groupName@@serviceName  或者  groupName@@serviceName@@clusterString
         * value: ServiceInfo
         */

        // 示例:serviceName=discovery-provider   groupName=DEFAULT_GROUP
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        // 2.缓存为空,执行订阅服务
        if (null == serviceInfo) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    // 4.筛选满足条件的实例
    return selectInstances(serviceInfo, healthy);
}

上述流程的基本逻辑为:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
  • 如果是非订阅模式,那就直接请求服务器端,获得服务信息。

3.1、从缓存中获取服务信息

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    // 组装服务名(带组名):groupName@@serviceName
    // 例如:DEFAULT_GROUP@@discovery-provider
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果指定了集群,那么key还会加上"@@clusters"
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // ConcurrentMap<String, ServiceInfo> serviceInfoMap
    // 从缓存中获取服务信息
    return serviceInfoMap.get(key);
}

3.2、缓存为空,执行订阅服务

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);

实际上调用的是NamingClientProxyDelegate#subscribe()方法:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    // 服务名称(带组名)  格式:groupName@@serviceName
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果集群名称非空,key还需要拼接上集群名称
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 调度更新,往线程池中提交一个UpdateTask任务
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 获取缓存中的服务信息
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        // 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

主要逻辑有下面三个,分析如下。

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    if (!asyncQuerySubscribeService) {
        return;
    }
    // 组装key   格式:groupName@@serviceName@@clusters
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    // Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()
    // futureMap用于保存UpdateTask线程池任务的执行结果
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        // double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务
        if (futureMap.get(serviceKey) != null) {
            return;
        }

        // 往线程池中添加一个更新任务
        // UpdateTask实现了Runnable接口,需要关注其run()方法
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    // 延迟1s执行UpdateTask
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:

public class UpdateTask implements Runnable {
    
    long lastRefTime = Long.MAX_VALUE;
    
    private boolean isCancel;
    
    private final String serviceName;
    
    private final String groupName;
    
    private final String clusters;
    
    private final String groupedServiceName;
    
    private final String serviceKey;
    
    /**
     * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
     */
    private int failCount = 0;
    
    public UpdateTask(String serviceName, String groupName, String clusters) {
        this.serviceName = serviceName;
        this.groupName = groupName;
        this.clusters = clusters;
        this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
    }
    
    @Override
    public void run() {
        long delayTime = DEFAULT_DELAY;
        
        try {
            if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                    serviceKey)) {
                NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
                isCancel = true;
                return;
            }
            
            ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
            if (serviceObj == null) {
                // 使用grpc向服务端发送ServiceQueryRequest服务查询请求
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
                serviceInfoHolder.processServiceInfo(serviceObj);
                lastRefTime = serviceObj.getLastRefTime();
                return;
            }
            
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // 使用grpc向服务端发送ServiceQueryRequest服务查询请求
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
                serviceInfoHolder.processServiceInfo(serviceObj);
            }
            lastRefTime = serviceObj.getLastRefTime();
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                // 记录失败次数
                incFailCount();
                return;
            }
            // TODO multiple time can be configured.
            delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
            // 重置失败次数
            resetFailCount();
        } catch (NacosException e) {
            handleNacosException(e);
        } catch (Throwable e) {
            handleUnknownException(e);
        } finally {
            if (!isCancel) {
                // 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                        TimeUnit.MILLISECONDS);
            }
        }
    }
    
    private void handleNacosException(NacosException e) {
        incFailCount();
        int errorCode = e.getErrCode();
        if (NacosException.SERVER_ERROR == errorCode) {
            handleUnknownException(e);
        }
        NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());
    }
    
    private void handleUnknownException(Throwable throwable) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);
    }
    
    private void incFailCount() {
        int limit = 6;
        if (failCount == limit) {
            return;
        }
        failCount++;
    }
    
    private void resetFailCount() {
        failCount = 0;
    }
}

run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。

这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。

3.2.2、订阅服务 

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
    }
    // 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,
    // 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe

    // 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    // 使用grpc发送服务订阅请求
    return doSubscribe(serviceName, groupName, clusters);
}

public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
    SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);
    // private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
    synchronized (subscribes) {
        subscribes.put(key, redoData);
    }
}

订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。

我们来看下如何订阅服务的。

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // 构建一个SubscribeServiceRequest客户端订阅请求
    // 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handle
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
            true);
    // grpc请求Nacos服务端进行订阅
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    // 标记SubscriberRedoData重做数据为已订阅
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    return response.getServiceInfo();
}

通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。

public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String serviceName = request.getServiceName();
    String groupName = request.getGroupName();
    String app = request.getHeader("app", "unknown");
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 构建一个Service服务,指定为临时实例
    Service service = Service.newService(namespaceId, groupName, serviceName, true);
    // 构建Subscriber订阅者对象
    Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
            namespaceId, groupedServiceName, 0, request.getClusters());
    // serviceStorage.getData(service): 从缓存中获取serviceInfo
    // metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata
    // ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例
    ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
            metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
            true, subscriber.getIp());
    if (request.isSubscribe()) {
        // 订阅服务
        clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    } else {
        // 取消订阅服务
        clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    }
    return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们重点关注订阅服务的方法:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 添加到订阅者列表中,实际上就是保存在map中
    // 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    // 发布客户端订阅事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,

核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

 3.2.3、处理服务信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    // 获取老的服务
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    // 重新存入客户端缓存中
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // 对比下服务信息是否发生变更
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                JacksonUtils.toJson(serviceInfo.getHosts()));
        // 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifier
        NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        // 同步serviceInfo数据到本地文件
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    // 构建服务查询请求
    // Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handle
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    // 通过grpc请求Nacos服务端处理
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}

queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。

public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String groupName = request.getGroupName();
    String serviceName = request.getServiceName();
    Service service = Service.newService(namespaceId, groupName, serviceName);
    String cluster = null == request.getCluster() ? "" : request.getCluster();
    boolean healthyOnly = request.isHealthyOnly();
    // 从缓存中获取serviceInfo
    ServiceInfo result = serviceStorage.getData(service);
    // 从内存(map)获取ServiceMetadata
    ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
    // 获取有保护机制的健康实例
    result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,
            meta.getClientIp());
    return QueryServiceResponse.buildSuccessResponse(result);
}

3.4、筛选满足条件的实例 

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<>();
    }

    // 遍历所有实例,直接移除掉不满足条件的实例
    Iterator<Instance> iterator = list.iterator();
    while (iterator.hasNext()) {
        Instance instance = iterator.next();
        // 筛选出健康、启用、权重大于0的实例
        if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
            iterator.remove();
        }
    }
    
    return list;
}

3.5、总结图

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1440381.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法】{画决策树 + dfs + 递归 + 回溯 + 剪枝} 解决排列、子集问题(C++)

文章目录 1. 前言2. 算法例题 理解思路、代码46.全排列78.子集 3. 算法题练习1863.找出所有子集的异或总和再求和47.全排列II17.电话号码的字母组合 1. 前言 dfs问题 我们已经学过&#xff0c;对于排列、子集类的问题&#xff0c;一般可以想到暴力枚举&#xff0c;但此类问题用…

基于Chrome插件的Chatgpt对话无损导出markdown格式(Typora完美显示)

刚刚提交插件到Chrome插件商店正在审核&#xff0c;想尝试的可以先使用&#xff1a; https://github.com/thisisbaiy/ChatGPT-To-Markdown-google-plugin/tree/main 我将源代码上传至了GitHub&#xff0c;欢迎star, IssueGoogle插件名称为&#xff1a;ChatGPT to MarkDown plus…

海外云手机——平台引流的重要媒介

随着互联网的飞速发展&#xff0c;跨境电商、短视频引流以及游戏行业等领域正经历着迅猛的更新换代。在这个信息爆炸的时代&#xff0c;流量成为至关重要的资源&#xff0c;而其中引流环节更是关乎业务成功的关键。海外云手机崭露头角&#xff0c;成为这一传播过程中的重要媒介…

【保姆级教程|YOLOv8改进】【7】多尺度空洞注意力(MSDA),DilateFormer实现暴力涨点

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

Ribbon全方位解析:构建弹性的Java微服务

第1章 引言 大家好,我是小黑,咱们今天聊聊Ribbon,这货是个客户端负载均衡工具,用在Spring Cloud里面能让咱们的服务调用更加灵活和健壮。负载均衡,听起来挺高大上的,其实就是把外界的请求平摊到多个服务器上,避免某个服务器压力太大,其他的却在那儿闲着。 Ribbon的牛…

Springboot整合JUnit5框架

目录 第一章、在pom文件中导入依赖第二章、新建测试类第三章、新建测试方法 友情提醒: 先看文章目录&#xff0c;大致了解文章知识点结构&#xff0c;点击文章目录可直接跳转到文章指定位置。 第一章、在pom文件中导入依赖 SpringBoot2.2x之后的版本中spring-boot-starter-te…

Python(21)正则表达式中的“元字符”

大家好&#xff01;我是码银&#x1f970; 欢迎关注&#x1f970;&#xff1a; CSDN&#xff1a;码银 公众号&#xff1a;码银学编程 获取资源&#xff1a;公众号回复“python资料” 在本篇文章中介绍的是正则表达式中一部分具有特殊意义的专用字符&#xff0c;也叫做“元…

基于51 单片机的交通灯系统 源码+仿真+ppt

主要内容&#xff1a; 1&#xff09;南北方向的绿灯、东西方向的红灯同时亮40秒。 2&#xff09;南北方向的绿灯灭、黄灯亮5秒&#xff0c;同时东西方向的红灯继续亮。 3&#xff09;南北方向的黄灯灭、左转绿灯亮&#xff0c;持续20秒&#xff0c;同时东西方向的红灯继续…

rust语言tokio库底层原理解析

目录 1 rust版本及tokio版本说明1 tokio简介2 tokio::main2.1 tokio::main使用多线程模式2.2 tokio::main使用单线程模式 3 builder.build()函数3.1 build_threaded_runtime()函数新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图…

前端JavaScript篇之对执行上下文的理解

目录 对执行上下文的理解创建执行上下文 对执行上下文的理解 当我们在执行JavaScript代码时&#xff0c;JavaScript引擎会创建并维护一个执行上下文栈来管理执行上下文。执行上下文有三种类型&#xff1a;全局执行上下文、函数执行上下文和eval函数执行上下文。 在写代码的时…

第十三、十四个知识点:用javascript获取表单的内容并加密

我们先来写一段代码&#xff1a; <body><form action"#" method"post">//写一个表单<span>用户名&#xff1a;</span><input type"text" id"username" name"username"><span>密码&a…

BGP 双归不同运营商并且客户之间互为主备的部署实验

一、拓朴&#xff1a; 要求&#xff1a; 1、双方 ISP 均不得将客户 AS 做为穿越 AS 2、对于客户业务的出流量&#xff1a;客户 AS100 和 200 访问 ISP 时&#xff0c;AS100优选从 Line-1 线路&#xff0c;AS200 优选从 Line-2 访问&#xff0c;但当 Line-1 和 …

Springboot+vue的社区智慧养老监护管理平台设计与实现(有报告),Javaee项目,springboot vue前后端分离项目

演示视频&#xff1a; Springbootvue的社区智慧养老监护管理平台设计与实现&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的社区智慧养老监护管理平台设…

springboot项目启动报错:dynamic-datasource can not find primary datasource

项目启动报错信息 Caused by: com.baomidou.dynamic.datasource.exception.CannotFindDataSourceException: dynamic-datasource can not find primary datasourceat com.baomidou.dynamic.datasource.DynamicRoutingDataSource.determinePrimaryDataSource(DynamicRoutingDat…

Prime(VulnHub)

Prime 文章目录 Prime1、nmap2、web渗透随便看看首页隐写查看目录爆破gobusterferoxbusterdirsearchdirb whatwebsearchsploit WordPress 5.2.2/dev/secret.txtFuzz_For_Webwfuzzimage.phpindex.php location.txtsecrettier360文件包含漏洞包含出password.txt尝试ssh登入尝试登…

AD9361多片同步设计方法

本文基于ZC706FMCOMMS5的平台&#xff0c;介绍了多片AD9361同步的方法。并将该设计移植到自行设计的ZYNQ70354片AD9361(实现8路同步收发)的电路板上。本设计采用纯逻辑的方式&#xff0c;仅使用了ZYNQ芯片的PL部分。 9361多芯片同步主要包括基带同步和射频同步两大块任务。其中…

idea自带的HttpClient使用

1. 全局变量配置 {"local":{"baseUrl": "http://localhost:9001/"},"test": {"baseUrl": "http://localhost:9002/"} }2. 登录并将结果设置到全局变量 PostMapping("/login")public JSONObject login(H…

Mac电脑到手后的配置

一、Homebrew 1、Homebrew安装 /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)" 桌面的Old_Homebrew文件夹&#xff0c;没有你需要的可以删除。 2、Homebrew卸载 /bin/zsh -c "$(curl -fsSL https://gitee.com/c…

2023年ABC123公众号年刊下载(PDF电子书)

Part1 前言 大家好&#xff0c;我是ABC_123。2023年公众号正式更名为"希潭实验室"。除了分享日常红队攻防、渗透测试技术文章之外&#xff0c;重点加强了APT案例分析方面的内容。公众号关注度得到进一步提升&#xff0c;关注人数已达到3万5千人。原计划在2023年编写…

【FPGA开发】Modelsim和Vivado的使用

本篇文章包含的内容 一、FPGA工程文件结构二、Modelsim的使用三、Vivado的使用3.1 建立工程3.2 分析 RTL ANALYSIS3.2.1 .xdc约束&#xff08;Constraints&#xff09;文件的产生 3.3 综合 SYNTHESIS3.4 执行 IMPLEMENTATION3.5 烧录程序3.6 程序固化3.6.1 SPI约束3.6.2 .bin文…