Nacos服务心跳和健康检查源码介绍

news2024/12/27 12:13:38

服务心跳

Nacos Client会维护一个定时任务通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除,Nacos默认5秒向服务端发送一次,通过请求服务端接口/instance/beat发送心跳。
客户端服务在注册服务的时候会增加一个心跳的任务,如下图所示:


首先看下BeatInfo这个类,重点看标注的字段,该字段是给周期任务设定时间,如下图:
 


该方法内部定义的一个DEFAULT_HEART_BEAT_INTERVAL的常量,设定5秒:
 


接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中,如下图:
 


重点部分就是看BeatTask,BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为/instance/beat的方法
 


 


接下来我们把目光放到服务端,找到InstanceController的beat方法,如果是参数beat信息的话,说明是第一次发起心跳,则会带有服务实例信息,因为发起心跳成功则服务端会返回下次不要带beat信息的参数,这样客户端第二次就不会携带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册,整体执行流程如下图:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //设置心跳间隔
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    //判断有无心跳内容
    //如果存在心跳内容则不是轻量级心跳就转化为RsInfo
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    //获取实例的信息
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    //如果实例不存在
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        //根据您心跳内容创建一个实例信息
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        //注册实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    //获取服务的信息
    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    //不存在的话,要创建一个进行处理
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    //开启心跳检查任务
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    //5秒间隔
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    //告诉客户端不需要带上心跳信息了,变成轻量级心跳了
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

接下来我们看一下processClientBeat方法,该方法将ClientBeatProcessor放入到线程池中,接下来我们看下重点看下run方法,


该方法内部主要就是更新对应实例下心跳时间,整体上如下图:
 


至此完成了从客户端到服务端更新实例的心跳时间,下图是整体的时序图:

服务的健康检查

Nacos Server会开启一个定时任务来检查注册服务的健康情况,对于超过15秒没收到客户端的心跳实例会将它的 healthy属性置为false,此时当客户端不会将该实例的信息发现,如果某个服务的实例超过30秒没收到心跳,则剔除该实例,如果剔除的实例恢复,发送心跳则会恢复。
当有实例注册的时候,我们会看到有个service.init()的方法,该方法的实现主要是将ClientBeatCheckTask加入到线程池当中,如下图:


ClientBeatCheckTask中的run方法主要做两件事心跳时间超过15秒则设置该实例信息为不健康状况和心跳时间超过30秒则删除该实例信息,如下代码:

public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        //获取服务所有实例信息
        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        for (Instance instance : instances) {
            //如果心跳时间超过15秒则设置该实例信息为不健康状况
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }
            //如果心跳时间超过30秒则删除该实例信息
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}

首先我们来看一下deleteIp方法,该方法内部主要通过构建删除请求,发送删除请求,如下图:


删除实例的接口如下图:
 


内部通过调用ServiceManager的removeInstance方法,如下图:
 


重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息,如下图:
 


到此完成删除实例的过程,整体的时序图如下:
 


接下来我们看标记不健康时候的代码,这部分代码在客户端注册的时候也出现相同的代码,只是我们略过了,这部分也是观察者模式的重要体现,从这里我们可以学习到的东西在于结合Spring的事件机制,轻松实现观察者模式,当然这个里面也有部分我感觉写的不太好,哈哈,大佬们看到勿喷。
 


首先我们看serviceChanged方法,该方法主要是发布一个服务不健康的事件,如下图:
 


接下来我们看下如何处理这个事件,这个时候涉及PushService这个类,整体的继承结构如下图:
 


我们看到该类的继承ApplicationListener接口,该接口是一个支持泛型的接口,传入了ServiceChangeEvent的类,此处就是对事件的处理,如下图:
 


接下来看一下onApplicationEvent方法,这个方法主要完成了准备数据,发送数据这几件事情:

public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            //获取所有需要推送的客户端
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                //超时的不删除跳过处理
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;

                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                //准备UDP数据
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                //发送数据
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

        } finally {
            //发送完成删除
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }

    }, 1000, TimeUnit.MILLISECONDS);
    //增加待推送的任务
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

