Nacos源码 (3) 注册中心

news2025/4/10 8:37:40

本文将从一个服务注册示例入手,通过阅读客户端、服务端源码,分析服务注册、服务发现原理。

使用的2.0.2的版本。


返回目录


客户端

创建NacosNamingService对象

NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);

NacosNamingService提供两个构造方法:

public NacosNamingService(String serverList) throws NacosException {
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
    init(properties);
}

public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

第二个方法的properties的key在PropertyKeyConst常量类可以找到,如:

  • namespace
  • username
  • password
  • serverAddr
  • clusterName
  • 其他

构造方法中会初始化一些参数和组件:

  • 初始化namespace参数

  • 创建InstancesChangeNotifier对象,它实现了Subscriber接口,监听InstancesChangeEvent事件

    public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    
        // key使用serviceName + groupName + clusters组合而成
        // value是监听器集合
        private final Map<String, ConcurrentHashSet<EventListener>> listenerMap;
    
        // 锁
        private final Object lock = new Object();
    
  • 向NotifyCenter注册InstancesChangeEvent事件,注册之前创建的InstancesChangeNotifier对象监听服务实例变化

    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    
    // NotifyCenter维护着EventPublisher集,Subscriber会被注册到EventPublisher上
    // EventPublisher提供publish方法向Event队列推送事件
    // EventPublisher是一个Thread类,run方法从Event队列取事件通知Subscriber来处理
    
  • 创建NamingClientProxyDelegate对象,用于与服务端通信,它是一个代理,内部使用其他的NamingClientProxy实现:

    • NamingHttpClientProxy
    • NamingGrpcClientProxy - 默认使用该实现类,其中有healthCheck检测服务端是否健康,服务端直接响应成功无操作

服务注册

NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
nacosNamingService.registerInstance(ORDER_SERVICE, "192.168.0.100", 9999);

提供多个重载的registerInstance方法,最终使用这个方法:

public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    registerInstance(serviceName, groupName, instance);
}

public void registerInstance(String serviceName, String groupName, Instance instance)
            throws NacosException {
    // 此处clientProxy是NamingClientProxyDelegate对象
    clientProxy.registerService(serviceName, groupName, instance);
}

NamingClientProxyDelegate的registerService方法会选择一个具体的NamingClientProxy对象与服务端通信,默认使用NamingGrpcClientProxy对象。

NamingGrpcClientProxy的registerService方法构建InstanceRequest请求对象,之后使用RpcClient对象发送请求并接收响应。

RpcClient内部通过GrpcConnection对象使用GRPC来访问服务端。

内部的GRPC代码是使用protoc和protobuf-maven-plugin生成的,通信细节此处不做介绍。

服务下线

nacosNamingService.deregisterInstance(ORDER_SERVICE, "192.168.0.100", 9999);

deregisterInstance服务下线:

public void deregisterInstance(String serviceName,
                               String groupName,
                               String ip,
                               int port,
                               String clusterName) throws NacosException {
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setClusterName(clusterName);
    deregisterInstance(serviceName, groupName, instance);
}

public void deregisterInstance(String serviceName,
                               String groupName,
                               Instance instance) throws NacosException {
    clientProxy.deregisterService(serviceName, groupName, instance);
}

查询实例

示例代码:

NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);

System.out.printf(">> instance count=%d\n", instances.size());

for (Instance instance : instances) {
    System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
            instance.getServiceName(), instance.getInstanceId(),
            instance.getClusterName(), instance.getIp(), instance.getPort());
}

提供了几个重载的getAllInstances方法,最重要的参数就是subscribe,当为true时,会向服务端发送订阅请求,之后一直从ServiceInfoHolder中获取服务实例信息,而不再向服务端发送查询请求。

public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    if (subscribe) {
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo) {
            // 订阅请求
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 查询请求
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

服务订阅

示例代码:

NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
namingService.subscribe(ORDER_SERVICE, new EventListener() {
    @Override
    public void onEvent(Event event) {
        NamingEvent e = (NamingEvent) event;
        System.out.println("serviceName=" + e.getServiceName());
        List<Instance> instances = e.getInstances();
        System.out.printf(">> instance count=%d\n", instances.size());

        for (Instance instance : instances) {
            System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
                    instance.getServiceName(), instance.getInstanceId(),
                    instance.getClusterName(), instance.getIp(), instance.getPort());
        }
    }
});

