Nacos2.X源码分析:服务注册、服务发现流程

news2024/11/17 12:28:30

文章目录

  • Nacos2.1.X源码
    • 源码下载
    • 服务注册
      • NacosClient端
      • NacosServer端
    • 服务发现
      • NacosClient端
      • NacosServer端

Nacos2.1.X源码

源码下载

源码下载地址



服务注册

官方文档,对于NamingService接口服务注册方法的说明

Nacos2.X 服务注册总流程图
在这里插入图片描述



NacosClient端

一个小变动,Nacos1.X版本,在spring.factories文件中服务注册相关的bean是在NacosDiscoveryAutoConfiguration这个自动配置类中的,而2.X版本改到了NacosServiceRegistryAutoConfiguration配置类中,当然调用流程没有改,还是父类监听事件 --> bind() --> start() --> register() -->registerInstance()

这里直接就从NamingService接口的registerInstance(...)方法开始

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // 调用NacosServer端,发送服务注册, Nacos2.X采用的是grpc方式请求,所以这里是去NamingClientProxyDelegate实现类的方法中
    clientProxy.registerService(serviceName, groupName, instance);
}



// 补充说明,对上面一行代码的说明
// 如果在看源码时,遇到了一个方法,实在是不知道应该去看哪一个实现类,也不能debug的情况下,那么就去找调用方定义的位置,比如这里
public class NacosNamingService implements NamingService {
    // clientProxy变量定义的位置
    private NamingClientProxy clientProxy;

    private void init(Properties properties) throws NacosException {
        ...
            // 该变量赋值的位置,也就定位到了原来是要去NamingClientProxyDelegate这个实现类中看方法
            this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }

    ...

        @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        // 所以这里是去NamingClientProxyDelegate实现类的方法中
        clientProxy.registerService(serviceName, groupName, instance);
    }
}

NamingClientProxyDelegate类的registerService(...)方法中会判断当前要注册的实例是否为临时实例,如果是临时实例就使用grpc的方式请求NacosService,如果是持久化实例就还是使用http的方式请求,一般情况下都是临时实例,所以会采用grpc的方式调用

现在进入到了NamingGrpcClientProxy.registerService(...)方法中了

看完了下面的方法,我们知道这里其实:

  • 构建的一个InstanceRedoData对象,存入一个registeredInstances集合中;

  • 直接发送了一个grpc请求,请求参数是InstanceRequest;

  • 请求发送完后修改registeredInstances集合中InstanceRedoData对象的一个属性值

  • 此时是不是有一个疑问,我的instance怎么没有定时发送心跳的任务嘞?

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                       instance);
    // 构建的一个InstanceRedoData对象,存入一个registeredInstances集合中
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    // 进行服务注册
    doRegisterService(serviceName, groupName, instance);
}


public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // NacosClient这边会封装为一个请求对象,
    // 我们这里可以利用InstanceRequest这个类名去NacosServer端找该调用接口具体实现位置,一般的命名就是后面加一个Handler进行拼接
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                                                  NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // 使用grpc的方式发送请求
    requestToServer(request, Response.class);
    // 这里就从registeredInstances集合中取出上面构建的InstanceRedoData对象,并把它的registered属性设置为true
    redoService.instanceRegistered(serviceName, groupName);
}


// 真正发送grpc请求方法
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
    throws NacosException {
    try {
        request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        
        // 通过rpcClient发送请求,更具体的实现就没必要去看了,都是grpc相关的内容了
        Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                            response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}



NacosServer端

在Nacos1.X版本时,我们能够根据NacosClient端发送的请求url去找controller,比如http请求的url为/v1/ns/instance,那么就找InstanceController

在Nacos2.X版本中使用了grpc,我们可以根据发送的请求对象类名去找,比如上面NacosClient端使用grpc发送的请求对象是InstanceRequest,那么我们就可以去NacosServer端找InstanceRequestHandler这种直接在请求类名后面拼接一个handler的类。



现在进入到InstanceRequestHandler

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        // 创建一个service对象,这里和Nacos1.X有一些小变动,
        // Nacos1.X的Service对象中能同时保存持久化实例和非持久化实例集合
        // Nacos2.X的Service对象只能保存一种了,要么该service对应持久化实例,要么就对应非持久化实例
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        // 判断请求类型,是注册实例还是注销实例,进而调用对应的方法
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                // 调用 下方 注册服务的方法
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // 注册实例,会进入到service层的registerInstance()方法
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // 注销实例,会进入到service层的deregisterInstance()方法
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
    
}



