【Nacos】Nacos服务注册与发现 心跳检测机制源码解析

news2024/11/28 8:32:39

在前两篇文章,介绍了springboot的自动配置原理,而nacos的服务注册就依赖自动配置原理。

Nacos

在这里插入图片描述

Nacos核心功能点

服务注册 :Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。

服务心跳: 在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳。

服务健康检查: Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它 的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复 发送心跳则会重新注册)

服务发现: 服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清 单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存

服务同步: Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。

客户端 服务注册&心跳发送

在客户端中,也就是开发的应用,包含引入有 nacos-discovery 而路径下包含有一个spring.factories 在自动配置的时候,会加载
如下配置类。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\

其中 NacosServiceRegistryAutoConfiguration 中引入了三个类 NacosServiceRegistryNacosRegistrationNacosAutoServiceRegistration 其中 NacosAutoServiceRegistration 继承了 AbstractAutoServiceRegistration

	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}
	// 启动
	this.start();
	//注册服务
	register();

组装实例信息,ip 端口 服务权重 集群名字 源信息 以及是否

	private Instance getNacosInstanceFromRegistration(Registration registration) {
		Instance instance = new Instance();
		instance.setIp(registration.getHost());
		instance.setPort(registration.getPort());
		instance.setWeight(nacosDiscoveryProperties.getWeight());
		instance.setClusterName(nacosDiscoveryProperties.getClusterName());
		instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
		instance.setMetadata(registration.getMetadata());
		instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
		return instance;
	}
	// 注册实例
	namingService.registerInstance(serviceId, group, instance);

	// 服务名 组名 实例
   public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
       // 实例非法校验
       NamingUtils.checkInstanceIsLegal(instance);
       // 获取组服务名
       String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
       
       if (instance.isEphemeral()) {
       		// 构建心跳信息
           BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
           // 启动一个心跳发送线程
           beatReactor.addBeatInfo(groupedServiceName, beatInfo);
       }
       // 实际就是调用 instance 
       serverProxy.registerService(groupedServiceName, groupName, instance);
   }

	// 心跳发送线程
	executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);

发送心跳线程