TimeUnit.SECONDS.sleep(1200);

subscribe方法:

public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    String clusterString = StringUtils.join(clusters, ",");
    // 将listener保存到listenerMap中
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    // 发送订阅请求
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

实例变化的方法调用栈:

在这里插入图片描述

当收到服务端的实例变化事件时,会触发grpc层的观察者监听:

public void onMessage(RespT message) {
  if (firstResponseReceived && !streamingResponse) {
    throw Status.INTERNAL
        .withDescription("More than one responses received for unary or client-streaming call")
        .asRuntimeException();
  }
  firstResponseReceived = true;
  // 调用观察者
  observer.onNext(message);

  if (streamingResponse && adapter.autoFlowControlEnabled) {
    // Request delivery of the next inbound message.
    adapter.request(1);
  }
}

此处的observer是在创建rpc连接的时候注册的:

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {

    return streamStub.requestBiStream(new StreamObserver<Payload>() {

        @Override
        public void onNext(Payload payload) {
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        // 调用ServerRequestHandler处理请求
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            sendResponse(response);
                        }

// ...

NamingPushRequestHandler的处理逻辑:

public Response requestReply(Request request) {
    if (request instanceof NotifySubscriberRequest) {
        NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
        serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
        return new NotifySubscriberResponse();
    }
    return null;
}

serviceInfoHolder.processServiceInfo方法:

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        // 推送一个InstancesChangeEvent事件
        NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

推送一个InstancesChangeEvent事件:

  1. NotifyCenter维护着一个EventPublisher集,当有事件时,会选择一个目标EventPublisher

  2. 通过publish方法将事件保存到一个Event队列

    public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            // 当队列操作失败时,直接使用当前线程处理事件
            receiveEvent(event);
            return true;
        }
        return true;
    }
    
  3. EventPublisher是一个线程,在NotifyCenter初始化时启动。run方法会从Event队列取事件,使用receiveEvent(event)进行处理

  4. receiveEvent方法查找所有的Subscriber,其中就有最初创建的InstancesChangeNotifier,调用订阅者onEvent方法

服务端

服务注册

InstanceRequestHandler处理器

注册中心的rpc处理器在com.alibaba.nacos.naming.remote.rpc.handler包,处理服务注册和下线的处理器是InstanceRequestHandler类:

public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            // 服务注册
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            // 服务下线
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }

    // 服务注册
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    // 服务下线
    private InstanceResponse deregisterInstance(
            Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

服务注册核心流程

public void registerInstance(Service service, Instance instance, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // Add a new instance for service for current client
    // 1. 给当前客户端绑定service -> instance关系
    // 2. 推送一个ClientChangedEvent事件
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();

    // 推送ClientRegisterServiceEvent和InstanceMetadataEvent事件
    NotifyCenter.publishEvent(
        new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter.publishEvent(
        new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  1. 给当前客户端绑定service -> instance关系
  2. 推送一个ClientChangedEvent事件
  3. 推送ClientRegisterServiceEvent事件
  4. 推送InstanceMetadataEvent事件

事件处理流程

ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点

ClientRegisterServiceEvent事件:Client register service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。

InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断

ServiceChangedEvent事件:Service data changed event. 有两个处理器:

  • NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
  • DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务

服务下线

服务下线核心流程

public void deregisterInstance(Service service, Instance instance, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    // Remove service instance from client
    // 1. 解除当前客户端的service -> instance关系
    // 2. 推送一个ClientChangedEvent事件
    InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
    client.setLastUpdatedTime();

    // 推送ClientDeregisterServiceEvent和InstanceMetadataEvent事件
    if (null != removedInstance) {
        NotifyCenter.publishEvent(
            new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
        NotifyCenter.publishEvent(
            new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
    }
}
  1. 解除当前客户端的service -> instance关系
  2. 推送一个ClientChangedEvent事件
  3. 推送ClientDeregisterServiceEvent事件
  4. 推送InstanceMetadataEvent事件

事件处理流程

基本与服务注册流程相同。

ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点

ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。

InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断

ServiceChangedEvent事件:Service data changed event. 有两个处理器:

  • NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
  • DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务

服务实例心跳

  1. 客户端会周期性的发送healthCheck请求
  2. 服务端每次收到客户端请求时都会更新对应connection的活跃时间戳
  3. 服务端也会周期性的检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接

客户端healthCheck请求

客户端会周期性发送healthCheck请求,默认每5秒执行一次,在RpcClient中:

clientEventExecutor.submit(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    //check alive time.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
// ...

healthCheck健康检查:

private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        //ignore
    }
    return false;
}

如果检查失败,将重新建立连接。

服务端记录connection活跃时间戳

服务端每次收到客户端请求时都会更新对应connection的活跃时间戳。

服务端使用GrpcRequestAcceptor作为业务层请求Acceptor入口,这个类会将GRPC的请求转为业务层请求,并转发到对应的RequestHandler处理器。

在其request方法中,会刷新对应connection的活跃时间戳:

Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新connection的活跃时间戳
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();

服务端connection活跃检查

服务端周期性检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接。

服务端使用ConnectionManager管理连接:

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

在启动时,会创建周期性任务检查connections的活跃状态,默认每3秒执行一次,以下为代码片段:

// 检查长时间不活跃的连接和超过最大连接数的连接
for (Map.Entry<String, Connection> entry : entries) {
    Connection client = entry.getValue();
    String clientIp = client.getMetaInfo().getClientIp();
    AtomicInteger integer = expelForIp.get(clientIp);
    if (integer != null && integer.intValue() > 0) {
        integer.decrementAndGet();
        expelClient.add(client.getMetaInfo().getConnectionId());
        expelCount--;
    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
        outDatedConnections.add(client.getMetaInfo().getConnectionId());
    }
}

// ...

// 重置超过最大连接数的连接
for (String expelledClientId : expelClient) {
    try {
        Connection connection = getConnection(expelledClientId);
        if (connection != null) {
            ConnectResetRequest connectResetRequest = new ConnectResetRequest();
            connectResetRequest.setServerIp(serverIp);
            connectResetRequest.setServerPort(serverPort);
            connection.asyncRequest(connectResetRequest, null);
        }
    } catch (ConnectionAlreadyClosedException e) {
        unregister(expelledClientId);
    } catch (Exception e) {

    }
}

// ...

if (CollectionUtils.isNotEmpty(outDatedConnections)) {
    Set<String> successConnections = new HashSet<>();
    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
    for (String outDateConnectionId : outDatedConnections) {
        try {
            Connection connection = getConnection(outDateConnectionId);
            if (connection != null) {
                // 给客户端发检测请求
                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }

                    @Override
                    public long getTimeout() {
                        return 1000L;
                    }

                    @Override
                    public void onResponse(Response response) {
                        latch.countDown();
                        if (response != null && response.isSuccess()) {
                            connection.freshActiveTime();
                            successConnections.add(outDateConnectionId);
                        }
                    }

                    @Override
                    public void onException(Throwable e) {
                        latch.countDown();
                    }
                });
            } else {
                latch.countDown();
            }

        } catch (ConnectionAlreadyClosedException e) {
            latch.countDown();
        } catch (Exception e) {
            latch.countDown();
        }
    }

    latch.await(3000L, TimeUnit.MILLISECONDS);

    // 移除失败的已断开连接
    for (String outDateConnectionId : outDatedConnections) {
        if (!successConnections.contains(outDateConnectionId)) {
            unregister(outDateConnectionId);
        }
    }
}

客户端断开连接

业务处理流程

GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:

public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // 使用ConnectionManager移除连接
        connectionManager.unregister(connectionId);
    }
}

