nacos源码分析-心跳检测(服务端)

news2024/11/22 17:26:57

前言

前面我们讲了《nacos源码分析-服务注册(客户端)》 和 《nacos源码分析-服务注册(服务端)》,主要是讲的服务注册流程,本章节我们来讲服务心跳检测机制。

心跳续约客户端

其实我们在讲 nacos服务注册客户端的时候顺带就说了心跳,服务注册流程是:

在这里插入图片描述

nacos客户端服务心跳在服务注册的流程中触发,这里我再贴一下源码, NacosNamingService#registerInstance的源码:

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            
            //添加心跳
            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

这里就看的比较清楚了,这里会把服务的ip,端口,服务名等信息封装到 BeatInfo 对象中,beatReactor.addBeatInfo是把当前服务实例加入心跳机制(心跳续约),然后通过serverProxy.registerService注册

代码在 BeatReactor#addBeatInfo中添加的心跳续约,在 NacosNamingService#registerInstance方法中把服务信息封装为一个 BeatInfo ,然后加入this.beatReactor.addBeatInfo 心跳机制。我们来看一下心跳是如何做的,下面是beatReactor.addBeatInfo的源码

 public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }

        this.dom2Beat.put(key, beatInfo);
        //线程池,定时任务,5000毫秒发送一次心跳。beatInfo.getPeriod()是定时任务执行的频率
        this.executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
    }

   //心跳任务
   class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

       public void run() {
            if (!this.beatInfo.isStopped()) {
            
                long nextTime = this.beatInfo.getPeriod();

                try {
                //发送心跳请求,拿到结果
                    JSONObject result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = (long)result.getIntValue("clientBeatInterval");
                    boolean lightBeatEnabled = false;
                    if (result.containsKey("lightBeatEnabled")) {
                        lightBeatEnabled = result.getBooleanValue("lightBeatEnabled");
                    }

                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }

                    int code = 10200;
                    if (result.containsKey("code")) {
                        code = result.getIntValue("code");
                    }

                    if (code == 20404) {
                    //实例不存在就创建
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);

                        try {
                            //注册服务
                            BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var10) {
                        }
                    }
                } catch (NacosException var11) {
                    LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JSON.toJSONString(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
                }

                //定时任务:5s一次执行心跳任务
                BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
   }

和Eureka一样,心跳也是通过线程池 ScheduledExecutorService 来实现的,时间频率默认是5秒一次。

  • BeatInfo : 心跳续约的对象,其中包括服务的IP,端口,服务名,权重等
  • executorService.schedule :定时任务,beatInfo.getPeriod()是定时任务执行频率,默认是5000 毫秒发送一次心跳续约请求到NacosServer
  • BeatTask :是一个Runnable线程,run方法中会调用 BeatReactor.this.serverProxy.sendBeat 发送心跳请求。

BeatTask作为心跳续约的线程对象,他的run方法中 通过 BeatReactor.this.serverProxy.sendBeat发送心跳,如果发现服务未注册会通过 BeatReactor.this.serverProxy.registerService 注册服务。

下面是 com.alibaba.nacos.client.naming.net.NamingProxy#sendBeat 发送心跳的方法

 public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
            LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
        }

        Map<String, String> params = new HashMap(8);
        String body = "";
        if (!lightBeatEnabled) {
            try {
                body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8");
            } catch (UnsupportedEncodingException var6) {
                throw new NacosException(500, "encode beatInfo error", var6);
            }
        }

        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", beatInfo.getServiceName());
        params.put("clusterName", beatInfo.getCluster());
        params.put("ip", beatInfo.getIp());
        params.put("port", String.valueOf(beatInfo.getPort()));
        String result = this.reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, "PUT");
        return JSON.parseObject(result);
    }

这里也是会拼接好心跳的地址 :127.0.0.1:8848/nacos/v1/ns/instance/beat ,参数包括namespaceId命名空间ID;serviceName 服务名;clusterName 集群名;ip 服务的IP;port 端口。然后发送一个PUT请求。底层依然是从多个NacosServer随机选择一个发起心跳请求。底层交给httpClient去执行