接下里我们重点看下udpPush的方法,整个方法主要是通过一个Map对象来记录UDP请求,如果没收到就重试发送请求,整体如下:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    //如果大于最大的尝试次数
    //移除发送的数据和待确认的key
    //失败推送的次数+1
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }

    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        //记录UDP请求的返回信息
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        //发送UDP请求
        udpSocket.send(ackEntry.origin);

        ackEntry.increaseRetryTime();
        //如果UDP没收到返回信息 每10秒尝试一下
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;

        return null;
    }
}

服务端有发送,那么客户端就有接收的,接收部分我理解上是服务发现部分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1002110.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文解读 | 用于3D对象检测的PV-RCNN网络原创

原创 | 文 BFT机器人 01 背景 本文的背景涉及到3D物体检测&#xff0c;这是一个在自动驾驶和机器人等领域应用广泛的重要问题。在这些领域&#xff0c;LiDAR传感器被广泛用于捕捉3D场景信息&#xff0c;生成不规则且稀疏的点云数据。这些点云数据提供了理解和感知3D场景的关键…

QVector 和 QMap

QVector_QMap QVector简介 头文件&#xff1a;#include<QVector> 模块&#xff1a; QT core 功能&#xff1a; QVector类是动态数组的模板类&#xff0c;顺序容器&#xff0c;它将自己的每一个对象存储在连续的内存中&#xff0c;可以使用索引号来快速访问它们 常用…

【数据结构】树和二叉树概念

1.树概念及结构 树概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 有一个特殊的结点&#xff0c;…

Android获取系统读取权限

在Androidifest.xml文件中加上授权语句 <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE"/><uses-permission android:name"android.permission.READ_EXTERNAL_STORAGE"/>