ConnectionManager移除连接:

public synchronized void unregister(String connectionId) {
    // 从Connection集移除连接
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // IP连接数--
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // 通知ClientManager层断开连接
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

ConnectionBasedClientManager的clientDisconnected方法:

public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // 推送一个ClientDisconnectEvent事件
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

事件处理流程

ClientDisconnectEvent事件:Client disconnect event. Happened when Client disconnect with server.

  • ClientServiceIndexesManager - 维护注册和订阅关系
  • DistroClientDataProcessor - 同步客户端数据到所有服务节点
  • NamingMetadataManager - 维护客户端注册的服务和实例元数据信息

查询实例

ServiceQueryRequestHandler处理器

ServiceQueryRequestHandler类负责客户端的服务实例查询请求:

public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
    
    private final ServiceStorage serviceStorage;
    
    private final NamingMetadataManager metadataManager;
    
    public ServiceQueryRequestHandler(ServiceStorage serviceStorage,
                                      NamingMetadataManager metadataManager) {
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
    }
    
    @Override
    @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
    public QueryServiceResponse handle(
           ServiceQueryRequest request, RequestMeta meta) throws NacosException {

        String namespaceId = request.getNamespace();
        String groupName = request.getGroupName();
        String serviceName = request.getServiceName();
        Service service = Service.newService(namespaceId, groupName, serviceName);
        String cluster = null == request.getCluster() ? "" : request.getCluster();
        boolean healthyOnly = request.isHealthyOnly();

        // ServiceInfo封装服务基本信息和其实例集合
        ServiceInfo result = serviceStorage.getData(service);
        ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
        result = ServiceUtil
            .selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);

        return QueryServiceResponse.buildSuccessResponse(result);
    }
}