所谓的客户端心跳,其实就是启动一个线程,然后定时给一个接口发送调用。

	class BeatTask implements Runnable {
        
        BeatInfo beatInfo;
        
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        
        @Override
        public void run() {
        	// 如果停止 直接返回
            if (beatInfo.isStopped()) {
                return;
            }
            // 获取下次时间
            long nextTime = beatInfo.getPeriod();
            // 实际就是调用服务端的一个心跳接口  /instance/beat
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            // 如果结束,启动另外一个 开始下次的心跳线程发送
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }

在这里插入图片描述

nacos 服务注册

对于服务端来说,就是一个API接口

	public String register(HttpServletRequest request) throws Exception {
        // 准备服务实例
        final Instance instance = parseInstance(request);

        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

注册实例其实就是三步、创建服务、获取服务、添加实例

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        Service service = getService(namespaceId, serviceName);
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

创建服务

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // 命名空间 服务名称
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
			// 这里是重点
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

    private void putServiceAndInit(Service service) throws NacosException {
     	// 添加服务
        putService(service);
        // 下面说心跳检测机制
        service.init();
    }

在这里插入图片描述
这里其实就是底层的注册表的数据结构了,这里使用双检查锁。分别是nameSpace、group、service、实例。

这里简单思考下,为什么要设计这么复杂。一个服务可能对应多个实例。没有问题。一个分组group 可能是在同一个公司有不同的组,比如订单、支付,每个组都有自己的服务。namespace则可以分为dev\test\prod 三个不同的组。

	// 注册表  如何提升
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

	// 这里的机制其实就是 判断当前是否存在该nameSpaceId , 如果不存在的话,则创建一个CSLM
    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            // 添加锁 lock
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                }
            }
        }
        // 将添加服务添加到nameSpaceId 添加服务
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }
    // 添加实例
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        // 构建一个key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            // 持久化服务
            consistencyService.put(key, instances);
        }
    }
	// 这里根据判断是否ephemeral 走保存内存还是磁盘持久化 
    private ConsistencyService mapConsistencyService(String key) {
        // AP CP
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

这里其实选择的是 DistroConsistencyServiceImpl 另一个就是 RaftConsistencyServiceImpl 使用raft实现的数据持久化,这里先不介绍。

DistroConsistencyServiceImpl

    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

    // 这里调用put 
	public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //将数据保存到内存中
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        // 但是这里调用了一个任务
        notifier.addTask(key, DataOperation.CHANGE);
    }
	// 其实就是一个map
    private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
    
    public void put(String key, Datum value) {
        dataMap.put(key, value);
    }

    @PostConstruct
    public void init() {
        // 初始化 构造方法执行的时候 进行处理
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

从源码中可以看到,在类初始化的时候,创建一个任务异步进行执行。 其实就是将当前服务进行异步任务注册,可以提升性能。添加和获取任务。

源码精髓:很多开源框架为了提升操作性能会大量使用这种异步任务及内存队列操作,这些操作本省不需要写入立即返回成功,用这种方式可以提升操作性能很大帮助

   public class Notifier implements Runnable {
		// 保存服务
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
		// 阻塞队列 
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
		// 初始化的时候 添加一个任务到阻塞队列中
        public void addTask(String datumKey, DataOperation action) {
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }

        @Override
        public void run() {
            // 为什么异步设计 : 提升性能
            // 阻塞队列 ,会线程等待 wait
            // 并发、反射、网络、IO
            for (; ; ) {
              	// 异步处理
               Pair<String, DataOperation> pair = tasks.take();
               handle(pair);
            }
        }
    }

在这里插入图片描述

心跳检测机制

其实就是服务注册的时候 启动一个线程,然后检查所有实例的心跳检测,对于超过15s没有收到客户端心跳的实例会将它 的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复 发送心跳则会重新注册)

    /**
     * Init service.
     */
    public void init() {
        // 心跳检查线程
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

	// 初始化5S后执行,每5S执行一次
    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

    // 
	public void run() {
        List<Instance> instances = service.allIPs(true);

            for (Instance instance : instances) {
                // 当前时间 减去 心跳超时时间
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                        if (instance.isHealthy()) {
                            // 健康状态
                            instance.setHealthy(false);
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } 
    }

至此,我们就基本上过了一遍,服务的注册 以及心跳检测机制,本篇主要是针对nacos1.4.1 的源码学习,关于后续的服务发现,以及2.X版本的源码 讲解,后续在继续。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1937924.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言】深入解析希尔排序

文章目录 什么是希尔排序&#xff1f;希尔排序的基本实现代码解释希尔排序的优化希尔排序的性能分析希尔排序的实际应用结论 在C语言编程中&#xff0c;希尔排序是一种高效的排序算法&#xff0c;是插入排序的一种更高效的改进版本。它通过比较相距一定间隔的元素来进行排序&am…

JRT报告打印设计

检验报告单打印一直是个难点问题&#xff0c;JRT开发时候重点考虑了简化检验报告打印&#xff0c;首先采用脚本化方便快速修改报告。然后打印基础解决难点问题&#xff0c;基于JRT打印就可以简化到本文代码的水平&#xff0c;维护方便&#xff0c;结构清晰&#xff0c;上线修改…

【1】Spring Cloud 工程搭建

&#x1f3a5; 个人主页&#xff1a;Dikz12&#x1f525;个人专栏&#xff1a;Spring学习之路&#x1f4d5;格言&#xff1a;吾愚多不敏&#xff0c;而愿加学欢迎大家&#x1f44d;点赞✍评论⭐收藏 目录 1. 父子工程创建 1.1 创建父工程 1.2 创建子项目 重点关注Spring C…

头发稀疏治疗笔记

1. 前言 今天去中南医院看了一下“头发稀疏”的病症&#xff1b; 2. 头皮检测 2.1 毛发光镜检查 2.2 皮肤镜影像

root的安卓12系统上,如何使apk获得root或者高级别的系统权限?

&#x1f3c6;本文收录于《CSDN问答解答》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&…

JVM--经典的垃圾收集器

1. 垃圾收集器的概念 定义 ; 如果说收集算法是内存回收的方法论&#xff0c;那垃圾收集器就是内存回收的实践者。《Java虚拟机规 范》中对垃圾收集器应该如何实现并没有做出任何规定&#xff0c;因此不同的厂商、不同版本的虚拟机所包含 的垃圾收集器都可能会有很大差别…

Linux journalctl日志太长,如何倒序查看journalctl --reverse,journalctl -xeu

文章目录 需求实验方法一方法二 需求 Linux journalctl日志太长&#xff0c;如何倒序查看 我们通常关心的是最近的日志&#xff0c;但是每次打开日志都是按时间先后顺序显示的&#xff0c;如何倒序查看&#xff0c;请看下面&#xff1a; 实验 方法一 journalctl 命令默认按…

uniapp封装请求拦截器,封装请求拦截和响应拦截的方法

首先我们先看一下uni官方给开发者提供的uni.request用来网络请求的api 1 2 3 4 5 6 7 8 9 uni.request({ url: , method: GET, data: {}, header: {}, success: res > {}, fail: () > {}, complete: () > {} }); 可以看到我们每次请求数据的时候都需…

两种调用方法可以让Contact form 7表单在任意地方显示

Contact form 7是wordpress建站过程中最常用到的插件之一&#xff0c;不过&#xff0c;在Contact form 7调用的时候&#xff0c;有些新手还是搞不太清楚它的调用方法。下面简站wordpress小编&#xff0c;就把常用的两种调用方法&#xff0c;分享给大家&#xff1a; Contact fo…

Open3D 非线性最小二乘法拟合空间球

目录 一、概述 1.1原理 1.2实现步骤 二、代码实现 2.1关键代码 2.1.1定义残差函数 2.1.2拟合球面 2.2完整代码 三、实现效果 3.1原始点云 3.2拟合后点云 3.3结果数据 前期试读&#xff0c;后续会将博客加入下列链接的专栏&#xff0c;欢迎订阅 Open3D点云算法与点…

题解|2023暑期杭电多校02

【原文链接】 &#xff08;补发&#xff09;题解|2023暑期杭电多校02 1002.Binary Number 字符串、贪心 题目大意 给定一段长度为 n n n的01串&#xff0c;首位保证为1 任选定其中任意长的一段并将其反转 必须执行以上操作 k k k次&#xff0c;求操作后得到的01串表示的二进…

JAVA.2.运算符

目录 1.算术运算符 小数有可能会不精确&#xff0c;整数除以整数得整数 例子 package demo1;public class Hello {public static void main(String[] args) {System.out.println(12);System.out.println(1-2);System.out.println(5/2);System.out.println(5.0/2);System.out.…

代码随想录移除元素二刷

代码随想录移除元素二刷 leetcode 27 这道题思路的话可以这样去理解&#xff0c;用两个指针&#xff0c;一个慢指针&#xff0c;一个快指针。先让快指针往前面去探路&#xff0c;也就是去遍历数组&#xff0c;遇到不为val的值再去把该值赋值给nums[slow]&#xff0c;slow指针1…

3.1、matlab双目相机标定实验

1、双目相机标定原理及流程 双目相机标定是将双目相机系统的内外参数计算出来&#xff0c;从而实现双目视觉中的立体测量和深度感知。标定的目的是确定各个摄像头的内部参数&#xff08;如焦距、主点、畸变等&#xff09;和外部参数&#xff08;如相机位置、朝向等&#xff09…

uni-app 影视类小程序开发从零到一 | 开源项目分享

引言 在数字娱乐时代&#xff0c;移动设备已成为我们生活中不可或缺的一部分&#xff0c;尤其是对于电影爱好者而言&#xff0c;随时随地享受精彩影片成为一种日常需求。爱影家&#xff0c;一款基于 uni-app 开发的影视类小程序&#xff0c;正是为此而生。它不仅提供了丰富的影…

北京交通大学《深度学习》专业课,实验3卷积、空洞卷积、残差神经网络实验

一、实验要求 1. 二维卷积实验&#xff08;平台课与专业课要求相同&#xff09; ⚫ 手写二维卷积的实现&#xff0c;并在至少一个数据集上进行实验&#xff0c;从训练时间、预测精 度、Loss变化等角度分析实验结果&#xff08;最好使用图表展示&#xff09; ⚫ 使用torch.nn…

基于ffmepg的视频剪辑

1.ffmpeg命令实现视频剪辑 FFmpeg是一个非常强大的视频处理工具&#xff0c;可以用来剪辑视频。以下是一个基本的FFmpeg命令行示例&#xff0c;用于剪辑视频&#xff1a; $ ffmpeg -i ./最后一滴水.mp4 -ss 0:0:20 -t 50 -c copy output.mp4-i ./最后一滴水.mp4 输入文件  …

利用PyTorch进行模型量化

利用PyTorch进行模型量化 目录 利用PyTorch进行模型量化 一、模型量化概述 1.为什么需要模型量化&#xff1f; 2.模型量化的挑战 二、使用PyTorch进行模型量化 1.PyTorch的量化优势 2.准备工作 3.选择要量化的模型 4.量化前的准备工作 三、PyTorch的量化工具包 1.介…

Linux复习02

一、什么是操作系统 操作系统是一款做软硬件管理的软件&#xff01; 一个好的操作系统&#xff0c;衡量的指标是&#xff1a;稳定、快、安全 操作系统的核心工作&#xff1a; 通过对下管理好软硬件资源的手段&#xff0c;达到对上提供良好的&#xff08;稳定&#xff0c;快…

【MindSpore学习打卡】应用实践-LLM原理和实践-文本解码原理 —— 以MindNLP为例

在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;文本生成是一项重要且具有挑战性的任务。从对话系统到自动文本补全&#xff0c;文本生成技术无处不在。本文将深入探讨自回归语言模型的文本解码原理&#xff0c;使用MindNLP工具进行示例演示&#xff0c;并详细分析…