Spring Cloud Alibaba - Nacos源码分析(二)

news2025/1/10 11:17:31

目录

一、Nacos服务端服务注册

1、服务端调用接口

2、服务注册

instanceServiceV2.registerInstance

EphemeralClientOperationServiceImpl.registerInstance

ServiceManager

clientManager

Client实例AbstractClient

ClientOperationEvent.ClientRegisterServiceEvent

二、Nacos服务端健康检查

1、长连接

2、健康检查

三、Nacos客户端服务发现

订阅处理流程

四、Nacos客户端服务订阅机制

1、Nacos订阅概述

2、定时任务开启

3、定时任务执行内容


一、Nacos服务端服务注册

1、服务端调用接口

客户端在注册服务的时候实际上是调用的NamingService.registerInstance这个方法来完成实例的注册。本质上讲服务注册就是调用的对应接口nacos/v1/ns/instance。

Nacos架构

nacos-naming模块下的controllers中的InstanceController,注册实例与它有关。

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT)
public class InstanceController {
...
    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        //注册服务实例
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "", false, namespaceId,
                NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName), instance.getIp(),
                instance.getPort()));
        return "ok";
    }
....
//----------------------------------------------------
public class UtilsAndCommons {
    
    // ********************** Nacos HTTP Context ************************ \\
    
    public static final String NACOS_SERVER_CONTEXT = "/nacos";
    
    public static final String NACOS_SERVER_VERSION = "/v1";
    
    public static final String NACOS_SERVER_VERSION_2 = "/v2";
    
    public static final String DEFAULT_NACOS_NAMING_CONTEXT = NACOS_SERVER_VERSION + "/ns";
    
    public static final String DEFAULT_NACOS_NAMING_CONTEXT_V2 = NACOS_SERVER_VERSION_2 + "/ns";
    
    public static final String NACOS_NAMING_CONTEXT = DEFAULT_NACOS_NAMING_CONTEXT;
    
    public static final String NACOS_NAMING_CATALOG_CONTEXT = "/catalog";
    
    public static final String NACOS_NAMING_INSTANCE_CONTEXT = "/instance";
...

getInstanceOperator(),就是判断是否采用Grpc协议,很明显这个位置走的是instanceServiceV2

    private InstanceOperator getInstanceOperator() {
        return instanceServiceV2;
    }

2、服务注册

instanceServiceV2.registerInstance

实际上instanceServiceV2就是InstanceOperatorClientImpl,所以我们来看这里面的registerInstance方法

    @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //判断是否为瞬时对象(临时客户端)
        boolean ephemeral = instance.isEphemeral();
        //获取客户端ID
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        //通过客户端ID创建客户端连接
        createIpPortClientIfAbsent(clientId);
        //获取服务
        Service service = getService(namespaceId, serviceName, ephemeral);
        //具体注册服务
        clientOperationService.registerInstance(service, instance, clientId);
    }

Nacos2.0以后新增Client模型一个客户端gRPC长连接对应一个Client,每个Client有自己唯一的id(clientId)。Client负责管理一个客户端的服务实例注册Publish和服务订阅Subscribe。

public interface Client {
    // 客户端id/gRPC的connectionId
    String getClientId();

    // 是否临时客户端
    boolean isEphemeral();
    // 客户端更新时间
    void setLastUpdatedTime();
    long getLastUpdatedTime();

    // 服务实例注册/注销/查询
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    InstancePublishInfo removeServiceInstance(Service service);
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();

    // 服务订阅/取消订阅/查询订阅
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    boolean removeServiceSubscriber(Service service);
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // 生成同步给其他节点的client数据
    ClientSyncData generateSyncData();
    // 是否过期
    boolean isExpire(long currentTime);
    // 释放资源
    void release();
}

EphemeralClientOperationServiceImpl.registerInstance

EphemeralClientOperationServiceImpl实际负责处理服务注册,那我们来看具体方法

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //确保Service单例存在
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        //根据客户端id,找到客户端
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //客户端Instance模型,转换为服务端Instance模型
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //将Instance储存到Client里
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();
        //建立Service与ClientId的关系
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

ServiceManager

Service的容器是ServiceManager,但是在com.alibaba.nacos.naming.core.v2包下,容器中Service都是单例。

public class ServiceManager {
    