查询服务实例:

public ServiceInfo getData(Service service) {
    // 如果缓存里面有服务信息则直接从缓存查找
    return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    ServiceInfo result = emptyServiceInfo(service);
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    // 从ClientServiceIndexesManager查找
    result.setHosts(getAllInstancesFromIndex(service));
    serviceDataIndexes.put(service, result);
    return result;
}

private List<Instance> getAllInstancesFromIndex(Service service) {
    Set<Instance> result = new HashSet<>();
    Set<String> clusters = new HashSet<>();
    // 从ClientServiceIndexesManager查找service绑定的client集
    for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
        // 查找该client注册的实例信息
        Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
        if (instancePublishInfo.isPresent()) {
            Instance instance = parseInstance(service, instancePublishInfo.get());
            result.add(instance);
            clusters.add(instance.getClusterName());
        }
    }
    // cache clusters of this service
    serviceClusterIndex.put(service, clusters);
    return new LinkedList<>(result);
}

private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
    // 获取到client对象
    Client client = clientManager.getClient(clientId);
    if (null == client) {
        return Optional.empty();
    }
    // 查找该client指定service注册的实例信息
    // AbstractClient使用Map<Service, InstancePublishInfo>结构保存
    // 前文介绍过在服务注册时会使用client.addServiceInstance方法添加注册信息
    return Optional.ofNullable(client.getInstancePublishInfo(service));
}

前文介绍过ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。

服务订阅

SubscribeServiceRequestHandler处理器

SubscribeServiceRequestHandler类负责客户端的服务订阅请求:

public class SubscribeServiceRequestHandler extends 
             RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {

    private final ServiceStorage serviceStorage;

    private final NamingMetadataManager metadataManager;

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
            NamingMetadataManager metadataManager,
            EphemeralClientOperationServiceImpl clientOperationService) {
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.clientOperationService = clientOperationService;
    }

    @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
    public SubscribeServiceResponse handle(
           SubscribeServiceRequest request, RequestMeta meta) throws NacosException {

        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        Service service = Service.newService(namespaceId, groupName, serviceName, true);

        // 封装Subscriber对象:客户端IP、版本、命名空间等
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app,
                meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());

        ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null),
                subscriber);

        if (request.isSubscribe()) {
            // 服务订阅
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            // 取消订阅
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }

    private ServiceInfo handleClusterData(
            ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
        return StringUtils.isBlank(subscriber.getCluster()) ? data
                : ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
    }
}

服务订阅核心流程

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    // 为该client绑定service -> subscriber关系
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    // 推送一个ClientSubscribeServiceEvent事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

事件处理流程

ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系

private void addSubscriberIndexes(Service service, String clientId) {
    subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
    // Only first time add need notify event.
    if (subscriberIndexes.get(service).add(clientId)) {
        // 推送一个ServiceSubscribedEvent事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
    }
}

ServiceSubscribedEvent事件:Service is subscribed by one client event. NamingSubscriberServiceV2Impl进行处理。

