(二)微服务中间键工作原理——nacos服务端服务注册心跳包(/nacos/v1/ns/instance/beat)源码解读

news2024/11/24 0:43:45

前言

上节内容我们通过分析nacos客户端源码,了解了nacos客户端是如何向服务端注册服务和发送心跳包的,本节内容话接上一节内容,我们通过分析nacos服务的源码,查看服务端是如何处理客户端注册时候的心跳包的。关于nacos服务端的源码,下载地址为:GitHub - alibaba/nacos: an easy-to-use dynamic service discovery, configuration and service management platform for building cloud native applications.

正文

①找到nacos服务端处理心跳包的接口InstanceController类

接口地址:/nacos/v1/ns/instance/beat

 ②在InstanceController类中的beat方法实现了心跳包的处理逻辑

心跳包的整体处理流程说明:

@CanDistro
    @PutMapping("/beat")
    @Secured(action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {

        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        //1.设置默认心跳间隔时间
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
        //2.获取心跳包数据
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            //3.解析心跳包数据
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        //4.获取集群名称、IP地址、和端口
        String clusterName = WebUtils
                .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                // fix #2533
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        //5.获取客户端服务命名空间和服务名称
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //6.检查服务名称规范,不符合要求抛出异常
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
                serviceName, namespaceId);
        BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
        builder.setRequest(request);
        //7.处理心跳包数据
        int resultCode = getInstanceOperator()
                .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
        //8.返回处理结果
        result.put(CommonParams.CODE, resultCode);
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
                getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }

③在InstanceOperatorClientImpl类中handleBeat方法是对心跳包的具体处理流程

handleBeat方法执行流程的说明:

public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
            RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
        //1.根据服务命名空间和服务名称获取服务实例对象
        Service service = getService(namespaceId, serviceName, true);
        //2.根据ip和端口号获取客户端ID
        String clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);
        //3.查询注册的客户端
        IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
        //4.如果客户端不存在或者客户端服务实例还未发布,注册客户端实例,否则跳过该操作
        if (null == client || !client.getAllPublishedService().contains(service)) {
            if (null == clientBeat) {
                //4.1心跳包不存在,直接返回不存在的提示码
                return NamingResponseCode.RESOURCE_NOT_FOUND;
            }
            //4.2根据心跳包和服务名称构建Instance实例对象
            Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();
            //4.3注册Instance实例对象,该方式中存在关于client的实例对象的注册
            registerInstance(namespaceId, serviceName, instance);
            //4.4再次获取客户端实例
            client = (IpPortBasedClient) clientManager.getClient(clientId);
        }
        //5.验证服务实例对象是否存在,不存在则抛出服务不存在的异常
        if (!ServiceManager.getInstance().containSingleton(service)) {
            throw new NacosException(NacosException.SERVER_ERROR,
                    "service not found: " + serviceName + "@" + namespaceId);
        }
        //6.心跳包不存在,则根据传入参数封装客户端心跳包数据
        if (null == clientBeat) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(cluster);
            clientBeat.setServiceName(serviceName);
        }
        //7.服务健康检查,更新服务的心跳时间,如果服务的健康状态是false,则更新为true,表明服务实例是健康状态
        ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
        HealthCheckReactor.scheduleNow(beatProcessor);
        //8.更新客户端时间
        client.setLastUpdatedTime();
        return NamingResponseCode.OK;
    }

 registerInstance(namespaceId, serviceName, instance)方法实现了客户端服务和客户端实例的注册

 

下面的方法实现了服务的心跳时间更新和健康状态更新

ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
HealthCheckReactor.scheduleNow(beatProcessor);

④ InstanceOperatorClientImpl类中的registerInstance方法实现了IpPortBasedClient客户端和Instance服务实例的注册

IpPortBasedClient客户端和Instance服务实例的注册的源码说明:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //1.检查服务实例是否合法
        NamingUtils.checkInstanceIsLegal(instance);
        //2.获取IpPortBasedClient客户端id
        boolean ephemeral = instance.isEphemeral();
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        //3.创建IpPortBasedClient客户端
        createIpPortClientIfAbsent(clientId);
        //4.获取服务实例
        Service service = getService(namespaceId, serviceName, ephemeral);
        //5.创建Instance实例
        clientOperationService.registerInstance(service, instance, clientId);
    }

 ⑤分析IpPortBasedClient客户端实例创建的方法createIpPortClientIfAbsent(clientId);