    private static final ServiceManager INSTANCE = new ServiceManager();
    //单例Service,可以查看Service的equals和hasCode方法
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    //namespace下的所有service
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
    private ServiceManager() {
        singletonRepository = new ConcurrentHashMap<>(1 << 10);
        namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);
    }
    
    public static ServiceManager getInstance() {
        return INSTANCE;
    }
    
    public Set<Service> getSingletons(String namespace) {
        return namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1));
    }
    
    /**
     * Get singleton service. Put to manager if no singleton.
     *
     * @param service new service
     * @return if service is exist, return exist service, otherwise return new service
     */
//通过Map储存单例的Service
    public Service getSingleton(Service service) {
        singletonRepository.computeIfAbsent(service, key -> {
            NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
            return service;
        });
        Service result = singletonRepository.get(service);
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
...

所以从这个位置可以看出,当调用这个注册方法的时候ServiceManager负责管理Service单例

clientManager

这是一个接口这里我们要看它对应的一个实现类ConnectionBasedClientManager,这个实现类负责管理长连接clientId与Client模型的映射关系

// 根据clientId查询Client
    @Override
    public Client getClient(String clientId) {
        return clients.get(clientId);
    }

Client实例AbstractClient

负责存储当前客户端的服务注册表,即Service与Instance的关系。注意对于单个客户端来说,同一个服务只能注册一个实例

public abstract class AbstractClient implements Client {
...
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            if (instancePublishInfo instanceof BatchInstancePublishInfo) {
                MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
            } else {
                MetricsMonitor.incrementInstanceCount();
            }
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }

ClientOperationEvent.ClientRegisterServiceEvent

这里的目的是为了过滤目标服务得到最终Instance列表建立Service与Client的关系,建立Service与Client的关系就是为了加速查询。
发布ClientRegisterServiceEvent事件,ClientServiceIndexesManager监听,ClientServiceIndexesManager维护了两个索引:
- Service与发布clientId
- Service与订阅clientId

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    
    public ClientServiceIndexesManager() {
        NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
    }
...
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            removeSubscriberIndexes(service, clientId);
        }
    }
//建立Service与发布Client的关系
    private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

这个索引关系建立以后,还会触发ServiceChangedEvent,代表服务注册表变更。对于注册表变更紧接着还要做两个事情:1.通知订阅客户端 2.Nacos集群数据同步。

二、Nacos服务端健康检查

1、长连接

长连接,指在一个连接上可以连续发送多个[数据包],在连接保持期间,如果没有数据包发送,需要双方发链路检测包。

注册中心客户端2.0之后使用gRPC代替http,会与服务端建立长连接,但仍然保留了对旧http客户端的支持。

NamingClientProxy接口负责底层通讯,调用服务端接口。有三个实现类:

  1. NamingClientProxyDelegate:代理类,对所有NacosNamingService中的方法进行代理,根据实际情况选择http或gRPC协议请求服务端。
  2. NamingGrpcClientProxy:底层通讯基于gRPC长连接。
  3. NamingHttpClientProxy:底层通讯基于http短连接。使用的都是老代码基本没改,原来1.0NamingProxy重命名过来的。

以客户端服务注册为例,NamingClientProxyDelegate代理了registerService方法。
 

public class NacosNamingService implements NamingService {
...
    private NamingClientProxy clientProxy;
...
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        clientProxy.registerService(serviceName, groupName, instance);
    }

NamingClientProxyDelegate会根据instance实例是否是临时节点而选择不同的协议。
临时instance:gRPC
持久instance:http

public class NamingClientProxyDelegate implements NamingClientProxy {
...
    private final NamingHttpClientProxy httpClientProxy;
    
    private final NamingGrpcClientProxy grpcClientProxy;
...
    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
...
// 临时节点,走grpc长连接;持久节点,走http短连接
    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }

2、健康检查

        在之前的1.x版本中临时实例走Distro协议内存存储,客户端向注册中心发送心跳来维持自身healthy状态,持久实例走Raft协议持久化存储,服务端定时与客户端建立tcp连接做健康检查。
        但是2.0版本以后持久化实例没有什么变化,但是2.0临时实例不在使用心跳,而是通过长连接是否存活来判断实例是否健康。