public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // If service changed, push to all subscribers.
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(
            service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // 触发一次订阅者回调,把被订阅的服务的信息推送给订阅者
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(
            service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

取消服务订阅

public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    client.removeServiceSubscriber(singleton);
    client.setLastUpdatedTime();
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}

推送一个ClientUnsubscribeServiceEvent事件,还是使用ClientServiceIndexesManager来处理,移除订阅关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/874843.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为OD机试 - 最长的连续子序列 (Java 2022Q4 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#xff09;》…

ISIS技术(第三十七课)

1 分享一下华为官网上的一张地图 官网地址:https://support.huawei.com/hedex/hdx.do?docid=EDOC1000105967&id=ZH-CN_CONCEPT_0000001501534705 2 路由的分类 -直连路由 直接连接的路由,且配置了IP地址之后(在同一网段内),就是直连路由。 -非直连路由 -静态路由…

如何在金属制品业运用IPD?

金属制品行业是指以金属材料为原料&#xff0c;通过加工、制造、加工等工艺制造出各种金属制品的企业和产业。这些金属制品包括但不限于机械设备、工具、建筑材料、家具、电子产品、交通运输设备等。金属制品加工业是机械装备行业的一个子行业&#xff0c;包括结构性金属制品制…

SpringBoot对一个URL通过method(GET、POST、PUT、DELETE)实现增删改查操作

目录 1. rest风格基础2. 开启方法3. 实战练习 1. rest风格基础 我们都知道GET、POST、PUT、DELETE分别对应查、增、改、删除 虽然Postman这些工具可以直接发送GET、POST、PUT、DELETE请求。但是RequestMapping并不支持PUT和DELETE请求操作。需要我们手动开启 2. 开启方法 P…

ModaHub魔搭社区:Milvus Cloud素材集合帖,等你查收

Hi~Milvus Cloud 的各位朋友,这是一期 Milvus Cloud 素材弹药库的集中汇总帖。随着向量数据库的火爆,越来越多的伙伴开始关注到向量数据库并开始使用 Milvus Cloud 。 考虑到目前信息获取的渠道多且分散,我们专门为大家整理了一期 Milvus Cloud 信息集合帖,让大家可以在快…

iPhone苹果手机触屏失灵无法关机,如何强制重启

参考:https://zhuanlan.zhihu.com/p/615223121 1&#xff0c;只轻按一下音量上键后快速松开 2&#xff0c;只轻按一下音量下键后快速松开 3&#xff0c;只按住右侧电源键长按不松手&#xff0c;直到手机关机。

Tomcat的多实例和动静分离

目录 一、多实例 二、 nginxtomcat的负载均衡和动静分离 三、Tomcat 客户端->四层代理->七层代理->tomcat服务器 实验&#xff1a; 问题总结&#xff1a; tomcat日志文件&#xff1a;/usr/local/tomcat/logs/catalina.out 一、多实例 在一台服务器上有多个tomc…

python——案例19:九九乘法表

案例19&#xff1a;九九乘法表for i in range(1,10): #i是行&#xff0c;j是列for j in range(1,i1): #确保内循环中的列小于等于列print(%d*%d%2ld%(i,j,i*j),end ) #计算方法&#xff0c;并且确保内容连续print()

2023.8.14论文阅读

文章目录 ESPNet: Efficient Spatial Pyramid of Dilated Convolutions for Semantic Segmentation摘要本文方法实验结果 DeepFusion: Lidar-Camera Deep Fusion for Multi-Modal 3D Object Detection摘要本文方法实验结果 ESPNet: Efficient Spatial Pyramid of Dilated Convo…

【不限于联想Y9000P电脑关盖再打开时黑屏的解决办法】

不限于联想Y9000P电脑关盖再打开时黑屏的解决办法 问题的前言问题的出现问题拟解决 问题的前言 事情发生在昨天&#xff0c;更新了Win11系统后&#xff1a; 最惹人注目的三处地方就是&#xff1a; 1.可以查看时间的秒数了&#xff1b; 2.右键展示的内容变窄了&#xff1b; 3.按…

JimuReport积木报表 v1.6.0版本发布—免费的可视化报表

项目介绍 一款免费的数据可视化报表&#xff0c;含报表和大屏设计&#xff0c;像搭建积木一样在线设计报表&#xff01;功能涵盖&#xff0c;数据报表、打印设计、图表报表、大屏设计等&#xff01; Web 版报表设计器&#xff0c;类似于excel操作风格&#xff0c;通过拖拽完成报…

腾讯会议:云上协奏,远程韶华

腾讯会议的原理及历史 摘要 本论文介绍了腾讯会议的原理和历史。腾讯会议是一款基于云计算和通信技术的在线会议平台,由腾讯公司推出。通过分析腾讯会议的工作原理和演进历史,我们可以深入了解该平台是如何实现高效、便捷、安全的远程协作和沟通的。 1. 引言 近年来,随着…

PostgreSql 备份恢复

一、概述 数据库备份一般可分为物理备份和逻辑备份&#xff0c;其中物理备份又可分为物理冷备和物理热备&#xff0c;下面就各种备份方式进行详细说明&#xff08;一般情况下&#xff0c;生产环境采取的定时物理热备逻辑备份的方式&#xff0c;均是以下述方式为基础进一步研发编…

Qt开发技术:Q3D图表开发笔记:Q3DSurface三维曲面图介绍、Demo以及代码详解

前言 qt提供了q3d进行三维开发&#xff0c;虽然这个框架没有得到大量运用也不是那么成功&#xff0c;性能上也有很大的欠缺&#xff0c;但是普通的点到为止的应用展示还是可以的。   其中就包括华丽绚烂的三维图表&#xff0c;数据量不大的时候是可以使用的。   前面介绍了…

心法利器[96] | 写了个向量检索的baseline

心法利器 本栏目主要和大家一起讨论近期自己学习的心得和体会&#xff0c;与大家一起成长。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。 2022年新一版的文章合集已经发布&#xff0c;累计已经60w字了&#xff0c;获取方式看这里&…

山东布谷科技直播软件源码Nginx服务器横向扩展:搭建更稳定的平台服务

在直播软件源码平台中&#xff0c;服务器扮演着重要的角色&#xff0c;关系着视频传输、数据处理、用户管理等工作的顺利完成。随着互联网的迅猛发展&#xff0c;直播行业也随之崛起&#xff0c;全世界的人们都加入到了直播软件源码平台中&#xff0c;用户流量的增加让渡武器的…

绘画AI工具的介绍与使用----强到离谱-2023年必备免费好用的AI工具

一.绘画AI www.seaart.ai 这个是网站地址&#xff0c;进去之后直接注册登录即可&#xff0c;几乎都是免费使用&#xff0c;不用担心是否要VIP 点击网站进入之后登录&#xff0c;然后进入主页面&#xff0c;一张图片给你介绍清楚主页 我会根据菜单栏来给大家演示&#xff0c;首…

自动驾驶TPM技术杂谈 ———— PAPS(Partially Automated Parking System)标准(ISO 20900)

文章目录 PAPS类型与要求类型1类型2 功能要求车位类型水平空间车位水平线车位垂直空间车位垂直线车位车库 功能状态泊入流程泊出流程 性能验收车位标线验收标准性能测试水平空间车位水平线车位垂直空间车位垂直线车位车库车位 PAPS类型与要求 对于PAPS&#xff0c;需要驾驶员操…

【Matplotlib】一文带你掌握Matplotlib绘制各种图形

文章目录 前言一、折线图1 - 单线2 - 多线 二、柱状图&#xff08;条形图&#xff09;1 - 单柱2 - 多柱3 - 堆叠4 - 条形 三、直方图四、箱型图1 - 单个2 - 多个 五、散点图1 - 散点图2 - 气泡图 六、饼图1 - 饼图2 - 甜甜圈 | 空心3 - 甜甜圈 | 实心 七、面积图八、热力图九、…

【BASH】回顾与知识点梳理(二十三)

【BASH】回顾与知识点梳理 二十三 二十三. Linux 账号管理&#xff08;二&#xff09;23.1 账号管理新增与移除使用者&#xff1a; useradd, 相关配置文件, passwd, usermod, userdelusermoduserdel 23.2 用户功能&#xff08;普通用户可使用&#xff09;idfingerchfnchsh 23.3…