心跳续约服务端

服务端还是在InstanceController中,其中提供了一个beat方法,我们出了要考虑他是如何处理心跳请求外,还要考虑他是如何做心跳过期检查的。源码如下

 /**
     * Create a beat for instance.
     * 心跳检测
     * @param request http request
     * @return detail information of instance
     * @throws Exception any error during handle
     */
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
        //客户端心跳频率 5s/次
         ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
        //拿到请求中的beat数据,转成clientBeat对象
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }//集群名
        String clusterName = WebUtils
                .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        //拿到客户端IP,端口
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                // fix #2533
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        //拿到命名空间ID和服务名
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //检查服务名
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
        //拿到服务表中的服务实例
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
        // 如果获取失败,说明心跳失败,实例尚未注册
        if (instance == null) {
            if (clientBeat == null) {//如果客户端心跳出现为空(请求参数中没beat),返回资源没找到
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }
            
            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                    + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
            //创建一个实例
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            //注册实例
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }
        //获取服务
        Service service = serviceManager.getService(namespaceId, serviceName);
        
        if (service == null) {
            //服务为空
            throw new NacosException(NacosException.SERVER_ERROR,
                    "service not found: " + serviceName + "@" + namespaceId);
        }
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        //处理心跳请求
        service.processClientBeat(clientBeat);
        
        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }

方法大致逻辑如下

  • 拿到心跳请求参数,beat,其中包括客户端服务的IP,端口,服务名,命名空间等
  • 通过serviceManager 从服务端服务注册表中拿到当前心跳请求的服务实例
  • 如果实例为空会创建新的instance,通过serviceManager注册实例
  • 然后拿到当前服务的service对象,调用 service.processClientBeat 方法处理心跳
  • 最后返回OK
    在这里插入图片描述

下面是 service#processClientBeat方法源码

