nacos源码分析-服务注册(服务端)

news2025/1/10 10:46:45

安装Nacos源码

上一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3 ,在这里插入图片描述
解压之后使用IDEA工具导入即可。

在这里插入图片描述
但是编译过后发现代码会报错,主要是缺少实体类,比如:
在这里插入图片描述

安装protobuf

这主要是应该nacos数据通信底层使用到protobuf进行序列化(与JSON类似),是Google提供的一种数据序列化协议

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

所以这里我们需要安装protobuf ,先去下载 https://github.com/protocolbuffers/protobuf/releases,下载window版本如下:
在这里插入图片描述

  • 下载之后解压

在这里插入图片描述

  • 然后需要配置环境变量

在这里插入图片描述

  • 找到consistency模块,进入src/main

在这里插入图片描述

  • 进入main目录,执行cmd命令
protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto

效果如下:
在这里插入图片描述

启动Nacos

找到console控制台,启动Nacos,第一次启动会报错,因为默认是以集群方式启动,会出现jdbc.properties找不到的错误
在这里插入图片描述

  • 然后指定为单机启动,指定VM参数

在这里插入图片描述

  • 启动成功

在这里插入图片描述

  • 访问 http://localhost:8848/nacos/index.html 进入控制台

在这里插入图片描述

  • 到这里,nacos服务端的源码就启动成功了,那么我们尝试启动nacos-client程序,让他注册到nacos-server

在这里插入图片描述

  • 查看控制台,nacos-client成功注册到服务端

在这里插入图片描述

服务注册

