nacos 删除过期实例源码分析

news2025/1/22 2:14:59

nacos 删除过期实例也是注册中心的一个重要功能,今天我们从入口到结束分析一下,首先确定删除的入口在服务端注册接口的源码里,此处可以参考:参考注册源码

一、注册入口

1、创建空服务

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //创建空实例,删除过期实例入口,点击进入
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //核心代码,注册实例入口
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

点击 createEmptyService方法

    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);//点击进入
    }

继续进入

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        Service service = getService(namespaceId, serviceName);//先获取
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);//存放集群数据
            }
            service.validate();
            //核心代码,点击进入
            putServiceAndInit(service);
            if (!local) {//还没有处理监听,此时处理
                addOrReplaceService(service);
            }
        }
    }

 点击putServiceAndInit(service);来到

    private void putServiceAndInit(Service service) throws NacosException {
        putService(service);//实例存放到map中,点击
        service.init();//定时任务初始化,点击
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

点击putService(service);来到:

    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    //创建内存注册表
                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                }
            }
        }//创建内存注册表第二层数据
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }

 2、点击 service.init(); 来的 service类

   public void init() {
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);//定时任务执行
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

3、找到 clientBeatCheckTask 类点击进入

@Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
                //如果某个实例超过15秒没有收到心跳,相关属性设置为false
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);//设置false
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }
                //如果超过30秒没有收到心跳,直接剔除该实例,被剔除的实例如果恢复心跳,则会重新注册
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);//删除实例,点击进入
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }

点击 deleteIp(instance);

    private void deleteIp(Instance instance) {

        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());

            String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
                    + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

            // delete instance asynchronously:开始异步删除,点击
            HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG
                                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                        instance.toJson(), result.getMessage(), result.getCode());
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG
                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
                                    throwable);
                }

                @Override
                public void onCancel() {

                }
            });

        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }

 4、点击HttpClient.asyncHttpDelete

    public static void asyncHttpDelete(String url, List<String> headers, Map<String, String> paramValues,
            Callback<String> callback) throws Exception {
        //删除实例接口,调到InstanceController的deregister接口
        asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);
    }

点击 asyncHttpRequest 方法

    public static void asyncHttpRequest(String url, List<String> headers, Map<String, String> paramValues,
            Callback<String> callback, String method) throws Exception {

        Query query = Query.newInstance().initParams(paramValues);
        query.addParam("encoding", "UTF-8");
        query.addParam("nofix", "1");

        Header header = Header.newInstance();
        if (CollectionUtils.isNotEmpty(headers)) {
            header.addAll(headers);
        }
        header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, "UTF-8");
        AuthHeaderUtil.addIdentityToHeader(header);
        switch (method) {//异步增删改查服务
            case HttpMethod.GET:
                ASYNC_REST_TEMPLATE.get(url, header, query, String.class, callback);
                break;
            case HttpMethod.POST:
                ASYNC_REST_TEMPLATE.postForm(url, header, paramValues, String.class, callback);
                break;
            case HttpMethod.PUT:
                ASYNC_REST_TEMPLATE.putForm(url, header, paramValues, String.class, callback);
                break;
            case HttpMethod.DELETE://删除接口,点击
                ASYNC_REST_TEMPLATE.delete(url, header, query, String.class, callback);
                break;
            default:
                throw new RuntimeException("not supported method:" + method);
        }
    }

 5、点击 ASYNC_REST_TEMPLATE.delete(url, header, query, String.class, callback);

来到 NacosAsyncRestTemplate extends AbstractNacosRestTemplate 类

    public <T> void delete(String url, Header header, Query query, Type responseType, Callback<T> callback) {
        //点击
        execute(url, HttpMethod.DELETE, new RequestHttpEntity(header, query), responseType, callback);
    }