public void processClientBeat(final RsInfo rsInfo) {
        //心跳处理器,runnable对象
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        //这里HealthCheckReactor.scheduleNow(clientBeatProcessor);
        // 开启一个没有延迟的任务,可以理解为这里就是开启了一个异步线程处理心跳续约逻辑
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
    
 /**	没有延迟的任务
     * Schedule client beat check task without a delay.
     *
     * @param task health check task
     * @return scheduled future
     */
    public static ScheduledFuture<?> scheduleNow(Runnable task) {
        return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
    }

看得出来,心跳是通过 ClientBeatProcessor去处理的。通过定时任务去执行。ClientBeatProcessor是一个线程对象

public class ClientBeatProcessor implements Runnable {
    
    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    
    private RsInfo rsInfo;
    
    private Service service;
    
    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }
    
    public RsInfo getRsInfo() {
        return rsInfo;
    }
    
    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }
    
    public Service getService() {
        return service;
    }
    
    public void setService(Service service) {
        this.service = service;
    }
    
    @Override
    public void run() {
        //拿到续约的服务
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        //拿到ip,端口,集群名等
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        //拿到服务中的cLuster对象
        Cluster cluster = service.getClusterMap().get(clusterName);
        //拿到所有实例
        List<Instance> instances = cluster.allIPs(true);
        
        for (Instance instance : instances) {
            //找到当前发送心跳的instance,通过IP和端口对比
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                //设置心跳最后发送时间【重要】
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked() && !instance.isHealthy()) {
                    //设置健康状态为true
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    //发布一个改变事件:ServiceChangeEvent
                    //PushService发布ServiceChangeEvent事件,使用udpPush推送给所有的客户端
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}

方法中会从服务注册表中取出心跳续约对应的服务,然后设置最后心跳时间和健康状态。

  • instance.setLastBeat(System.currentTimeMillis()); :就是把最后续约时间修改为当前系统时间
  • instance.setHealthy(true);:把健康状态设置为tue

心跳超时检测

Nacos中的实例分为临时实例和永久实例,临时实例会在心跳续约超时后被注册中心剔除,则不会。对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。

上面只是心跳续约的处理流程,心跳过期检测入口在servieManager#registerInstance 注册服务方法中,会调用servieManager#putServiceAndInit(service)方法对service进行初始化,在该方法中调用Service#init方法来开启心跳检查,该方法是在服务注册成功之后就会被调用

// servieManager#putServiceAndInit 服务初始化
private void putServiceAndInit(Service service) throws NacosException {
        putService(service);
        service = getService(service.getNamespaceId(), service.getName());
        //服务初始化,心跳检查入口
        service.init();
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

下面是service#init()方法

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

	public void init() {
        //心跳检查。对临时服务的初始化	
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        //遍历注册表,初始化集群
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
           //对永久实例初始化,调用Cluster.init()
            entry.getValue().init();
        }
    }

//定时心跳超时检查 5s一次
public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

临时服务,心跳检查通过定时任务5s一次,通过 ClientBeatCheckTask 线程对象来完成,

//客户端心跳检查
public class ClientBeatCheckTask implements Runnable {
   
    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            //拿到注册表中的所有实例
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
                //判断心跳是否超时:系统时间 - 最后心跳时间 > 超时时间
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        //如果是健康的,设置为不健康
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            //抛出服务改变时间
                            getPushService().serviceChanged(service);
                            //抛出服务超时事件
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            
            //移除过时的实例
            // then remove obsolete instances:
            for (Instance instance : instances) {
                //是否超时的标记
                if (instance.isMarked()) {
                    continue;
                }
                //超时时间大于30s就要把服务剔除
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    //剔除服务
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }
    

方法做如下几个事情

  • 拿到注册表中所有的服务实例
  • 使用算法: 系统时间 - 最后心跳时间 > 超时时间 。来判断是否心跳超时,心跳超时默认是15s
  • 超时的实例会设置健康状态为false,然后抛出服务改变事件ServiceChangeEvent和抛出心跳超时事件InstanceHeartbeatTimeoutEvent.也就是说你通过nacos的控制台看到服务的健康状态是false
  • 最后还会判断如果超时时间超过 30s ,会删除当前服务

对于serviceChanged 服务改变事件的话是通过:PushService#serviceChanged来发布的,他会采用 udpPush 协议push给所有的客户端,当前服务状态。

永久实例的检查

下面是 com.alibaba.nacos.naming.core.Cluster#init 方法源码

   public synchronized void init() {
        if (inited) {
            return;
        }
        checkTask = new HealthCheckTask(this);
        //开启对 永久实例的 定时健康检测
        HealthCheckReactor.scheduleCheck(checkTask);
        inited = true;
    }
    
	public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
        task.setStartTime(System.currentTimeMillis());
        //开启定时任务心跳检查
        return GlobalExecutor.scheduleNamingHealth(task, task.getCheckRtNormalized(), TimeUnit.MILLISECONDS);
    }

这里通过HealthCheckTask来处理永久实例的健康检查,通过定时任务定时检查。下面是 HealthCheckTask源码

//这里在计算定时任务的时间频率
private void initCheckRT() {
        // first check time delay 计算主动检测的时间频率
        //周期为2000 + 5000毫秒内的随机数
        checkRtNormalized =
                2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
        checkRtBest = Long.MAX_VALUE;
        checkRtWorst = 0L;
    }
    
    @Override
    public void run() {
        
        try {
            if (distroMapper.responsible(cluster.getService().getName()) && switchDomain
                    .isHealthCheckEnabled(cluster.getService().getName())) {
                    //执行检查逻辑,使用的是 TcpSuperSenseProcessor 处理,基于TCP模式
                healthCheckProcessor.process(this);
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG
                            .debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
                }
            }
        } catch (Throwable e) {
            Loggers.SRV_LOG
                    .error("[HEALTH-CHECK] error while process health check for {}:{}", cluster.getService().getName(),
                            cluster.getName(), e);
        } finally {
            ...
        }
    }

代码 healthCheckProcessor.process(this); 是处理心跳检查,使用的是实现类 TcpSuperSenseProcessor ,他是一个Runnable,源码如下