在上一章节《Nacos源码分析-服务注册(客户端)》我们有分析到,nacos-client提交注册的地址是post /nacos/v1/ns/instance,那么我们在nacos-server源码中找到该接口,它位于 naming 模块中的/controllers包下的InstanceController接口中。源码如下

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    
    @Autowired
    private SwitchDomain switchDomain;
    
    @Autowired
    private PushService pushService;
    
    @Autowired
    private ServiceManager serviceManager;
    
  	...省略...
    
    /**
      注册一个新的实例
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
	@CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    //request请求对象中包括了注册的服务的port,namespaceId,groupName,serviceName,ip,集群名等等
    public String register(HttpServletRequest request) throws Exception {
        //拿到注册的服务的:namespaceId,默认是public
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //拿到注册的服务的:serviceName服务名会把组名加在前面,比如:DEFAULT_GROUP@@nacos-client
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //检查服务名的格式:groupName@@serviceName
        NamingUtils.checkServiceNameFormat(serviceName);
        //解析请求参数,封装服务实例对戏,把注册的服务封装为Instance,其中包括IP,端口,服务名等
        final Instance instance = parseInstance(request);
        //使用ServiceManger注册服务实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

	//解析要注册的服务实例
	private Instance parseInstance(HttpServletRequest request) throws Exception {
        //拿到服务名 DEFAULT_GROUP@@nacos-client
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //拿到app,没配置就是:unknown
        String app = WebUtils.optional(request, "app", "DEFAULT");
        //拿到注册服务的:IP,是否开启服务,权重,健康状况,等封装为Instance 对象
        Instance instance = getIpAddress(request);
        instance.setApp(app);
        instance.setServiceName(serviceName);
        // Generate simple instance id first. This value would be updated according to
        // 生成实例的ID:192.168.174.1#8080#DEFAULT#DEFAULT_GROUP@@nacos-client
        instance.setInstanceId(instance.generateInstanceId());
        //设置最后的心跳时间为当前时间
        instance.setLastBeat(System.currentTimeMillis());
        String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
        if (StringUtils.isNotEmpty(metadata)) {
            instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
        }
        //验证实例
        instance.validate();
        
        return instance;
    }

register方法中会从请求对象中拿到注册的参数比如IP,是否开启服务,权重,健康状况等,然后封装为 instance对象,交给 serviceManager.registerInstance 去注册,下面是 serviceManager.registerInstance的源码

缓存和初始化serivce

@Component
public class ServiceManager implements RecordListener<Service> {
    
    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
  
   ...省略部分代码...
	//注册服务实例
	public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	        //1.会尝试从serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,
	        // 并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager的一个ConcurrentHashMap中
	        // 服务注册表的结构是Map<String,Map<String,Service>>
	        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
	        //从注册表中获取服务,注册表是一个Map<String,Map<String,Service>>结构,
	        // 先根据namespaceId取得到Map<String,Service>,然后再根据serviceName取Service
	        Service service = getService(namespaceId, serviceName);
	        //参数无效,没有找到服务
	        if (service == null) {
	            throw new NacosException(NacosException.INVALID_PARAM,
	                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
	        }
	        //添加 instance 服务实例到注册表
	        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
	    }
	    
	    ...省略部分代码...
	    
		//二.创建service,并初始化
		public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
		            throws NacosException {
	        Service service = getService(namespaceId, serviceName);
	        //如果服务不存在就创建一个service
	        if (service == null) {
	            
	            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
	            service = new Service();
	            service.setName(serviceName);
	            service.setNamespaceId(namespaceId);
	            service.setGroupName(NamingUtils.getGroupName(serviceName));
	            // now validate the service. if failed, exception will be thrown
	            service.setLastModifiedMillis(System.currentTimeMillis());
	            service.recalculateChecksum();
	            if (cluster != null) {
	                cluster.setService(service);
	                service.getClusterMap().put(cluster.getName(), cluster);
	            }
	            service.validate();
	            //保存service和初始化service
	            putServiceAndInit(service);
	            if (!local) {
	                addOrReplaceService(service);
	            }
	        }
	    }
		//保存service和初始化service
		private void putServiceAndInit(Service service) throws NacosException {
				//保存service
		        putService(service);
		        service = getService(service.getNamespaceId(), service.getName());
		        //初始化service
		        service.init();
		        //consistencyService.listen实现数据一致性监听
		        consistencyService
		                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
		        consistencyService
		                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
		        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
		    }
		
	  	//保存service到注册表中
	  	public void putService(Service service) {
	        if (!serviceMap.containsKey(service.getNamespaceId())) {
	            synchronized (putServiceLock) {
	                if (!serviceMap.containsKey(service.getNamespaceId())) {
	                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
	                }
	            }
	        }
	        //把注册的服务存储到Map中
	        serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
	    }

registerInstance做了三个事情

  • 通过putService()方法将服务缓存到内存

  • service.init()建立心跳机制

  • consistencyService.listen实现数据一致性监听

registerInstance方法会尝试从ServiceManager#serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager#serviceMap中。

该Map是一个ConcurrentHashMap,结构是Map<String,Map<String,Service>>。第一个Key是NamespaceId 如:public ,第二个key是服务名,如 : DEFAULT_GROUP@@nacos-client

在这里插入图片描述

这就是nacos中的的服务注册表,用来存放注册的服务实例的Map.

在这里插入图片描述
注意:service和instance的关系是,一个service中包含一个 Map<String, Cluster> , 一个Cluster中包含一个 Set。

  • service代表一个服务:比如用户服务
  • Cluster代表服务集群,比如2个用户服务形成一个集群
  • 而一个集群中有多个服务实例,所以Cluster中有了Set 来保存服务实例

除此之外还会调用 com.alibaba.nacos.naming.core.Service#init 方法对service进行初始化,下面是init方法的源码

public void init() {
		//clientBeatCheckTask 是一个Runnable,它持有service,它的作用是
       //检查并更新临时实例的状态,如果它们已过期,则将其删除
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
//定时任务:定时检查服务的健康状况,5S一次
 public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

service.init 初始化方法中主要是把service封装到 ClientBeatCheckTask 对象中,ClientBeatCheckTask 是一个Runnable线程对象,然后使用定时任务5s执行一次健康检查。 ClientBeatCheckTask 的作用是 : 检查并更新临时实例的状态,如果它们已过期,则将其删除

下面是 com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run 线程对象的源码

public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            //拿到服务中的所有实例
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
            	//当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                        	//健康状态设置为false
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                             //发布时间:服务状态改变
                            getPushService().serviceChanged(service);
                            //发布时间:服务实例心跳超时事件
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }

run方法中会拿到当前service的所有instance,然后循环 , 如果:当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时,然后会改变instance的Healthy健康状态Wie false; 并抛出 服务实例心跳超时事件

getPushService().serviceChanged(service):方法很有意思,他的作用是通知 nacos-client该服务已经下线(UDP协议 push),这样的话nacos-client就会从本地剔除掉下线的服务。这就是它和eureka不一样的地方,eureka使用的是pull.而 nacos采用pull + push模式。 具体源码见: PushService#onApplicationEvent

 public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        //使用定时任务 1s 一次
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
            	//服务改变,添加到 push队列
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }
                
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }
                    
                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();
                        
                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }
                    
                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }
                    
                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));
                    //使用udp协议push
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
                
            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
            
        }, 1000, TimeUnit.MILLISECONDS);
        
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
        
    }

添加instance

到这里,service的缓存和初始化就看完了,代码回到 com.alibaba.nacos.naming.core.ServiceManager#registerInstance 。接下来就是分析 addInstance方法

//添加一个instance到Add instance to service.
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        //拿到key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@nacos-client
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //拿到service
        Service service = getService(namespaceId, serviceName);
        //对service加同步锁,避免并发修改
        synchronized (service) {
        	//拿到该service中的所有instance
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            //把实例列表封装到Instances 对象中
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性
            consistencyService.put(key, instances);
        }
    }

addInstance方法中会拿到service中的List<Instance>实例列表,然后设置到 Instances 中,调用 consistencyService去同步到nacos集群。

这里采用了CopyOnWrite方案。对于 addIPAddress方法会拷贝旧的实例列表添加到新实例到列表中。在同步完nacos集群后,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。

这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案

consistencyService是用作service同步的。代表集群一致性的接口。

在这里插入图片描述

下面看一下 consistencyService.put 方法,底层会调用 DistroConsistencyServiceImpl#put 方法,源码如下

@Override
public void put(String key, Record value) throws NacosException {
    //根据key确定是用ephemeralConsistencyService或者persistentConsistencyService
    mapConsistencyService(key).put(key, value);
}

private ConsistencyService mapConsistencyService(String key) {
		//key以 ephemeral 开头就是临时实例
		// 临时实例选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
    	//  持久实例选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

//初始化方法,
@PostConstruct
public void init() {
		//把notifier提交给线程池
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
    
@Override
public void put(String key, Record value) throws NacosException {
		//把实例保存到本地实例表
        onPut(key, value);
        //使用distro协议同步到集群
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
}




put方法中,会先先根据服务的key判断使用临时同步服务ephemeralConsistencyService ,或者持久同步服务persistentConsistencyService。然后会做2个事情

  • 调用onPut :把实例保存到本地实例列表 。
  • 调用distroProtocol.sync把实例同步到集群

更新服务列表

对于onPut 方法中做了2个事情.

  • 一个是把实例封装到Datum对象中,然后交给dataStore存储起来。
  • 另一个是通过notifier.addTask 把key放入阻塞队列,然后会通过线程池异步去执行阻塞队列
 public void onPut(String key, Record value) {
        //判断是否是临时实例
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //把数据存储到dataStore,内部维护了一个Map
            dataStore.put(key, datum);
        }
        
        if (!listeners.containsKey(key)) {
            return;
        }
        //这里是把key放入一个阻塞队列,然后会用线程池异步去执行队列
        notifier.addTask(key, DataOperation.CHANGE);
    }

 public class Notifier implements Runnable {
        
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
       
        //一个阻塞队列
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
		public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
            	//如果是change,就把key放入一个map中
                services.put(datumKey, StringUtils.EMPTY);
            }
            //加入阻塞队列
            tasks.offer(Pair.with(datumKey, action));
        }

		 @Override
         public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                	//从阻塞队列中取出任务
                    Pair<String, DataOperation> pair = tasks.take();
                    //处理任务更新服务列表
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        

Notifier是一个Runnable,其中维护了一个tasks(ArrayBlockingQueue)用来存储服务列表的变更事件。他的run方法中是一个死循环,不停的从阻塞队列中取出任务交给handle方法去处理。下面是 DistroConsistencyServiceImpl.Notifier#handle方法

private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;

                ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
                if (recordListeners == null) {
                    Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
                    return;
                }
                //拿到有change的service,RecordListener 就是 service的接口
                for (RecordListener listener : recordListeners) {
                    
                    count++;
                    
                    try {
                    	//如果是change事件
                        if (action == DataOperation.CHANGE) {
                        	//取出服务
                            Datum datum = dataStore.get(datumKey);
                            if (datum != null) {
                            	//执行linster的change事件。更新服务列表
                                listener.onChange(datumKey, datum.value);
                            } else {
                                Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
                            }
                            continue;
                        }
                        //处理服务的delete事件
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }

handle方法中会找到有变化的RecordListener,其实就是service(change 或者 delete事件)然后,触发onChange方法,其实就是调用 com.alibaba.nacos.naming.core.Service#onChange方法。

 public void onChange(String key, Instances value) throws Exception {
        
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        //遍历service中的所有实例instance
        for (Instance instance : value.getInstanceList()) {
            
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            
            if (instance.getWeight() > 10000.0D) {
            	//设置权重
                instance.setWeight(10000.0D);
            }
            
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        //修改IP
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        
        recalculateChecksum();
    }

该方法中会调用updateIPS去更新服务实例,源码如下

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // 准备一个HashMap,key是cluster,值是集群下的Instance集合
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    // 获取集群名称存储到map中,key是集群名
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    // 遍历要更新的实例
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
			// 判断实例是否包含clusterName,没有的话用默认cluster
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
			// 判断cluster是否存在,不存在则创建新的cluster
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                          instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
			// 获取当前cluster实例的集合,不存在则创建新的
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
			// 添加新的实例到 Instance 集合
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // 这里就是在更新注册表
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
	//设置最后修改时间
    setLastModifiedMillis(System.currentTimeMillis());
    // 发布服务变更的通知消息
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                         stringBuilder.toString());

}

上面代码中 ,clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); 就是 在更新服务注册表,因为service#clusterMap 是一个Map<String, Cluster> 结构,cluster中就是服务实例。然后会调用 .Cluster#updateIps去更新实例。源码如下

public void updateIps(List<Instance> ips, boolean ephemeral) {

    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
	//拿到旧的服务列表,
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
	
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    ...省略部分代码...

	// 检查新加入实例的状态
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                  getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
       		 //重置服务的健康状态
            HealthCheckStatus.reset(ip);
        }
    }
	// 移除要删除的实例
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                  getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
        	//移除
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);
	
    if (ephemeral) {
    // 直接覆盖旧实例列表
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

同步服务到集群

接下来回到 DistroConsistencyServiceImpl#put方法中。刚才说到该方法做了2个事情

  • onPut(key, value) : 更新服务列表
  • distroProtocol.sync :同步服务到集群

我们现在来看一下sync方法是怎么做的,下面是方法的源码

/**
     * 开始同步数据到所有的远程服务
     * Start to sync data to all remote server.
     *
     * @param distroKey distro key of sync data
     * @param action    the action of data operation
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        //拿到除开自己以外的所有nacos集群中的成员
        for (Member each : memberManager.allMembersWithoutSelf()) {
            //构建一个key
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            //构建一个延迟任务对象
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            //交给线程池去执行,维护了一个DistroDelayTaskExecuteEngine
            //任务交给 NacosDelayTaskExecuteEngine 引擎 其中维护了一个ScheduledExecutorService线程池
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }

该方法中会找到所有的nacos集群的成员(除开自己),然后会拿到服务的key(DistroKey )构建一个DistroDelayTask任务对象,交给线程池去执行同步。

这里维护了一个 DelayTaskExecuteEngine 延迟任务执行引擎NacosDelayTaskExecuteEngine,任务的执行通过引擎的 processTasks方法完成com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks

protected void processTasks() {
		//拿到所有任务
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            //任务执行器
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                //执行任务,任务失败会重试
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                //重试失败的任务
                retryFailedTask(taskKey, task);
            }
        }
    }

总结

文章有点长,下面做个总结,从大的流程上来说分为如下几个步骤

  1. instanceController接口: nacos服务点接受到注册请求后会把请求解析为Instance,紧接着会执行serviceManager#registerInstance方法注册实例
  2. serviceManager#registerInstance方法中会先尝试创建Service对象,并缓存到一个Map<String, Map<String, Service>> 结构的服务注册表中,然后对每个service做初始化,主要是使用线程池10s一次检查服务是否健康状态,过期的服务会删除掉。
  3. serviceManager#registerInstance第二个事情就是执行addInstances方法添加实例,该方法会触发服务列表的更新以及把服务同步到其他nacos集群中。

在这里插入图片描述
文章到这里就结束了,如果文章对你有所帮助,请给个好评,你的鼓励是我最大的动力

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/118499.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web3中文|为什么去中心化存储对NFT元数据很重要

图中文字&#xff1a;哦&#xff0c;看&#xff0c;FTX用Web2 API托管了所有在其平台上铸造的NFT&#xff0c;现在所有这些NFT的元数据都被破坏了&#xff0c;并且链接到了一个重组的网站。 这本不应该发生。但对于任何不考虑元数据和如何存储元数据的NFT项目来说&#xff0c;…

docker(5):Dockerfile

目录Dockerfile介绍Dockerfile常用指令案例&#xff1a;构建tomcat镜像Dockerfile介绍 Dockerfile 是一个用来构建镜像的文本文件&#xff0c;文本内容包含了一条条构建镜像所需的指令和说明&#xff0c;每条指令都会创建一个新的镜像层并对镜像进行提交。 Dockerfile 一般分…

【Django】第一课 基于Django图书借阅管理网站平台

概念 django服务器开发框架是一款基于Python编程语言用于web服务器开发的框架&#xff0c;采用的是MTV架构模式进行分层架构。 项目搭建 打开pycharm开发软件&#xff0c;打开开发软件的内置dos窗口操作命令行 在这里指定项目存放的磁盘路径&#xff0c;并使用创建django项目…

编辑器:保存格式化修复配置

规范化条目 制表符长度&#xff1a;2&#xff0c;缩进模式&#xff1a;2个空格&#xff0c;换行符&#xff1a;lf&#xff0c;末尾加分号&#xff0c;js单引号&#xff0c;冒号后一个空格&#xff0c;运算符前后一个空格&#xff0c;大括号&#xff08;有内容的&#xff09;首…

项目实战之旅游网(八)后台产品管理(下)

目录 一.上传产品图片 二.修改产品 三.上下架产品 一.上传产品图片 在新增产品时&#xff0c;我们还需要上传产品图片。我们采用异步上传的方法进行图片上传。 1.在conmmon_ resources.html 中引入jqueryform.js 2.修改product_ add.html 页面 点击保存 &#xff0c;自动…

【Spring【IOC】】——18、自定义组件中如何注入Spring底层的组件?

&#x1f4eb;作者简介&#xff1a;zhz小白 公众号&#xff1a;小白的Java进阶之路 专业技能&#xff1a; 1、Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理 2、熟悉Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理&#xff0c;具备⼀定的线…

LeetCode283.移动0

思路1 分析 在i位置遇到0&#xff0c;把后面的元素向前移动覆盖&#xff0c;然后把最后一个位置赋值为0即可 注意问题&#xff1a; 可能 i 一个位置 移动一次之后还是0&#xff0c;需要循环 有可能 i 位置的0 是因为 已经所有的0都到后面了 ​ 所以需要用count记录0的个数&am…

2022年区块链安全领域8成以上损失集中在DeFi和跨链桥

近期&#xff0c;欧科云链研究院上线《2022年全球区块链生态安全态势报告》&#xff0c;报告指出2022年区块链安全领域8成以上损失集中在DeFi和跨链桥&#xff0c;钓鱼攻击是最常见攻击手法。主要结论 2022年前11个月&#xff0c;OKLink共监测到区块链生态相关安全事件275起&a…

整理leetcode中”最长...“

1.最长公共子序列&#xff08;动态规划&#xff09;剑指offer95 输入&#xff1a;text1 “abcde”, text2 “ace” 输出&#xff1a;3 解释&#xff1a;最长公共子序列是 “ace” &#xff0c;它的长度为 3 。 Q1&#xff1a;为什么想到二维dp&#xff1f; A1&#xff1a;因…

JDBC第二章 (JDBC API详解)

目录 一、下载驱动包 二、加载与注册驱动 1、使用driverManager类 2、方式&#xff1a; 3、补充&#xff1a; 三、建立连接 1、URL 2.建立连接的方式 3.事务管理 4.获取Statement语句 1、普通版本 2、防止SQL注入版本 3、获取存储过程 四、Statement 1、概述 2…

数图互通高校房产管理——房屋模拟分配建设

数图互通房产管理系统在这方面做得比较全面&#xff1b; 1、 房屋模拟分配建设方案 实现对学校房屋分配进行情景模拟&#xff0c;在特定房屋类型、数量、使用面积等情况下&#xff0c;建立多个模拟分配方案&#xff0c;并对每个模拟分配方案生成明细清单。 1.1 房屋模拟分配清…

石墨烯太阳能供暖远程监控

石墨烯太阳能供暖系统是指采用全新一代石墨烯碳纤维电热膜为发热体&#xff0c;直接将电能转换为热能的供暖系统。再搭配太阳能光伏发电系统给石墨烯供暖系统供电&#xff0c;更加节能有效地解决用户用电问题。但目前这种供暖方式也存在诸多问题&#xff0c;如供暖温度得不到控…

深度学习交通标志识别项目

主要内容 在本文中&#xff0c;使用Python编程语言和库Keras和OpenCV建立CNN模型&#xff0c;成功地对交通标志分类器进行分类&#xff0c;准确率达96%。开发了一款交通标志识别应用程序&#xff0c;该应用程序具有图片识别和网络摄像头实时识别两种工作方式。 写作目的 近年…

jenkins 升级遇到问题总结

当我在使用jenkins的时候,避免不了下载很多插件,因为jenkins本身不提供很多功能,大部分的功能都是依赖插件来完成的,这也使jenkins更具有扩展性,但是,我在安装完成后打开插件列表居然是这样的。。。 或者插件列表打开的正常,但是安装某个插件时报这样的错误。。。 看标…

c++算法基础必刷题目——尺取法

文章目录尺取法1、字符串2、丢手绢尺取法 尺取法通常也叫滑动窗口法&#xff0c;顾名思义&#xff0c;像尺子一样取一段&#xff0c;借用挑战书上面的话说&#xff0c;尺取法通常是对数组保存一对下标&#xff0c;即所选取的区间的左右端点&#xff0c;然后根据实际情况不断地推…

Html网页和C++ App通信 - qwebchannel

Qt5 引入了 Qt WebChannel 的概念。这是为了在不能影响各端代码执行的前提下实现 Qt 端于 client 端的无缝 双向 通信。 QWebChannel 提供了在 C应用和 前端&#xff08;HTML/JS&#xff09;之间点对点的通信能力。通过向 前端的 QWebChannel 发布 QObject 的 派生对象&#xf…

开源版支持工作台展示,新增超级管理员用户组,MeterSphere开源持续测试平台v2.5.0发布

2022年12月27日&#xff0c;MeterSphere一站式开源持续测试平台正式发布v2.5.0版本。 在这一版本中&#xff0c;MeterSphere在工作台模块进行了UX交互升级&#xff0c;并将工作台模块由X-Pack增强功能开放为开源版功能。 在测试跟踪模块中&#xff0c;关联测试用例支持关联UI…

(四)RequestResponse

一、Request 和 Response 的概述 Request是请求对象&#xff0c;Response是响应对象。request&#xff1a;获取请求数据 &#xff08;1&#xff09;浏览器会发送HTTP请求到后台服务器 [Tomcat] &#xff08;2&#xff09;HTTP的请求中会包含很多请求数据[请求行请求头请求体] &…

26位前谷歌AI专家出走创业

细数近几年来高科技对现代社会的影响&#xff0c;人工智能&#xff08;AI&#xff09;无疑是排在前列。AI已经对人类社会行为、健康、教育和娱乐的方方面面都产生了巨大冲击。作为高科技的头部企业&#xff0c;谷歌的AI团队可能是AI行业最有影响的团队之一&#xff0c;他们的一…

第十三讲:MSTP技术应用

学校因为教师的人数越来越多&#xff0c;部门逐渐也增多&#xff0c;各部门之间都已经采用了vlan技术&#xff0c;但为了实现公司的稳定性和消除内部网络的环路&#xff0c;管理员小赵配合飞越公司去实现学校内部网络时刻不间断&#xff0c;来保证公司网络的运行。 为了解决校园…