点击 execute 方法

    private <T> void execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type type,
            Callback<T> callback) {
        try {
            URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
            if (logger.isDebugEnabled()) {
                logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
            }
            ResponseHandler<T> responseHandler = super.selectResponseHandler(type);
            //正式调用
            clientRequest.execute(uri, httpMethod, requestEntity, responseHandler, callback);
        } catch (Exception e) {
            // When an exception occurs, use Callback to pass it instead of throw it directly.
            callback.onError(e);
        }
    }

 最终调到com.alibaba.nacos.naming.controllers.InstanceController#deregister

二、删除入口

来到com.alibaba.nacos.naming.controllers.InstanceController#deregister方法

    @CanDistro
    @DeleteMapping//删除实例
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String deregister(HttpServletRequest request) throws Exception {
        Instance instance = getIpAddress(request);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
            return "ok";
        }
        //正式删除,点击进入
        serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        return "ok";
    }

1、点击 serviceManager.removeInstance 来到 

ServiceManager implements RecordListener<Service>
    public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        Service service = getService(namespaceId, serviceName);

        synchronized (service) {//同步删除,进入
            removeInstance(namespaceId, serviceName, ephemeral, service, ips);
        }
    }

点击 removeInstance(namespaceId, serviceName, ephemeral, service, ips);

    private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
            Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //开始删除,点击进入
        List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);//核心代码,点击
    }

点击 substractIpAddresses(service, ephemeral, ips);

   private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips)
            throws NacosException {
        //点击
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
    }

 来到

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
            throws NacosException {

        Datum datum = consistencyService
                .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        Set<String> currentInstanceIds = Sets.newHashSet();

        for (Instance instance : currentIPs) {
            currentInstances.put(instance.toIpAddr(), instance);
            currentInstanceIds.add(instance.getInstanceId());
        }

        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            instanceMap = new HashMap<>(ips.length);
        }

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());//删除数据
            } else {
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                instanceMap.put(instance.getDatumKey(), instance);//添加数据
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException(
                    "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                            .toJson(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

2、点击 consistencyService.put(key, instances);进入

com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put

    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);//进入
    }

点击 mapConsistencyService(key)

    private ConsistencyService mapConsistencyService(String key) {
        //临时ephemeralConsistencyService 阿里自己实现的基于AP模式的Distro协议
        // 持久化数据persistentConsistencyService
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

点击put方法,对应一下两个实现类

 

 3、来到

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put

临时实例处理

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);//将注册实例更新到内存注册表,临时实时数据
        //同步 信息到nacos服务端的其他节点
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
    }

点击 onPut(key, value);

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key, DataOperation.CHANGE);
    }

 点击 distroProtocol.sync

  public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            //开始同步到其他节点
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }

点击addTask

    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            tasks.put(key, newTask);//存入map
        } finally {
            lock.unlock();
        }
    }

临时实例 

 4、来到处理持久化实例处理

com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl#put

    @Override
    public void put(String key, Record value) throws NacosException {
        //阿里自己实现的CP模式的raft算法协议
        checkIsStopWork();
        try {//阿里自己实现的CP模式的raft算法协议,点击
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                    e);
        }
    }

进入com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore#signalPublish

 

    public void signalPublish(String key, Record value) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        //判断此节点是否是leader
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);

            final RaftPeer leader = getLeader();
            //将注册请求转发到leader节点,点击
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }

        OPERATE_LOCK.lock();
        try {
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }

            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
            //更新注册实例数据到内存和磁盘文件上,点击
            onPublish(datum, peers.local());

            final String content = json.toString();

            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                //UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/datum/commit";
                final String url = buildUrl(server, API_ON_PUB);
                //利用CountDownLatch实现一个简单的raft协议写入数据的逻辑,必须集群半数以上节点写入成功才会给客户端返回成功
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT
                                    .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                            datum.key, server, result.getCode());
                            return;
                        }
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                    }

                    @Override
                    public void onCancel() {

                    }
                });

            }

            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }

            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }

点击  proxyPostLarge 方法com.alibaba.nacos.naming.consistency.persistent.raft.RaftProxy#proxyPostLarge

    public void proxyPostLarge(String server, String api, String content, Map<String, String> headers)
            throws Exception {
        // do proxy
        if (!IPUtil.containsPort(server)) {
            server = server + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort();
        }
        String url = "http://" + server + EnvUtil.getContextPath() + api;
        
        RestResult<String> result = HttpClient.httpPostLarge(url, headers, content);
        if (!result.ok()) {
            throw new IllegalStateException("leader failed, caused by: " + result.getMessage());
        }
    }

 点击 onPublish(datum, peers.local());

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }

        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT
                    .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                            JacksonUtils.toJson(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }

        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                    JacksonUtils.toJson(local));
            throw new IllegalStateException(
                    "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }

        local.resetLeaderDue();

        // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            raftStore.write(datum);//同步写实时数据到文件
        }

        datums.put(datum.key, datum);

        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());
        //发布事件ValueChangeEvent更新内存注册表,点击
        NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }

点击 NotifyCenter.publishEvent来到 com.alibaba.nacos.common.notify.NotifyCenter#publishEvent(com.alibaba.nacos.common.notify.Event)

   public static boolean publishEvent(final Event event) {
        try {
            return publishEvent(event.getClass(), event);//触发监听
        } catch (Throwable ex) {
            LOGGER.error("There was an exception to the message publishing : {}", ex);
            return false;
        }
    }

点击

 private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);//触发监听,点击
        }

        final String topic = ClassUtils.getCanonicalName(eventType);

        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            return publisher.publish(event);
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }

来到com.alibaba.nacos.common.notify.DefaultPublisher#publish

    @Override
    public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
            receiveEvent(event);//触发监听,点击
            return true;
        }
        return true;
    }

 来到

void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();

        // Notification single event listener
        for (Subscriber subscriber : subscribers) {
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass());
                continue;
            }

            // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
            // Remove original judge part of codes.触发监听,点击
            notifySubscriber(subscriber, event);
        }
    }

进入

@Override
    public void notifySubscriber(final Subscriber subscriber, final Event event) {

        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        //触发监听到子类PersistentNotifier,进入

        final Runnable job = new Runnable() {
            @Override
            public void run() {
                subscriber.onEvent(event);
            }
        };

        final Executor executor = subscriber.executor();

        if (executor != null) {
            executor.execute(job);
        } else {
            try {
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception : {}", e);
            }
        }
    }

 来到com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier#onEvent

    @Override//事件监听
    public void onEvent(ValueChangeEvent event) {
        notify(event.getKey(), event.getAction(), find.apply(event.getKey()));//点击进入
    }

进入

public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
        if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
            if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
                for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(key, value);//监听改变,进入service类
                        }
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(key);//监听删除
                        }
                    } catch (Throwable e) {
                        Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                    }
                }
            }
        }

        if (!listenerMap.containsKey(key)) {
            return;
        }

        for (RecordListener listener : listenerMap.get(key)) {
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(key, value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(key);
                }
            } catch (Throwable e) {
                Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
            }
        }
    }

 进入com.alibaba.nacos.naming.core.Service#onChange

 @Override
    public void onChange(String key, Instances value) throws Exception {

        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

        for (Instance instance : value.getInstanceList()) {

            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }

            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }

            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        //将注册实例更新到注册表内存结构里去,点击
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

        recalculateChecksum();
    }

点击

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }

        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }

                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }

                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }

                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }

                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }

        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            //将临时注册实例更新到cluster的ephemeralInstances属性上去,服务发现查找临时实例最终从内存里找到的就是这个属性
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }

        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);//发布服务变化事件,点击
        StringBuilder stringBuilder = new StringBuilder();

        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());

    }

 来到com.alibaba.nacos.naming.push.PushService#serviceChanged

public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        if (futureMap
                .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }
        //udp方式将服务变动通知给订阅的客户端
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

到此,删除过期实例的源码分析完成,下篇分析获取服务列表的源码,敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/185974.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

行测笔记(主要知识点)