- IpPortBasedClient客户端存储在一个ConcurrentMap集合中

private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();

- 判断IpPortBasedClient客户端是否存在,不存在则创建该客户端

private void createIpPortClientIfAbsent(String clientId) {
        //1.判断IpPortBasedClient客户端是否存在
        if (!clientManager.contains(clientId)) {
            ClientAttributes clientAttributes;
            if (ClientAttributesFilter.threadLocalClientAttributes.get() != null) {
                clientAttributes = ClientAttributesFilter.threadLocalClientAttributes.get();
            } else {
                clientAttributes = new ClientAttributes();
            }
            //2.客户端不存在,创建IpPortBasedClient客户端
            clientManager.clientConnected(clientId, clientAttributes);
        }
    }

-调用clientConnected方法,使用客户端工厂类EphemeralIpPortClientFactory创建一个客户端IpPortBasedClient

public boolean clientConnected(String clientId, ClientAttributes attributes) {
	//使用客户端工厂工具创建一个客户端
	return clientConnected(clientFactory.newClient(clientId, attributes));
}

-调用clientConnected同名方法,将IpPortBasedClient客户端存储于ConcurrentMap中,并执行init初始化方法

public boolean clientConnected(final Client client) {
	//将IpPortBasedClient客户端存储到clients的ConcurrentMap中
	clients.computeIfAbsent(client.getClientId(), s -> {
		Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
		IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
		//调用IpPortBasedClient的init方法,实现初始化方法
		ipPortBasedClient.init();
		return ipPortBasedClient;
	});
	return true;
}

-在init方法中,实现了注册服务心跳包检查和健康检查

public void init() {
	if (ephemeral) {
		//心跳包检查
		beatCheckTask = new ClientBeatCheckTaskV2(this);
		HealthCheckReactor.scheduleCheck(beatCheckTask);
	} else {
		//健康检查
		healthCheckTaskV2 = new HealthCheckTaskV2(this);
		HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
	}
}

- 心跳包检查,在类ClientBeatCheckTaskV2中的doHealthCheck()方法实现心跳包的健康检查

 -在doInterceptor方法中的passIntercept方法实现了客户端检查、健康检查、服务实例检查

-这里我们以服务实例检查为例,ExpiredInstanceChecker中的doCheck方法实现服务实例的过期检查,如果过期则删除实例,默认超过30秒删除实例

public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
	boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
	//如果过期则删除服务注册实例
	if (expireInstance && isExpireInstance(service, instance)) {
		deleteIp(client, service, instance);
	}
}

 

- 在UnhealthyInstanceChecker中的doCheck方法实现服务实例的健康检查,默认时间超过15秒则视实例为不健康

public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
	//判断实例是否健康
	if (instance.isHealthy() && isUnhealthy(service, instance)) {
		//如果实例不健康则将实例状态置为false
		changeHealthyStatus(client, service, instance);
	}
}

⑥服务实例Instance注册 registerInstance

public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
	//1.检查服务实例是否合法
	NamingUtils.checkInstanceIsLegal(instance);
	//2.获取并创建单实例的服务
	Service singleton = ServiceManager.getInstance().getSingleton(service);
	if (!singleton.isEphemeral()) {
		throw new NacosRuntimeException(NacosException.INVALID_PARAM,
				String.format("Current service %s is persistent service, can't register ephemeral instance.",
						singleton.getGroupedServiceName()));
	}
	//3.获取客户端,如果不合法则直接返回
	Client client = clientManager.getClient(clientId);
	if (!clientIsLegal(client, clientId)) {
		return;
	}
	//4.获取实例发布信息
	InstancePublishInfo instanceInfo = getPublishInfo(instance);
	//5.更新客户端信息
	client.addServiceInstance(singleton, instanceInfo);
	client.setLastUpdatedTime();
	client.recalculateRevision();
	//6.发布客户端服务注册事件
	NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
	NotifyCenter
			.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

结语

至此,关于nacos服务端服务注册心跳包(/nacos/v1/ns/instance/beat)源码解读到这里就结束了,下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/601287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工程制图篇:公差原则与要求