@Override
   public void process(HealthCheckTask task) {
   	//拿到集群中的所有实例,非临时ephemeral=false的实例
       List<Instance> ips = task.getCluster().allIPs(false);
       
       if (CollectionUtils.isEmpty(ips)) {
           return;
       }
       
       for (Instance ip : ips) {
           
           ...
           Beat beat = new Beat(ip, task);
           //添加到队列LinkedBlockingQueue,可以看到,所有的健康检测任务都被放入一个阻塞队列
           taskQueue.add(beat);
           MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
       }
   }

//处理任务
	private void processTask() throws Exception {
        Collection<Callable<Void>> tasks = new LinkedList<>();
        do {
            Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
            if (beat == null) {
                return;
            }
            //把任务封装到TaskProcessor
            tasks.add(new TaskProcessor(beat));
        } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
        //执行所有任务,批量执行
        for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
            f.get();
        }
    }
    
    @Override
    public void run() {
    //循环,不停的从队列中拿到beat心跳任务去执行
        while (true) {
            try {
            //执行任务
                processTask();
                
                int readyCount = selector.selectNow();
                if (readyCount <= 0) {
                    continue;
                }
                
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    
                    GlobalExecutor.executeTcpSuperSense(new PostProcessor(key));
                }
            } catch (Throwable e) {
                SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
            }
        }
    }
    

看到这里我们大概明白,healthCheckProcessor通过TCP来向客户端发送心跳检查,底层通过队列LinkedBlockingQueue来存储心跳任务Beat 。 然后TcpSuperSenseProcessor 本身是一个Runnable,通过定时从队列中取出Beat任务,并封装陈 TaskProcessor批量执行。下面是 TaskProcessor源码

 private class TaskProcessor implements Callable<Void> {
        
        private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
        
        Beat beat;
        
        public TaskProcessor(Beat beat) {
            this.beat = beat;
        }
        
        @Override
        public Void call() {
            long waited = System.currentTimeMillis() - beat.getStartTime();
            if (waited > MAX_WAIT_TIME_MILLISECONDS) {
                Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
            }
            
            SocketChannel channel = null;
            try {
                Instance instance = beat.getIp();
                
                BeatKey beatKey = keyMap.get(beat.toString());
                if (beatKey != null && beatKey.key.isValid()) {
                    if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                        instance.setBeingChecked(false);
                        return null;
                    }
                    
                    beatKey.key.cancel();
                    beatKey.key.channel().close();
                }
                
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                // only by setting this can we make the socket close event asynchronous
                channel.socket().setSoLinger(false, -1);
                channel.socket().setReuseAddress(true);
                channel.socket().setKeepAlive(true);
                channel.socket().setTcpNoDelay(true);
                
                Cluster cluster = beat.getTask().getCluster();
                int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
                channel.connect(new InetSocketAddress(instance.getIp(), port));
                
                SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                key.attach(beat);
                keyMap.put(beat.toString(), new BeatKey(key));
                
                beat.setStartTime(System.currentTimeMillis());
                
                GlobalExecutor
                        .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                        "tcp:error:" + e.getMessage());
                
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (Exception ignore) {
                    }
                }
            }
            
            return null;
        }
    }

看得出来他是一个 Callable,通过 NIO去发送TCP请求。这里做个小总结

Nacos的健康检测分为临时实例和永久实例两种:

  • 对于临时实例:客户端5秒发送一次心跳,超过15秒则标记为不健康,超时30秒则从服务列表删除
  • 对于永久实例:服务端主动健康检测,周期为2000 + 5000毫秒内的随机数,检测超时只会标记为不健康,不会删除

好了文章到此结束,用一个图来总结一下服务注册和心跳

在这里插入图片描述
如果文章对你有所帮助,请给个好评把,你的肯定是我最大的动力

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/165691.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

iNav飞控AOCODARC-F7MINI固件编译