Android12之/proc/pid/status参数含义(一百六十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

【LeetCode题目详解】第九章 动态规划part13 300.最长递增子序列 674. 最长连续递增序列 718. 最长重复子数组 (day52补)

本文章代码以c为例&#xff01; 一、力扣第300题&#xff1a;最长递增子序列 题目&#xff1a; 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改…

Scrum敏捷开发工具的基本概念、使用方法、优势以及实际应用案例

​随着软件开发行业的不断发展和进步&#xff0c;Scrum敏捷开发工具逐渐成为了备受关注的话题。 Scrum是一种灵活且高效的项目管理方法&#xff0c;旨在提高团队协作和交付效率&#xff0c;使团队能够更快地响应变化和需求。 本文将深入探讨Scrum敏捷开发工具的基本概念、使用…

分类预测 | MATLAB实现WOA-CNN-BiGRU鲸鱼算法优化卷积双向门控循环单元数据分类预测

分类预测 | MATLAB实现WOA-CNN-BiGRU鲸鱼算法优化卷积双向门控循环单元数据分类预测 目录 分类预测 | MATLAB实现WOA-CNN-BiGRU鲸鱼算法优化卷积双向门控循环单元数据分类预测分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.Matlab实现WOA-CNN-BiGRU多特征分类…

敏捷项目管理实践及敏捷工具

​敏捷项目管理是一种基于敏捷开发方法的项目管理方式&#xff0c;它强调快速响应变化、持续交付价值和高效的团队合作。 1、确定敏捷宣言的价值观和原则&#xff0c;例如“以人为本”、“可用的软件”、“以客户为中心”、“拥抱变化”等&#xff0c;并在项目中始终遵循这些价…

二维差分---基础算法

书接上回 a二维数组是b二维数组的前缀和数组,b二维数组是a二维数组的差分数组,也就是说a[i][j]b[1][1]b[1][2] ......b[i][1] b[i][2] ...... b[i][j] ,下图是b的二维数组 如图,当你想要整个矩阵中的一个子矩阵都加上一个C,如果我们将b[x1][x2]加上C,那么a数组右下角所有的…

3.3 栈的表示和操作的实现

3.3.1 栈的类型定义 主要内容&#xff1a; 这段文字中包含了很多栈数据结构的基本概念和操作。 ### 3.3 栈的表示和操作的实现 #### 3.3.1 栈的类型定义 1. **数据对象**&#xff1a; - 定义了一个数据对象集合&#xff0c;记作 D {a1, a2, ..., an}&#xff0c;其…

一维的差分

差分的方法 差分实际上是前缀和的逆运算 ,这个关系和 积分与求导 的关系类似 例如有数组 ...... 和构造数组 ...... 我们要使得a数组是b数组的前缀和 ...... 那么该如何构造b数组呢? 令 , …

使用带有示例和代码的因式分解机的推荐系统

一、说明 在我之前的文章中&#xff0c;我讨论了推荐系统的基础知识、矩阵分解和神经协同过滤 &#xff08;NCF&#xff09;&#xff0c;您可以在下面的“我的博客”部分找到它们。接下来&#xff0c;这次我将通过示例和代码来探索因式分解机器。 将因子分解机用于推荐系统的一…

pytorch无法使用cuda

import torch # 如果pytorch安装成功即可导入 print(torch.cuda.is_available()) # 查看CUDA是否可用 print(torch.cuda.device_count()) # 查看可用的CUDA数量 print(torch.version.cuda) # 查看CUDA的版本号#False #0 #None 表明安装失败&#xff01;查看安装包&#xff1a;…

Gin框架---基础综述

目录 一&#xff1a;经典入门案例二&#xff1a;请求参数2.1: API参数2.2: URL参数2.3: 表单参数 三&#xff1a; 响应参数四&#xff1a;数据解析和绑定4.1: JSON数据解析绑定4.2: FROM表单数据解析和绑定 五&#xff1a; 路由组六&#xff1a;异步处理七&#xff1a;中间件7.…

【UE】刀光粒子效果——part1

效果 步骤 1. 打开3dsmax&#xff0c;首先新建一个管状体 转成可编辑多边形后&#xff0c;删除多余的面&#xff0c;只保留一层 选择内圈将其拉高5mm 在修改器列表中添加“UVW展开” 点击打开“UV编辑器” 选中左边所有的顶点 将其拖拽到最左边 将右边的点拖拽到最右边 关闭 “…

VR古迹复原——数字化复原圆明园,开创文化遗产保护新方式

圆明园是中国历史上一处重要的文化遗产&#xff0c;曾经被誉为“万园之园”&#xff0c;但在1860年的英法联军侵华战争中被毁。近年来&#xff0c;虚拟现实技术不断发展&#xff0c;广州华锐互动利用VR全景技术复原了圆明园&#xff0c;通过VR设备&#xff0c;人们可以在家中就…

浏览器面试题

浏览器面试题 1.常见的浏览器内核有哪些&#xff1f;2.浏览器的主要组成部分有哪些&#xff1f;3.说一说从输入URL到页面呈现发生了什么&#xff1f;4.浏览器重绘域重排的区别&#xff1f;5.CSS加载会阻塞DOM吗&#xff1f;6.JS会阻塞页面吗&#xff1f;7.说一说浏览器的缓存机…

基于ASP.NET的驾校管理系统设计与实现

摘 要 伴随国民经济的飞速发展和人民生活水平的不断提高&#xff0c;家用汽车在我国逐渐普及。面对不断增长的庞大的用户群&#xff0c;随之产生的驾驶培训行业&#xff0c;规模不断扩大。近年来&#xff0c;随着Internet的迅速发展以及网页制作技术的日臻完善&#xff0c;驾校…

win10查看并设置tomcat的jvm堆内存参数

win10查看并设置tomcat的jvm堆内存参数 查看 进入命令行 通过Winr命令输入cmd进入命令行页面 进入到jdk的bin目录 D: cd D:\Y4ECSRUN\WGQ4 Java jdk1.8.0 131\bin执行jps查看进程pid D:\Y4ECSRUN\WGQ4 Java jdk1.8.0 131\bin>jps 16528 Jps 6868 Bootstrap通过jmap查看…