ConnectionManager负责管理所有客户端的长连接。
每3s检测所有超过20s没发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在1s内成功响应,则检测通过,否则执行unregister方法移除Connection。
如果客户端持续与服务端通讯,服务端是不需要主动探活的。

@Service
public class ConnectionManager {
...
    Map<String, Connection> connections = new ConcurrentHashMap<>();
...
    @PostConstruct
    public void start() {
        
        initConnectionEjector();
        // Start UnHealthy Connection Expel Task.:启动不健康连接排除功能
        RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
            runtimeConnectionEjector.doEject();
        }, 1000L, 3000L, TimeUnit.MILLISECONDS);
        
    }

    /**
     * init connection ejector.
     */
    public void initConnectionEjector() {
        String connectionRuntimeEjector = null;
        try {
            connectionRuntimeEjector = ControlConfigs.getInstance().getConnectionRuntimeEjector();
            Collection<RuntimeConnectionEjector> ejectors = NacosServiceLoader.load(RuntimeConnectionEjector.class);
            for (RuntimeConnectionEjector runtimeConnectionEjectorLoad : ejectors) {
                if (runtimeConnectionEjectorLoad.getName().equalsIgnoreCase(connectionRuntimeEjector)) {
                    Loggers.CONNECTION.info("Found connection runtime ejector for name {}", connectionRuntimeEjector);
                    runtimeConnectionEjectorLoad.setConnectionManager(this);
                    runtimeConnectionEjector = runtimeConnectionEjectorLoad;
                }
            }
        } catch (Throwable throwable) {
            Loggers.CONNECTION.warn("Fail to load  runtime ejector ", throwable);
        }
        
        if (runtimeConnectionEjector == null) {
            Loggers.CONNECTION
                    .info("Fail to find connection runtime ejector for name {},use default", connectionRuntimeEjector);
            NacosRuntimeConnectionEjector nacosRuntimeConnectionEjector = new NacosRuntimeConnectionEjector();
            nacosRuntimeConnectionEjector.setConnectionManager(this);
            runtimeConnectionEjector = nacosRuntimeConnectionEjector;
        }
    }
    //注销(移出)连接方法
    public synchronized void unregister(String connectionId) {
        Connection remove = this.connections.remove(connectionId);
        if (remove != null) {
            String clientIp = remove.getMetaInfo().clientIp;
            AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
            if (atomicInteger != null) {
                int count = atomicInteger.decrementAndGet();
                if (count <= 0) {
                    connectionForClientIp.remove(clientIp);
                }
            }
            remove.close();
            LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
            clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
        }
    }

移除connection后,继承ClientConnectionEventListener的ConnectionBasedClientManager会移除Client,发布ClientDisconnectEvent事件。

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
...
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        ConnectionBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        client.release();
        boolean isResponsible = isResponsibleClient(client);
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientReleaseEvent(client, isResponsible));
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsible));
        return true;
    }

ClientDisconnectEvent会触发几个事件:
1)Distro协议:同步移除的client数据。
2)清除两个索引缓存:ClientServiceIndexesManager中Service与发布Client的关系;ServiceStorage中Service与Instance的关系。
3)服务订阅:ClientDisconnectEvent会间接触发ServiceChangedEvent事件,将服务变更通知客户端。

三、Nacos客户端服务发现

Nacos客户端的服务发现,其实就是封装参数、调用服务接口、获得返回实例列表。

但是如果我们要是细化这个流程,会发现不仅包括了通过NamingService获取服务列表,在获取服务列表的过程中还涉及到通信流程协议(Http or gPRC)、订阅流程、故障转移流程等。

public class NamingExample {
...
System.out.println("instances after register: " + naming.getAllInstances("nacos.test.3"));

关注getAllInstances方法,那我们就需要看一下这个方法的具体操作,当然这其中需要经过一系列的重载方法调用
其实这里的方法比入口多出了几个参数,这里不仅有服务名称,还有分组名、集群列表、是否订阅,重载方法中的其他参数已经在各种重载方法的调用过程中设置了默认值,比如:
​分组名称默认:DEFAULT_GROUOP
集群列表:默认为空数组
是否订阅:订阅

public class NacosNamingService implements NamingService {
...
    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
            boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        String clusterString = StringUtils.join(clusters, ",");
        // 是否是订阅模式
        if (subscribe) {
            // 先从客户端缓存获取服务信息
            serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
            if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
                // 如果本地缓存不存在服务信息,则进行订阅
                serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
            }
        } else {
            // 如果未订阅服务信息,则直接从服务器进行查询
            serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
        }
        // 从服务信息中获取实例列表
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<>();
        }
        return list;
    }