文章目录&#xff1a; 一&#xff1a;言语理解 1.技巧关系 2.逻辑填空 二&#xff1a;判断推理 1.图形推理 2.定义判断 3.类比推理 4.逻辑判断 三&#xff1a;资料分析 1.增长率 2.增长量 3.比重 4.平均数 5.倍数与比值 三&#xff1a;数量关系 1.解题…

Nozomi 交付业界第一个物联网端点安全传感器

运营技术 (OT) 和物联网 (IoT) 安全提供商 Nozomi Networks 推出了业界首款 OT 和物联网端点安全传感器 Nozomi Arc&#xff0c;旨在以指数方式加快实现完全运营弹性的时间。 Nozomi Arc 旨在自动部署在组织需要可见性的任何地方的大量站点和设备上&#xff0c;添加了关于关键…

六个步骤教你用Xmind制作思维导图

XMind是国产优秀的思维导图软件&#xff0c;那么如何使用xmind制作思维导图呢&#xff1f;对于新手来说&#xff0c;首先就要正确安装Xmind&#xff1b;其次&#xff0c;就是要搞清楚使用XMind画思维导图的步骤和方法&#xff0c;当然在学习使用xmind画思维导图的过程中也可以学…

Redis 面试题总结

Redis是什么&#xff1f; Redis 是一个 key-value 存储系统&#xff0c;它支持存储的 value 类型相对更多&#xff0c;包括 string、list、set、zset&#xff08;sorted set --有序集合&#xff09;和 hash。这些数据结构都支持 push/pop、add/remove 及取交集并集和差集及更丰…

面试题 : Spring循环依赖问题及其解决方案

一级缓存,存在循环依赖问题 一级缓存的作用 一级缓存就是singletonObjects(单例池) : 作用就是保证单例,里面放的是成品对象循环依赖问题 假设有两个类A, B ,然后A依赖B, B依赖A此时在spring 容器中一级缓存的工作流程是&#xff1a; (1)、首先在单例池中找,一开始是没有的…

2023年二月份图形化一级打卡试题

活动时间 从2023年 2月1日至1月21日&#xff0c;每天一道编程题。 本次打卡的规则如下&#xff1a; &#xff08;1&#xff09;小朋友每天利用10~15分钟做一道编程题&#xff0c;遇到问题就来群内讨论&#xff0c;我来给大家答疑。 &#xff08;2&#xff09;小朋友做完题目后&…

SharedWorker 让你多个页面相互通信

SharedWorker 是一个新的Web Worker API&#xff0c;它允许你在多个页面之间共享一个Worker。 SharedWorker 代表一种特定类型的Worker&#xff0c;可以在多个浏览器上下文中运行&#xff0c;比如多个页面或者多个iframe。 什么是 SharedWorker 根据前几篇的了解&#xff0c…

i.MX8MM开发板音视频开发-音频编码

我们举个例子&#xff0c;以 CD 音质来说&#xff0c;量化格式是 2 字节&#xff0c;采样率是 44100&#xff0c;声道数是 2&#xff0c;这些信息就描述 了 CD 的音质。对于声音信息&#xff0c;我们还可以用数据比特率来描述音频数据单位时间内的容量大小。那么 CD 的 数 据 采…

【Java AWT 图形界面编程】Frame 窗口中进行自定义布局 ( AWT 中常用的布局容器 )

文章目录一、Frame 窗口中进行自定义布局二、AWT 中常用的布局容器一、Frame 窗口中进行自定义布局 在 【Java AWT 图形界面编程】LayoutManager 布局管理器总结 ( FlowLayout 布局 | BorderLayout 布局 | BoxLayout 布局 ) 介绍了常用的布局 ; 使用布局的好处是 布局内的子组…

【服务器数据恢复】ZFS文件系统下RAIDZ的数据恢复案例

服务器数据恢复环境&#xff1a; ORACLE Sun ZFS Storage&#xff1b; 32块磁盘分为4组&#xff0c;每组8块硬盘&#xff0c;热备盘全部启用。 ZFS文件系统&#xff0c;Windows操作系统。 服务器故障&分析&#xff1a; 设备在正常工作时候突然崩溃&#xff0c;经过检查排除…