目录 1、尺寸与几何公差2、公差原则3、公差要求4、关键名词解释5、常用通用量具 1、尺寸与几何公差 1&#xff09;尺寸公差1&#xff1a;上极限尺寸减去下极限尺寸之差&#xff0c;或上极限偏差减去下极限偏差只差。它是允许尺寸的变动量。尺寸公差是一个没有符号的绝对值&…

Cisco.Packet.Tracer思科模拟器练习题

一、网络设备配置 1、将 RA 路由器重命名为 RA&#xff0c;将 RB 路由器重命名为 RB 2、按下方表格中的要求完成设备主机名&#xff08;hostname&#xff09;及接口配置&#xff1b; 3、将 RA 的 DCE 端时间频率设置 64000&#xff0c;并封装 PPP 协议后发送用户名密码 usera…

Spring 源码分析衍生篇十三 :事务扩展机制 TransactionSynchronization

文章目录 一、前言二、TransactionSynchronization 1. TransactionSynchronization 1.1 TransactionSynchronization 的定义1.2 TransactionSynchronizationManager2. TransactionSynchronization 原理简述 2.1 TransactionSynchronization#beforeCommit2.2 TransactionSynchro…

基于状态的系统和有限状态机FSM

基于状态的系统和有限状态机FSM 介绍 基于模型进行测试。模型可能是规格或描述感兴趣的属性。 该模型通常是一个抽象概念&#xff0c;应该相对容易理解。 测试补充了白盒方法。 测试通常是黑盒&#xff1a;不考虑实际系统的结构。 如果模型具有形式语义&#xff0c;则可实现的…

leetcode--二叉搜索树中第K小的元素(java)

二叉搜索树中第K小的元素 leetcode 230 题 二叉搜索树第K 小的元素解题思路代码演示二叉树专题 leetcode 230 题 二叉搜索树第K 小的元素 原题链接&#xff1a; https://leetcode.cn/problems/kth-smallest-element-in-a-bst/ 题目描述 给定一个二叉搜索树的根节点 root &#…

智慧办公室虚拟现实 VR 可视化

“虚拟现实”是来自英文“Virtual Reality”&#xff0c;简称 VR 技术&#xff0c;其是通过利用计算机仿真系统模拟外界环境&#xff0c;主要模拟对象有环境、技能、传感设备和感知等&#xff0c;为用户提供多信息、三维动态、交互式的仿真体验。 图扑软件基于自研可视化引擎 H…

亚马逊云科技位列IDC MarketScape决策支持分析数据平台领导者

随着科学技术的不断发展&#xff0c;人们的生活方式也在不断改变。现在&#xff0c;人们可以通过互联网获得更多的信息&#xff0c;也可以通过智能手机随时随地与他人进行交流。此外&#xff0c;人工智能技术的进步也使得机器能够完成一些复杂的任务&#xff0c;从而提高了人们…

【Azure】微软 Azure 基础解析(六)计算服务中的虚拟机 VM、虚拟机规模集、Azure Functions 与 Azure 容器(ACI)

本系列博文还在更新中&#xff0c;收录在专栏&#xff1a;「Azure探秘&#xff1a;构建云计算世界」 专栏中。 本系列文章列表如下&#xff1a; 【Azure】微软 Azure 基础解析&#xff08;三&#xff09;描述云计算运营中的 CapEx 与 OpEx&#xff0c;如何区分 CapEx 与 OpEx…

写了 7 年代码,第一次见这么狗血的小 Bug!

刚刚修我们鱼聪明 AI 助手平台的一个 Bug&#xff0c;结局很狗血&#xff01;赶紧给大家分享一下&#xff0c;顺便也分享下标准的排查 Bug 思路。 事情是这样的&#xff0c;有小伙伴在鱼聪明平台&#xff08;https://www.yucongming.com&#xff09;创建了一个 AI 助手&#x…

ModStartCMS v6.5.0 菜单多级支持,框架结构优化

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用&#xff0c;支持后台一键快速安装&#xff0c;让开发者能快的实现业务功能开发。 系统完全开源&#xff0c;基于 Apache 2.0 开源协议&#xff0c;免费且不限制商业使用。 功能特性 丰富的模块市…