这个流程基本逻辑为:
如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得服务信息。
​如果是非订阅模式,那就直接请求服务器端,获得服务信息。

订阅处理流程

在刚才的流程中,涉及到了订阅的逻辑,入口代码为获取实例列表中的方法:

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

以下是具体分析。首先这里的clientProxy是NamingClientProxy类的对象,对应的实现类是NamingClientProxyDelegate对应subscribe实现如下:

public class NamingClientProxyDelegate implements NamingClientProxy {
...
    @Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
        NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
        String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
        String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
        // 定时调度UpdateTask
        serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
        // 获取缓存中的ServiceInfo
        ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
            // 如果为null,则进行订阅逻辑处理,基于gRPC协议
            result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
        }
        // ServiceInfo本地缓存处理
        serviceInfoHolder.processServiceInfo(result);
        return result;
    }

具体流程分析:
1. 订阅方法先开启定时任务,这个定时任务的主要作用就是用来定时同步服务端的实例信息,并进行本地缓存更新等操作,但是如果是首次这里将会直接返回来走下一步。
2. 判断本地缓存是否存在,如果本地缓存存在ServiceInfo信息,则直接返回。如果不存在,则默认采用gRPC协议进行订阅,并返回ServiceInfo。
3. grpcClientProxy的subscribe订阅方法就是直接向服务器发送了一个订阅请求,并返回结果。
4. 最后,ServiceInfo本地缓存处理。这里会将获得的最新ServiceInfo与本地内存中的ServiceInfo进行比较,更新,发布变更时间,磁盘文件存储等操作。其实,这一步的操作,在订阅定时任务中也进行了处理。

四、Nacos客户端服务订阅机制

1、Nacos订阅概述

Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。

2、定时任务开启

其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。
NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

public class NacosNamingService implements NamingService {
...
    @Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
            throws NacosException {
        if (null == listener) {
            return;
        }
        String clusterString = StringUtils.join(clusters, ",");
        changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
        clientProxy.subscribe(serviceName, groupName, clusterString);
    }

        这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法

public class NamingClientProxyDelegate implements NamingClientProxy {
...
    @Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
        NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
        String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
        String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
        // 定时调度UpdateTask
        serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
        // 获取缓存中的ServiceInfo
        ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
            // 如果为null,则进行订阅逻辑处理,基于gRPC协议
            result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
        }
        // ServiceInfo本地缓存处理
        serviceInfoHolder.processServiceInfo(result);
        return result;
    }

        但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public class ServiceInfoUpdateService implements Closeable {
...
    public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
        if (!asyncQuerySubscribeService) {
            return;
        }
        String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(serviceKey) != null) {
                return;
            }
            //构建UpdateTask
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
            futureMap.put(serviceKey, future);
        }
    }

定时任务延迟一秒执行:

    private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。

3、定时任务执行内容

UpdateTask封装了订阅机制的核心业务逻辑

public class ServiceInfoUpdateService implements Closeable {
    private static final long DEFAULT_DELAY = 1000L;
    
    private static final int DEFAULT_UPDATE_CACHE_TIME_MULTIPLE = 6;
...
        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            
            try {
                // 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行
                if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                        serviceKey)) {
                    NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
                    isCancel = true;
                    return;
                }
                // 获取缓存的service信息
                ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
                // 如果为空
                if (serviceObj == null) {
                    // 根据serviceName从注册中心服务端获取Service信息
                    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                    // 处理本地缓存
                    serviceInfoHolder.processServiceInfo(serviceObj);
                    lastRefTime = serviceObj.getLastRefTime();
                    return;
                }
                // 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                    // 处理本地缓存
                    serviceInfoHolder.processServiceInfo(serviceObj);
                }
                //刷新更新时间
                lastRefTime = serviceObj.getLastRefTime();
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                // 下次更新缓存时间设置,默认6秒
                // TODO multiple time can be configured.
                delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
                // 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)
                resetFailCount();
            } catch (NacosException e) {
                handleNacosException(e);
            } catch (Throwable e) {
                handleUnknownException(e);
            } finally {
                if (!isCancel) {
                    // 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟
                    // 即当无异常情况下缓存实例的刷新时间是6秒
                    executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                            TimeUnit.MILLISECONDS);
                }
            }
        }

 业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