Service层的deregisterInstance()方法:

  • 将service添加进singletonRepository<Service, Service>namespaceSingletonMaps<namespace, Set<Service>>集合中,

  • 从singletonRepository集合中取出最先添加的service

  • 根据clientId取出Client

  • 将service与instance添加至Client对象中的publishers<Service, InstancePublishInfo>集合中

  • 发布三个事件:ClientChangedEvent、ClientRegisterServiceEvent、InstanceMetadataEvent

此时是不是有一个疑问:service是存在多个instance的,此时service保存在一边的集合中,而instance又保存在Client对象内的集合中。那怎么没有service与多个instance之间的对应关系嘞?或者会不会有一个Map<service,Set<Client>> 这样的集合存在嘞?

public void registerInstance(Service service, Instance instance, String clientId) {
    // 把service存入singletonRepository集合 如果不存在的前提下,并存入namespaceSingletonMaps集合中该服务命名空间对应的set集合中
    // 从singletonRepository集合中取出最先添加的service,因为之后新创建的service不会添加进singletonRepository集合中
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(...);
    }
    // clientId就是NacosClient发送请求时传递的一个connectionId 。client对象存放的就是NacosClient客户端相关的信息
    // 对于临时实例来说,getClient()就是从clients集合中取,返回的是IpPortBasedClient对象
    // 客户端与服务端建立连接之后,服务端就会生成一个Client对象,服务端会通过客户端传过来的connectionId来找到对应的Client对象
    Client client = clientManager.getClient(clientId);
    // 对客户端进行一些校验
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 把客户端封装的instance对象 转换为 服务端这边的instance对象
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // 将service以及instanceInfo对象保存至 publishers <Service, InstancePublishInfo>  这个Map集合中
    // 同时发布一个ClientChangedEvent事件
    client.addServiceInstance(singleton, instanceInfo);
    // 设置最后修改时间
    client.setLastUpdatedTime();
    // 再发布两个事件ClientRegisterServiceEvent、InstanceMetadataEvent
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}


// 接下来就是上面方法中调用的一些其他方法的代码
------------------------------------------------------------------------------------------------------------------------
/**
* 两个集合保存的数据:
*      singletonRepository <Service, Service>
*      namespaceSingletonMaps  ConcurrentHashMap<namespace, Set<Service>>
*/
public Service getSingleton(Service service) {
    // 如果singletonRepository集合中没有当前service,那么就存进该集合中
    singletonRepository.putIfAbsent(service, service);
    // 从集合中取出service,这里有两种情况,
    // 1. 如果上面singletonRepository集合中刚开始不存在当前service,那么这里获取的就是方法参数中传过来的对象,
    // 2. 如果上面singletonRepository集合中之前就已经存在了该service,那么这里获取的就是之前存入该集合的service对象
    Service result = singletonRepository.get(service);
    // 该service的命名空间是否在namespaceSingletonMaps集合中存在,如果不存在则创建一个set集合
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
    // 将service添加进namespaceSingletonMaps集合中命名空间对应的set集合中
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
} 

------------------------------------------------------------------------------------------------------------------------
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    // parseToHealthCheckInstance() 把服务注册时生成的 InstancePublishInfo 对象 转换为 HealthCheckInstancePublishInfo类型的对象
    return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}

@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    
    // 将service与instance添加至Client对象中的publishers<Service, InstancePublishInfo>集合中
    if (null == publishers.put(service, instancePublishInfo)) {
        MetricsMonitor.incrementInstanceCount();
    }
    // 发布ClientChangedEvent事件
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}