iNav飞控AOCODARC-F7MINI固件编译1. 编译目标&#xff08;AOCODARC-F7MINI&#xff09;2. 编译步骤Step 1 软件配置环境准备Step 2 获取开源代码Step 3 构建命令介绍Step 4 厂家目标板查询Step 5 目标固件编译Step 6 目标固件清理3. 参考资料iNav是一款非常出色的飞控航模开源软…

怎么恢复360删除的文件?360文件恢复,快速完成

日常生活和工作中&#xff0c;使用电脑总会保存着很多数据。其中有我们很多的文件&#xff0c;如果不小心删除了重要的文件&#xff0c;我们该怎么恢复呢&#xff1f; 很多人都喜欢在电脑上安装3 60安 全卫士&#xff0c;文件被误删&#xff0c;我们可以通过它来恢复数据。文件…

来看看我在CSDN上的好朋友们吧,看看有没有你

首先&#xff0c;感谢支持我的所有人&#xff0c;其次&#xff0c;感谢支持我的所有人&#xff0c;然后感谢支持我的所有人&#xff0c;最后&#xff0c;感谢支持我的所有人&#xff08;我这是废话吗&#xff1f;&#xff1f;&#xff1f;不是吧&#xff09; 今天就来看看我在…

基于transfomer架构的模型[GPT、BERT、VIT、ST、MAE等等]总结

Transformer首先我们来回顾一下Transformer模型架构图对于Transformer从宏观角度可以可以理解为6个Encoder6个Decoder组成各部分介绍输入部分主要就是词嵌入位置编码对于词嵌入比较简单&#xff0c;就是对一个句子里的每个词做一个嵌入操作映射到相应的维度。一般来说就是先把句…

一次非典型的Netty内存泄露案例复盘

背景 作为后端开发相信大家或多或少都接触过Nettty&#xff0c;说起Netty真实又爱又恨&#xff0c;因为基于它可以很简单的开发高性能的Java网络通信服务&#xff0c;但同时要是不小心就会出现各种奇奇怪怪的问题&#xff0c;特别是由于特殊的内存管理机制很容易出现内存泄漏问…

数据大佬的成长经验分享 | ​我的非典型数据分析之路

小飞象交流会哪有什么错过的人&#xff0c;会离开的都是路人。哪有什么命运不公&#xff0c;都是懒惰让你变得无能。内部交流│19期数据大佬的成长经验分享我的非典型数据分析之路data analysis●●●●分享人&#xff1a;夏宇‍在大数据、人工智能热、5G、物联网的时代&#x…

1、Mavan项目管理工具

1.1 什么是 Maven 1.1.1 什么是 Maven Maven 的正确发音是[ˈmevən]&#xff0c;而不是“马瘟”以及其他什么瘟。Maven 在美国是一个口语化的词 语&#xff0c;代表专家、内行的意思。 一个对 Maven 比较正式的定义是这么说的&#xff1a;Maven 是一个项目管理工具&#xff0…

Spring Boot学习篇(十)

Spring Boot学习篇(十) shiro安全框架使用篇(二)——登录实例(密码以密文方式存储,不含记住密码) 1.模拟注册时,生成密文到数据库中 1.1 在zlz包下创建util包,并在下面创建SHAUtil01类(初始里面无方法)和SHAUtil02类,其目录结构如下所示 1.2 两种生成密文的方式 1.2.1 自己…

一篇文章彻底搞懂折半查找法[二分查找法]算法~

算法实现的要求&#xff1a; 折半查找法又称为二分查找法&#xff0c;这种方法对待查找的列表有两个要求&#xff1a; 1&#xff1a;必须采用顺序存储结构 2&#xff1a;必须按关键字大小有序排列算法思想&#xff1a; 将表中间位置记录的关键字与查找关键字进行比较&#x…

性能测试时那些「难以启齿」的问题-CPU相关

NO.1 为什么cpu使用率可以>100%? 小白的我在进行压测的时候&#xff0c;查看服务的cpu总使用率如下&#xff0c;总使用率会超过100%&#xff0c;这个数据是怎么来的呢&#xff0c;为什么会有大于100%的情况呢&#xff1f; 作为小白的我刚开始觉得这个问题应该很基础&#x…