总结:

1. 订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;
2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;
3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;
5. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。
6. 重新计算定时任务时间,循环执行流程。

Spring Cloud Alibaba - Nacos

天下事有难易乎?为之,则难者亦易矣;不为,则易者亦难矣。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/605565.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023《中国好声音》全国巡演Channel[V]歌手大赛广州赛区半决赛圆满举行!

2023年5月27-28日&#xff0c;由腾扬广告、Channel[V]、盛娱星汇联合主办的2023《中国好声音》全国巡演Channel[V]歌手大赛广州赛区半决赛在广州番禺天河城正式打响&#xff0c;自广州赛区赛事启动以来&#xff0c;汇集了近五千名音乐人参与其中&#xff0c;历经2个多月、超40场…

【数据库复习】第七章 数据库设计

数据库设计的过程(六个阶段) ⒈需求分析阶段 准确了解与分析用户需求&#xff08;包括数据与处理&#xff09; 最困难、最耗费时间的一步 ⒉概念结构设计阶段 整个数据库设计的关键 通过对用户需求进行综合、归纳与抽象&#xff0c;形成一个独立于具体DBMS的概念模型 ⒊…

基于微信小程序蛋糕店商城管理系统的设计与实现

1&#xff1a;后端采用技术 SpringBoot 、Mybatis、Mybatis-plus、Redis、阿里云短信息服务、Hutool 邮箱服务、WebSocket通讯服务、OSS对象存储服务、支付宝沙箱服务&#xff0c;接口简单限流、简单定时任务。。。。。。 2&#xff1a;前端采用技术 Vue2、Vue2-uploader组件、…

[图表]pyecharts模块-日历图

[图表]pyecharts模块-日历图 先来看代码&#xff1a; import random import datetimeimport pyecharts.options as opts from pyecharts.charts import Calendarbegin datetime.date(2017, 1, 1) end datetime.date(2017, 12, 31) data [[str(begin datetime.timedelta(d…

Leetcode 110-平衡二叉树

1. 递归法求解 递归三部曲&#xff1a; 确定递归函数的参数及其返回值确定终止条件确定单层递归逻辑 深度&#xff1a;从上往下 高度&#xff1a;从下往上 1.1 根据深度求解 构建求二叉树节点深度的函数&#xff08;后序遍历&#xff09;递归求该树是否是平衡二叉树&#…

国产化麒麟linux系统开发编译常见问题汇总

团队自研股票软件关注威信龚总号&#xff1a;QStockView&#xff0c;下载 1 问题处理 1.1 Unknown module in QT:QJsonDocument 缺少QJsonDocument 解决方法&#xff1a; Pro文件中加上 QTcore; 播放器库问题 1.2 代码中汉字乱码需要设置文件编码格式 原因分析&…

2023-06-03:redis中pipeline有什么好处,为什么要用 pipeline?

2023-06-03&#xff1a;redis中pipeline有什么好处&#xff0c;为什么要用 pipeline&#xff1f; 答案2023-06-03&#xff1a; Redis客户端执行一条命令通常包括以下四个阶段&#xff1a; 1.发送命令&#xff1a;客户端将要执行的命令发送到Redis服务器。 2.命令排队&#…

内网安全:Cobalt Strike 工具 渗透多层内网主机.(正向 || 反向)

内网安全&#xff1a;Cobalt Strike 工具 渗透多层内网主机. Cobalt Strike 是一款以 metasploit 为基础的 GUI 的框架式渗透工具&#xff0c;又被业界人称为 CS。拥有多种协议主机上线方式&#xff0c;集成了端口转发&#xff0c;服务扫描&#xff0c;自动化溢出&#xff0c;…

Docker容器化Java程序

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Docker容器化Java程序 Docker&#xff1a;用于创建和管理容器的开源平台 Java运行环境&#xff1a;Java是一个跨平台的编程语言&#xff0c;因此在CentOS系统中需要安…

一个帮助写autoprefixer配置的网站