至此,NacosClient端请求NacosServer端进行服务注册的一次请求就结束了,但NacosServer端真正处理的逻辑还没有结束,接下来要去看监听事件是如何进行处理的。在idea中使用ctrl+shift+R进行全文搜索,看看某个事件对象到底是在哪里进行处理的。我们这里先看核心的ClientRegisterServiceEvent事件

ClientServiceIndexesManager.onEvent(...),该方法:

  • 进一步判断事件类型,去调用各个事件向对应的方法
  • 在服务注册对应的方法中,将service和clientid存入publisherIndexes<Service, Set<clientId>>集合中
  • 发布一个ServiceChangedEvent事件
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    ...
    @Override
    public void onEvent(Event event) {
        // 根据两类事件去调用对应的方法,再进行更细致的判断
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            // 注册服务、注销服务、订阅服务、取消订阅,相关的事件
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    ...
    
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            // 注册服务
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            // 注销服务
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            // 订阅服务
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            // 取消订阅
            removeSubscriberIndexes(service, clientId);
        }
    }
    
    //服务注册
    private void addPublisherIndexes(Service service, String clientId) {
        // 将service和clientid存入publisherIndexes<Service, Set<clientId>>集合中
        // 也就是此集合保证了service和多个instance之间的对应关系
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        // 发布一个ServiceChangedEvent事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    
    ...
}



其实到现在为止,服务注册的主要功能已经实现了,已经将service和instance保存在内存中了。

一个集合专门存放service,singletonRepository <Service, Service> 集合

一个命名空间下所有的service,namespaceSingletonMaps <namespace, Set<Service>>集合

一个集合存放Client,clients <clientId, IpPortBasedClient>集合

Client对象中的publishers <Service, InstancePublishInfo>集合

service和Client的绑定publisherIndexes <Service, Set<clientId>>集合

而对于这里又发布的ServiceChangedEvent事件,其实从名字就能看出来这个一个服务改变的事件,那么我们就可以大胆猜一下这之后的处理逻辑很大概率就是把最新的改动推送给其他位置,其他位置进行更新。

所以我们这里先不继续往下了。



服务发现

Nacos2.X 服务发现在线流程图

在这里插入图片描述



NacosClient端

NamingService接口中,它其中有两类方法是用来获取服务实例的:

  • getAllInstances(...) 获取全部实例
  • selectInstances(...) 根据条件获取过滤后的实例列表。可以获取健康或不健康的服务实例

我们主要看NamingService.selectInstances(...)方法的实现逻辑

该方法的逻辑:

  • 先查本地缓存serviceInfoMap <key, ServiceInfo>,将查询的服务实例进行健康与不健康的筛选
  • 查询本地缓存如果没有查询到,那么就会调用NamingClientProxyDelegate.subscribe(..)方法,该方法会开启一个定时任务去NacosServer端拉取,并更新本地缓存。除此之外还会向NacosServer端发送一个grpc请求
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
                                      boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    // subscribe 是否订阅,默认值是true
    if (subscribe) {
        // 先从本地缓存serviceInfoMap中取
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        // 如果本地缓存中没有我们需要的服务实例列表信息,那么就向NacosServer端发送一个grpc请求,进行获取服务实例列表数据
        // 这里还会开启一个任务,每隔一段时间向Nacos拉取服务实例信息
        if (null == serviceInfo) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    // 移除与healthy不相同的服务实例,比如healthy为true表示我只要健康的服务实例,那么就要移除不健康实例
    return selectInstances(serviceInfo, healthy);
}

查询本地缓存的方法

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    // groupName + @@ + serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 通过 grouped、ServiceName、clusters 生成一个key
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 根据这个key去本地缓存serviceInfoMap中去找
    return serviceInfoMap.get(key);
}