InnoDB Cluster集群Mysql Router代理层最佳实践

InnoDB Cluster 集群 & Mysql-Router 代理层 前言 Mysql是现今最常用的关系型数据库之一&#xff0c;高可用一直是我们对软件服务的要求。常见的Mysql高可用是主从配置&#xff0c;在主节点挂掉后需要依赖监控脚本进行主从切换将从节点升级&#xff0c;后台服务代码层面也…

正在破坏您的协程(Coroutines)的无声杀手(Silent Killer)

正在破坏您的协程的无声杀手 处理 Kotlin 中的取消异常的唯一安全方法是不重新抛出它们。 今天生产服务器再次停止响应流量。 上个星期&#xff0c;你刚重新启动它们并将其视为故障。但是你总觉得有些奇怪&#xff0c;因为日志中没有任何错误的痕迹&#xff0c;甚至没有警告。…

vue cli配置代理解决跨域问题

跨域问题 是由于违背了同源策略&#xff0c;同源策略规定了协议名、主机名、端口号必须一致 我们目前所处的位置是http localhost 8080&#xff0c;我们想向一台服务器发送请求&#xff0c;它的位置是http localhost 5000&#xff0c;我们的ajax请求从浏览器发送到服务器&#…

界面组件DevExpress WPF中文指南 - 如何应用系统强调色及主题切换

在最新版本的Microsoft Office产品中&#xff0c;用户可以根据系统设置选择主题&#xff0c;当使用这个主题时&#xff0c;Office将采用Windows强调色和应用模式(亮/暗)设置&#xff0c;并将它们应用到Office本身。如果用户在操作系统中更改了强调色或应用模式&#xff0c;Offi…

【spring源码系列-02】通过refresh方法剖析IOC的整体流程

Spring源码系列整体栏目 内容链接地址【一】spring源码整体概述https://blog.csdn.net/zhenghuishengq/article/details/130940885【一】通过refresh方法剖析IOC的整体流程https://blog.csdn.net/zhenghuishengq/article/details/131003428 spring底层源码整体概述 一&#xff…

2个实际工作中的小技巧,先收再看(网工版)

大家好&#xff0c;我是老杨。 本来想发点关于快乐的文章&#xff0c;但我思来想去&#xff0c;对成年人最大的快乐&#xff0c;莫过于高效完成工作&#xff0c;早点下班回家。 关于提升工作效率这方面啊&#xff0c;我的文章其实零碎、分散的写过了很多次了。 你要知道&…

ChatGPT国内免费使用地址和开源公众号集成项目分享

ChatGPT国内免费使用地址和开源公众号集成项目分享 ChatGPT国内免费使用地址ChatGPT开源公众号集成项目ChatGPT国内免费调用API的地址总结免费体验地址 人工智能技术的快速发展&#xff0c;ChatGPT聊天机器人备受瞩目。然而&#xff0c;如何在国内免费使用ChatGPT却是许多人关注…

手绘echarts散点图

面对各种定制&#xff0c;echarts图表有时候不好处理&#xff0c;无论是数据处理还是样式处理&#xff0c;都会被echarts限制。 举例&#xff1a;echarts散点图如果数据较少&#xff0c;echarts会均匀分布&#xff0c;如图1 对于产品或者老板对页面的要求&#xff0c;认为中间…

ROS2中,从SolidWorks导出的urdf,联合moveit、gazebo进行控制及仿真

文章目录 1.前言2.从urdf到moveit3.从urdf到gazebo3.1.urdf文件的修改3.1.1.mesh路径3.1.2.零件起飞3.1.3.文件保存 3.2.xacro文件的修改3.3.launch 4.用moveit控制gazebo5.结语 1.前言 本文是对之前发的文章【在ROS2中&#xff0c;通过MoveIt2控制Gazebo中的自定义机械手】的…

C 语言详细教程

目录 第一章 C语言基础知识 第二章 数据类型、运算符和表达式 第三章 结构化程序设计 第四章 数组 第五章 函数 第六章 指针 第七章 结构体类型和自定义类型 第八章 编译预处理 第九章 文件 说明&#xff1a;本教程中的代码除一二三个之外&#xff0c;都在https://ligh…