前端需要用到postcss的工具&#xff0c;用到一个插件叫autoprefixer&#xff0c;这个插件能够给css属性加上前缀&#xff0c;进行一些兼容的工作。 如何安装之类的问题在csdn上搜一下都能找到&#xff08;注意&#xff0c;vite是包含postcss的&#xff0c;不用在项目中安装pos…

[图表]pyecharts模块-柱状图2

[图表]pyecharts模块-柱状图2 先来看代码&#xff1a; from pyecharts import options as opts from pyecharts.charts import Bar from pyecharts.faker import Fakerx Faker.dogs Faker.animal xlen len(x) y [] for idx, item in enumerate(x):if idx < xlen / 2:y…

Visual Studio Code里如何运行html (Windows 10 和 Mac OS)

在Web 开发时&#xff0c;作为Web 开发基本都是从编写 HTML 网页开始的。这篇文章讲的是如何起步配置开发环境来运行 HTML 代码。 在Windows和Mac 的 VS Code中都可以运行 HTML。 打开VS Code&#xff0c;在VS Code中安装&#xff0c;Code Runner&#xff0c; 如下所示 2、这…

【群智能算法改进】一种改进的算术优化算法 改进算术优化算法 改进AOA[1]【Matlab代码#37】

文章目录 【获取资源请见文章第5节&#xff1a;资源获取】1. 基础算术优化算法2. 改进算术优化算法2.1 随机概率因子2.2 强制切换机制 3. 部分代码展示4. 仿真结果展示5. 资源获取 【获取资源请见文章第5节&#xff1a;资源获取】 1. 基础算术优化算法 算术优化算法是一类基于…

mount -l | grep bpf

BPF & Cillum mount -l | grep bpfBPF&#xff08;Berkeley Packet Filter&#xff09;文件系统netfilter和tcprofiling和tracingHTTP、gRPC和Kafka等协议VXLAN组网模式BGP&#xff08;Border Gateway Protocol&#xff09; mount -l | grep bpf 这是一个通过运行mount -l…

Linux 实操篇-Linux 磁盘分区、挂载

Linux 实操篇-Linux 磁盘分区、挂载 Linux 分区 原理介绍 Linux 来说无论有几个分区&#xff0c;分给哪一目录使用&#xff0c;它归根结底就只有一个根目录&#xff0c;一个独立且唯一的文件结构, Linux 中每个分区都是用来组成整个文件系统的一部分。Linux 采用了一种叫“载…

使用数据库连接池来快速访问数据库Druid

使用数据库连接池来快速访问数据库Druid 简介为什么使用连接池原理及使用连接池访问数据库的优势开源项目 alibaba/druid 地址 使用方法准备jar包定义Druid的配置文件 代码示例 简介 数据库连接池是一个容器&#xff0c;负责分配、管理数据库的连接(Connection)。通过连接池可…

[图表]pyecharts模块-反转柱状图

[图表]pyecharts模块-反转柱状图 先来看代码&#xff1a; from pyecharts import options as opts from pyecharts.charts import Bar from pyecharts.faker import Fakerc (Bar().add_xaxis(Faker.choose()).add_yaxis("商家A", Faker.values()).add_yaxis("…

图论与算法(2)图的基本表示

1. 图的分类 &#xff08;1&#xff09; 有向图和无向图&#xff1a; 有向图&#xff08;Directed Graph&#xff09;&#xff1a;图中的边具有方向&#xff0c;表示节点之间的单向关系。无向图&#xff08;Undirected Graph&#xff09;&#xff1a;图中的边没有方向&#x…

【Web开发技术】数据缓存中间件Redis(非关系型数据库)

文章目录 一、引言1、介绍2、五种常用数据类型 二、配置1、下载2、使用 三、使用1、命令行操作&#xff08;1&#xff09;字符串&#xff08;2&#xff09;哈希&#xff08;3&#xff09;列表&#xff08;4&#xff09;集合 set&#xff08;5&#xff09;有序集合 sorted set&a…

python基础练习题20道,快收藏起来检测自己吧 !

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 实例001&#xff1a;数字组合 题目&#xff1a; 有四个数字&#xff1a;1、2、3、4&#xff0c;能组成多少个互不相同且无重复数字的三位数&#xff1f;各是多少&#xff1f; 答案跳转 实例002&#xff1a;“个税计…