NamingClientProxyDelegate.subscribe(..)方法,该方法会开启一个定时任务去NacosServer端拉取,并更新本地缓存。除此之外还会向NacosServer端发送一个grpc请求

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    // 生成key
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 添加一个延迟1s执行任务,该任务会定期想NacosServer端拉取服务实例信息
    // 该定时任务和Nacos1.X版本一样,没什么改动,最多就是http变为了grpc
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 从本地缓存中取
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    // 如果没有就发送grpc请求,因为上面的延迟任务会延迟1s执行,所以这里从本地缓存中取不到数据
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 将查询的数据存入本地缓存中
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

// ----------------------------------------------------------------------------------------
// 首先看 定时向NacosServer端拉取服务实例的实现
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }

        // 添加一个延迟1s执行任务,该任务会定期想NacosServer端拉取服务实例信息
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}


@Override
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
            serviceKey)) {
            NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            isCancel = true;
            return;
        }

        // 从本地缓存中取,如果取不到那么就会通过queryInstancesOfService()方法去向NacosServer端发送请求查询服务实例列表
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            // 发请求
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 将结果保存本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        // 如果本地缓存中的ServiceInfo对象的lastRefTime属性 小于等于了 lastRefTime也要发送请求
        // 所以大部分情况下 下面的if都会满足,因为每一次发送获取服务实例列表的请求后都会更新lastRefTime的值,下一次执行该任务这个就会相等
        // 那么就又要继续发送请求
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            // 发请求
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 将结果保存本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        // 更新lastRefTime值
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 刷新失败次数
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
    } finally {
        // 嵌套调用自己,6s <= 间隔时间 <= 60s,间隔时间和失败次数有关
        if (!isCancel) {
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);
        }
    }
}

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
                                           boolean healthyOnly) throws NacosException {
    // 定时任务,查询服务实例列表的请求对象ServiceQueryRequest
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    // 发送请求
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}


//----------------------------------------------------------------------------------------
// 接下来看NamingClientProxyDelegate.subscribe(..)方法中,直接发送的grpc请求
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
    }
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    // 向NacosServer发送一个grpc请求,拉取服务实例列表信息,
    return doSubscribe(serviceName, groupName, clusters);
}

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // 封装请求对象SubscribeServiceRequest
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);
    // 发送grpc请求
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    return response.getServiceInfo();
}



NacosServer端

上面发送grpc请求对象是SubscribeServiceRequest,所以我们直接在NacosServer端找SubscribeServiceRequestHandler

下方handle方法处理逻辑为:

  • 获取请求参数
  • 将请求参数封装为一个service对象,该对象是被订阅方
  • 创建一个Subscriber对象,该对象是订阅方
  • 通过调用serviceStorage.getData(service)方法获取到ServiceInfo对象,ServiceInfo对象中包含了instance集合
  • 进行订阅的处理逻辑
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {

    ...

    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        // 获取请求参数
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 将请求参数封装为一个service对象
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
        // 创建一个Subscriber对象,其中存放订阅方的ip信息,以及订阅哪一个service的信息
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
                                               namespaceId, groupedServiceName, 0, request.getClusters());
        // 响应给客户端的数据,核心方法就是getData(service)
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
                                                metadataManager.getServiceMetadata(service).orElse(null), subscriber);
        // 是否订阅
        if (request.isSubscribe()) {
            // 查看订阅的逻辑
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
}



我们首先来看serviceStorage.getData(service)方法获取到ServiceInfo对象,看看该方法的处理逻辑

  • 查询serviceDataIndexes缓存,是否有我们要查询service对应的ServiceInfo对象
  • 如果缓存中没有那就创建一个新的ServiceInfo对象,并去服务注册表中找该服务所有的实例
  • 先根据service,从publisherIndexes <Service, Set<clientId>>集合中找出所有的clientId
  • 在遍历clientId,根据clientId从clients <clientId, IpPortBasedClient>集合中找对应的Client对象
  • 再从Client对象中publishers <Service, InstancePublishInfo>集合,找出InstancePublishInfo对象
  • 再把InstancePublishInfo转换为instance对象,最后得到Set<Instance>集合,存入ServiceInfo对象中
  • 在上面的过程中,会创建一个service与cluster的对应关系集合 serviceClusterIndex <Service, Set<clusters>>
  • 将serviceInfo对象存入缓存serviceDataIndexes <Service, ServiceInfo>