Go语言实现猜数字小游戏

目录 前言 一、设计思路 二、代码编写 2.1 产生随机数 2.2 用户输入数据 2.3 核心代码 三、 全部代码 四、效果图 总结 前言 最近在学习go语言&#xff0c;刚刚学完go语言的基础语法。编写了一个猜数字的小游戏来练习循环、分支语句、变量定义、输入输出等基础的go语…

4、变量与常量

目录 一、标识符和关键字 1.标识符 2.关键字 二、声明变量 三、声明常量 四、变量的有效范围 1. 成员变量 2. 局部变量 一、标识符和关键字 1.标识符 Java语言规定标识符由任意顺序的字母、下画线&#xff08;_&#xff09;、美元符号&#xff08;$&#xff09;和数字…

【数据结构】手撕八大排序算法

作者&#xff1a;一个喜欢猫咪的的程序员 专栏&#xff1a;《数据结构》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 1.排序的概念&#xff1a; 2.八大排序的思路及其细节 2.1直接插入排序 …

适合编程初学者的开源项目:小游戏2048(安卓Compose版)

目标 为编程初学者打造入门学习项目&#xff0c;使用各种主流编程语言来实现。 2048游戏规则 一共16个单元格&#xff0c;初始时由2或者4构成。 1、手指向一个方向滑动&#xff0c;所有格子会向那个方向运动。 2、相同数字的两个格子&#xff0c;相遇时数字会相加。 3、每次…

SpringMVC面试题

概述 什么是Spring MVC&#xff1f;简单介绍下你对Spring MVC的理解&#xff1f; Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架&#xff0c;通过把模型-视图-控制器分离&#xff0c;将web层进行职责解耦&#xff0c;把复杂的web应用分成逻辑清…

如何在Linux上搭建C++开发环境

工欲善其事&#xff0c;必先利其器&#xff01;我们要在Linux上开发C程序&#xff0c;就要先搭建好它的开发环境。 搭建环境步骤安装Linux安装开发工具写一个demo在项目根目录创建一个构建脚本build.sh使用CodeLite IDE打开项目安装Linux Linux的发行版本很多&#xff0c;萝卜…

测试开发——测试分类

目录 一、 有关测试用例的回顾 二、 测试用例的划分 1、 按照测试对象来划分 可靠性测试 容错性测试 内存泄漏测试 弱网测试 2、按照是否查看代码划分 3、按照开发阶段划分 一、 有关测试用例的回顾 万能测试用例设计公式 如何根据需求去设计测试用例&#xff1f; …

计算机视觉OpenCv学习系列:第三部分、滚动条操作

第三部分、滚动条操作第一节、滚动条操作1.事件响应函数&#xff08;1&#xff09;UI组件时间响应过程&#xff08;2&#xff09;事件响应函数&#xff08;3&#xff09;创建窗口函数&#xff08;4&#xff09;调整图像亮度2.滚动条操作3.代码练习与测试学习参考第一节、滚动条…

Python 协程学习有点难度?这篇文字值得你去收藏

Python 协程在基础学习阶段&#xff0c;属于有难度的知识点&#xff0c;建议大家在学习的时候&#xff0c;一定要反复练习。 Python 中的协程是一种用户态的轻量级线程。它与普通的线程不同&#xff0c;普通线程是由操作系统调度的&#xff0c;而协程是由程序自己调度的。因此&…

【ESP 保姆级教程】玩转emqx篇③ ——认证安全之使用内置数据库(Mnesia)的密码认证

忘记过去&#xff0c;超越自己 ❤️ 博客主页 单片机菜鸟哥&#xff0c;一个野生非专业硬件IOT爱好者 ❤️❤️ 本篇创建记录 2023-01-15 ❤️❤️ 本篇更新记录 2022-01-15 ❤️&#x1f389; 欢迎关注 &#x1f50e;点赞 &#x1f44d;收藏 ⭐️留言&#x1f4dd;&#x1f64…