机器学习笔记之深度玻尔兹曼机(三)预训练思路整理

机器学习笔记之深度玻尔兹曼机——预训练思路整理引言回顾&#xff1a;受限玻尔兹曼机的叠加逻辑回顾&#xff1a;受限玻尔兹曼机叠加过程中的计算方式关于计算过程的优化引言 上一节介绍了受限玻尔兹曼机叠加的逻辑&#xff0c;以及叠加过程中出现的Double Counting\text{Dou…

理解 TypeScript 背后的结构化类型系统

前言 你能说清楚类型、类型系统、类型检查这三个的区别吗&#xff1f;在理解TypeScript的结构化类型系统之前&#xff0c;我们首先要搞清楚这三个概念和它们之间的关系 类型&#xff1a;即对变量的访问限制与赋值限制。如 TypeScript 中的原始类型、对象类型、函数类型和字面…

python代码实现批量yunfile文件下载

建议下载文件大小不要超过1M吧,超过的话,把等待下载时间加大点 —>说明: 使用python2.7+selenium+chrome v49+百度ocr识别,基本上pip install 相关的程序,源码就可以直接运行了,exe文件也编译了,去别的电脑运行感觉有各种问题,大家自己用源码跑吧,最新chrome版本的…

如何实现报表集成?(一)

报表需求在每个企业都是“刚需”&#xff0c;而报表的应用又是其中的关键之一&#xff0c;并不是说报表开发出来就万事大吉了&#xff0c;怎么用、怎么用得好&#xff0c;也是用户非常关注的。在这个话题中&#xff0c;报表的集成是个绕不过去的坎&#xff0c;如何通过集成&…

机器学习之参数学习

下述内容为课程小结 定义 参数估计的方法包括经验风险最小化、结构风险最小化、最大似然估计、最大后验估计。 参数估计用于学习模型参数&#xff0c;以达到最优的目的&#xff0c;如线性回归的模型参数 经验风险最小化 对于输入的待处理数据格式为(x&#xff0c;y){(x&…

Plant Simulation热力图工具V2.1全新发布

在做AGV路径规划或人员路径规划时&#xff0c;如果配套热力图&#xff0c;是可以非常方便的分析出相应位置的热点情况&#xff0c;决策人员可以更加方便的确定方案修改思路&#xff0c;比如下图可以非常清晰地看到AGV的停顿位置和路口的使用情况。较早之前&#xff0c;波哥开发…

Vue3+Vite+Element-Plus实现CRUD常见表单项目

效果有 查询&#xff0c;增加&#xff0c;表格&#xff0c;删除&#xff0c;编辑 其实CRUD&#xff0c;就是一个管理项目最常见的功能 C增加 (Create) R读取 (Read) U更新 (Update) D删除 (Delete) 一、创建项目 vue3用vite创建项目 1 对应路径cmd 输入 npm create vitelates…

C 语言零基础入门教程(十八)

C 输入 & 输出 当我们提到输入时&#xff0c;这意味着要向程序填充一些数据。输入可以是以文件的形式或从命令行中进行。C 语言提供了一系列内置的函数来读取给定的输入&#xff0c;并根据需要填充到程序中。 当我们提到输出时&#xff0c;这意味着要在屏幕上、打印机上或…

SSM框架整合(Spring+SpringMVC+MyBatis)

一、创建MAVEN工程 二、导入pom依赖 <dependency><groupId>com.mchange</groupId><artifactId>c3p0</artifactId><version>0.9.5.2</version></dependency><!-- https://mvnrepository.com/artifact/commons-logging/comm…

项目错误排查

项目运行不起来&#xff0c;先观察表现&#xff0c;就像中医所讲的望。复现&#xff0c;了解触发问题的时机和过程。在哪个步骤&#xff0c;哪个接口出了问题。闻问切浏览器f12&#xff0c;根据请求参数和响应码判断问题出在前端还是后端。查看错误日志&#xff0c;一般写的还是…