public ServiceInfo getData(Service service) {
    // serviceDataIndexes集合缓存中是否有我们要查询的service,如果有就直接返回,如果没有就调用getPushData()方法
    // serviceDataIndexes就相当于是注册表的一份缓存数据,实现了读写分离,服务注册时添加 publisherIndexes <Service, Set<clientId>>集合
    // 服务发现时读取serviceDataIndexes <Service, ServiceInfo>集合
    return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    // 通过service对象构建出一个ServiceInfo对象
    ServiceInfo result = emptyServiceInfo(service);
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    // hosts属性就是instance集合,所以核心看getAllInstancesFromIndex()关键方法
    result.setHosts(getAllInstancesFromIndex(service));
    // 存入缓存中  serviceDataIndexes <Service, ServiceInfo>  ServiceInfo中存的是List<Instance>
    serviceDataIndexes.put(service, result);
    return result;
}

private List<Instance> getAllInstancesFromIndex(Service service) {
    Set<Instance> result = new HashSet<>();
    Set<String> clusters = new HashSet<>();
    // 遍历service对应的所有clientId
    for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
        // 从client对象中取出instance封装之后的InstancePublishInfo对象
        Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
        if (instancePublishInfo.isPresent()) {
            // 得到instance对象,添加进集合
            Instance instance = parseInstance(service, instancePublishInfo.get());
            result.add(instance);
            clusters.add(instance.getClusterName());
        }
    }
    // cache clusters of this service
    // 存入缓存  serviceClusterIndex  <Service, Set<clusters>>
    serviceClusterIndex.put(service, clusters);
    return new LinkedList<>(result);
}



我们接下来再来看订阅相关的处理逻辑,这里会调用到EphemeralClientOperationServiceImpl.subscribeService(..)方法中,该方法业务逻辑:

  • singletonRepository <Service, Service> 集合中取出被订阅方的service对象
  • 在根据clientId获取到Client对象,它代表着订阅方
  • 将service和subscriber 保存在Client对象中的subscribers <Service, Subscriber> 集合中
  • 发布ClientSubscribeServiceEvent事件
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    // 从singletonRepository这个集合中把service对象取出来,如果集合中不存在的话那就用新创建的
    // 这个singleton 是被订阅方
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    // 获取Client,是订阅方
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 将service和subscriber 保存在Client对象中的subscribers <Service, Subscriber> 集合中
    // 表示value service 订阅了 key service
    client.addServiceSubscriber(singleton, subscriber);
    // 修改最后更新时间
    client.setLastUpdatedTime();
    // 发布ClientSubscribeServiceEvent事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

处理该事件的位置和处理服务注册的位置是一样的,都是ClientServiceIndexesManager.onEvent(...),方法:

  • 和服务注册的处理逻辑一样,创建一个集合,将service和多个订阅方进行绑定subscriberIndexes <Service, Set<subscriberClientId>>
  • 发布ServiceSubscribedEvent事件
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    ...
        @Override
        public void onEvent(Event event) {
        // 根据两类事件去调用对应的方法,再进行更细致的判断
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            // 注册服务、注销服务、订阅服务、取消订阅,相关的事件
            handleClientOperation((ClientOperationEvent) event);
        }
    }

    ...

        private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            // 注册服务
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            // 注销服务
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            // 订阅服务
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            // 取消订阅
            removeSubscriberIndexes(service, clientId);
        }
    }

    private void addSubscriberIndexes(Service service, String clientId) {
        // 服务订阅的处理
        // 这里会有一个集合  subscriberIndexes <Service, Set<subscriberClientId>>
        // 把这个service和多个订阅方进行绑定
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        // Fix #5404, Only first time add need notify event.
        if (subscriberIndexes.get(service).add(clientId)) {
            // 订阅方的clientId添加完后就会发布一个ServiceSubscribedEvent事件,此时订阅的主流程已经结束
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }

    ...
}



在服务发现过程中出现的集合:

服务注册表的副本,serviceDataIndexes <Service, ServiceInfo> ServiceInfo中存的是List<Instance>

服务与cluster的对应关系 serviceClusterIndex <Service, Set<clusters>>

Client对象中的subscribers <Service, Subscriber> 集合,保存着Subscriber订阅了Service服务

service和多个订阅方进行绑定 subscriberIndexes <Service, Set<subscriberClientId>>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1909043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年浙江省高考分数一分一段数据可视化

下图根据 2024 年浙江高考一分一段表绘制&#xff0c;可以看到&#xff0c;竞争最激烈的分数区间在620分到480分之间。 不过&#xff0c;浙江是考两次取最大&#xff0c;不是很有代表性。看看湖北的数据&#xff0c;580分到400分的区段都很卷。另外&#xff0c;从这个图也可以…

【Mac】Folder Icons for mac(文件夹个性化图标修改软件)软件介绍

软件介绍 Folder Icons for Mac 是一款专为 macOS 设计的应用程序&#xff0c;主要用于个性化和定制你的文件夹图标。以下是它的主要特点和使用方法&#xff1a; 主要特点&#xff1a; 个性化文件夹图标 Folder Icons for Mac 允许用户为 macOS 上的任何文件夹定制图标。你…

k8s集群如kubeadm init和kube-flannel.yam问题

查看k8s中角色内容kubectl get all (显示pod和server以及delment) 删除应用资源选择删除先删除部署查看部署和pod没了服务还在&#xff0c;但资源和功能以及删除&#xff0c;删除服务kubectl delete 服务名&#xff08;部署名&#xff09;&#xff0c;get pods 获取默认空间的容…

Android C++系列:Linux进程(二)

1. fork #include <unistd.h> pid_t fork(void);子进程复制父进程的0到3g空间和父进程内核中的PCB,但id号不同。 fork调用一次返回两次 父进程中返回子进程ID子进程中返回0读时共享,写时复制#include <sys/types.h> #include <unistd.h> #include <…

高颜值官网(4):酒店民宿网站12个,看着看着就醉了。

对于高星级酒店或者高端酒店来说&#xff0c;拥有一个高颜值的官方网站是非常重要的。一个精美、专业的网站设计可以有效地展现酒店的品牌形象和服务质量&#xff0c;吸引目标客户群体并提高预订转化率。 这次分享12个&#xff0c;都是超高颜值的。

机器学习中的可解释性

「AI秘籍」系列课程&#xff1a; 人工智能应用数学基础 人工智能Python基础 人工智能基础核心知识 人工智能BI核心知识 人工智能CV核心知识 为什么我们需要了解模型如何进行预测 我们是否应该始终信任表现良好的模型&#xff1f;模型可能会拒绝你的抵押贷款申请或诊断你患…

break 和 continue 的区别与用法

break 和 continue 的区别与用法 1、break 语句2、continue 语句3、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在JAVA中&#xff0c;break 和 continue 是两种常用的控制流语句&#xff0c;它们主要用于在循环结构中改变程序的执行…

怎样卸载电脑上自带的游戏?

卸载电脑上自带的游戏通常是一个简单的过程&#xff0c;以下是几种常见的方法&#xff0c;您可以根据自己的操作系统版本选择相应的步骤进行操作&#xff1a; 方法一&#xff1a;通过“设置”应用卸载&#xff08;适用于Windows 10和Windows 11&#xff09; 1. 点击开始菜单&…

【链表】- 链表相交

1. 对应力扣题目连接 链表相交 2. 实现思路 链表详情&#xff1a; 考虑使用双指针&#xff1a; 解法一&#xff1a; 具体代码&#xff0c;详见3. 实现案例代码解析&#xff1a; 思路&#xff1a;因为链表按照如图的箭头走向&#xff0c;走的总路程是相等的&#xff0c;一…

泰迪智能科技受邀北京物资学院共讨校企合作交流

为落实“访企拓岗促就业”专项行动工作要求&#xff0c;推动科研成果向实际应用转化&#xff0c;培养适应新时代需求的高素质人才&#xff0c;拓宽毕业生就业渠道&#xff0c;提升就业竞争力。7月1日&#xff0c;广东泰迪智能科技股份有限公司区域总监曹玉红到访北京物资学院开…

两张图片合并(右上角添加水印,兼容矢量图)保留原来的颜色

无缝合并两张图片&#xff08;封面右上角添加logo&#xff09;-- opencv &#xff1a; 进行添加logo(水印)由于使用了cv2.seamlessClone&#xff0c;cv2.seamlessClone使用了泊松克隆&#xff08;Poisson Cloning&#xff09;&#xff0c;会根据周围的颜色信息进行颜色调整&…

基于单片机的饲料搅拌机控制系统设计

摘要 &#xff1a; 文章主要从软件和硬件两个部分对基于单片机的饲料搅拌机控制系统进行研究设计 。 硬件部分主要由传感器模块 、 信号采集模块、 键盘接入模块 、 LED 显示模块 、 继电器模块以及看门狗模块组成 。 软件部分在 KeilC51 软件基础上重点对控制系统主程序 、…

Android - 云游戏本地悬浮输入框实现

一、简述 云游戏输入法分两种情况,以云化原神为例,分为 云端输入法 和 本地输入法,运行效果如下: 云端输入法本地输入法云端输入法 就是运行在云端设备上的输入法,对于不同客户端来说(Android、iPhone),运行效果一致。 本地输入法 则是运行在用户侧设备上的输入法,对…

sizeof跟strlen的用法及差异

sizeof是一个操作符&#xff0c;不是函数&#xff1b; 而strlen是一个库函数&#xff1b; sizeof是计算所占内存空间的&#xff0c;不管你内容是什么&#xff0c;只要知道占多少内存&#xff0c; 而strlen是跟内容有关的&#xff0c;它是计算字符串长度的&#xff08;字符数…

vue学习day01-vue的概念、创建Vue实例、插值表达式、响应式、安装Vue开发者工具

1、vue的概念 Vue是一个用于构建用户界面的渐进式 框架 &#xff08;1&#xff09;构建用户界面&#xff1a;基于数据动态渲染页面 &#xff08;2&#xff09;渐进式&#xff1a;循序渐进的学习 &#xff08;3&#xff09;框架&#xff1a;一条完整的项目解决方案&#xff…

[Java]Swing版坦克大战小游戏项目开发(1)——new出一个窗口

highlight: xcode theme: vuepress 前言 本系列文章带领 Java 初学者学习使用 Java 语言结合 Swing 技术以及设计模式开发一款经典小游戏——坦克大战。通过这个小游戏&#xff0c;你可以学会很多实用的设计模式思想&#xff0c;并且提高你的编码水平。 熟悉Frame Frame 类是 J…

C++ | Leetcode C++题解之第224题计算计算器

题目&#xff1a; 题解&#xff1a; class Solution { public:int calculate(string s) {stack<int> ops;ops.push(1);int sign 1;int ret 0;int n s.length();int i 0;while (i < n) {if (s[i] ) {i;} else if (s[i] ) {sign ops.top();i;} else if (s[i] …

亚马逊erp同步订单自动发货,告别手动,批量自动发货流...

大家好&#xff0c;今天讲下店飞飞订单之后如何让它自动发货。批量自动发货点开ERP上的订单&#xff0c;比如每天出个几十单爆单就最好的。订单跟ERP上配对成功不需要手动发货&#xff0c;自动发货都是批量发货&#xff0c;就不用一个个去点了。 那就讲下怎么设置的。 点开品…

【面试八股总结】线程基本概念,线程、进程和协程区别,线程实现

一、什么是线程&#xff1f; 线程是“轻量级进程”&#xff0c;是进程中的⼀个实体&#xff0c;是程序执⾏的最小单元&#xff0c;也是被系统独立调度和分配的基本单位。 线程是进程当中的⼀条执行流程&#xff0c;同⼀个进程内多个线程之间可以共享代码段、数据段、打开的文件…