nacos服务端源码集群同步源码分析

news2024/11/20 21:26:01

nacos集群状态同步源码分析

ServerStatusReporter

ServerStatusReporter 是 ServerListManager的内部类

在这里插入图片描述
通过@Component注解被解析到spring容器中
再通过@PostConstruct初始化执行init方法
在这里插入图片描述

上边代码启动了一个延时2秒的线程

  private class ServerStatusReporter implements Runnable {
        
        @Override
        public void run() {
            try {
                
                if (EnvUtil.getPort() <= 0) {
                    return;
                }
                
                int weight = Runtime.getRuntime().availableProcessors() / 2;
                if (weight <= 0) {
                    weight = 1;
                }
                
                long curTime = System.currentTimeMillis();
                String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
                        + "\r\n";
                //获取当前服务所有的实例
                List<Member> allServers = getServers();
                
                if (!contains(EnvUtil.getLocalAddress())) {
                    Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
                            EnvUtil.getLocalAddress(), allServers);
                    return;
                }
                
                if (allServers.size() > 0 && !EnvUtil.getLocalAddress()
                        .contains(IPUtil.localHostIP())) {
                    for (Member server : allServers) {
                    	//排除当前服务的ip
                        if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
                            continue;
                        }
                        
                        // This metadata information exists from 1.3.0 onwards "version"
                        if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
                            Loggers.SRV_LOG
                                    .debug("[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",
                                            server.getAddress(), MemberMetaDataConstants.VERSION,
                                            server.getExtendVal(MemberMetaDataConstants.VERSION));
                            continue;
                        }
                        
                        Message msg = new Message();
                        msg.setData(status);
                        //向集群其他节点发送同步
                        synchronizer.send(server.getAddress(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
            } finally {
            //又重新启动延时定时任务线程进行状态同步
                GlobalExecutor
                        .registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
            }
            
        }
    }

上边代码其实就是获取当前服务的所有实例
然后排出当前节点的实例
同步集群其他节点状态
最后在finally中重新注册延时任务线程进行状态同步

节点同步接口

public class ServerStatusSynchronizer implements Synchronizer {
    
    @Override
    public void send(final String serverIP, Message msg) {
        if (StringUtils.isEmpty(serverIP)) {
            return;
        }
        
        final Map<String, String> params = new HashMap<String, String>(2);
        
        params.put("serverStatus", msg.getData());
        
        String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";
        
        if (IPUtil.containsPort(serverIP)) {
            url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                    + "/operator/server/status";
        }
        
        try {
        //异步发起调用
            HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",
                                serverIP);
                    }
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, throwable);
                }
    
                @Override
                public void onCancel() {
        
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, e);
        }
    }
    
    @Override
    public Message get(String server, String key) {
        return null;
    }
}

数据新增及变更同步

在nacos服务启动中,会加载ServiceManager为spring的bean对象,执行init()方法,其中会创建定时任务线程池每隔1分钟执行ServiceReporter任务,他就是nacos各个节点间同步服务实例元数据的任务。一下是run()所有内容

ServiceManager
执行nint方法启动定时任务
在这里插入图片描述
看下run方法

 private class ServiceReporter implements Runnable {
        
        @Override
        public void run() {
            try {
                //获取所有的服务信息
                Map<String, Set<String>> allServiceNames = getAllServiceNames();
                
                if (allServiceNames.size() <= 0) {
                    //ignore
                    return;
                }
                //循环
                for (String namespaceId : allServiceNames.keySet()) {
                    //创建需要同步的数据对象,它封装了namespaceId对应的service所有的实例信息
                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);
                    
                    for (String serviceName : allServiceNames.get(namespaceId)) {
                        //只有维持心跳的节点才会向checksum中添加数据,也就是存活节点
                        if (!distroMapper.responsible(serviceName)) {
                            continue;
                        }
                        
                        Service service = getService(namespaceId, serviceName);
                        
                        if (service == null || service.isEmpty()) {
                            continue;
                        }
                        ///拼接所有实例信息,解析为md5赋值给checksum属性
                        service.recalculateChecksum();
                         //添加到checksum中
                        checksum.addItem(serviceName, service.getChecksum());
                    }
                    
                    Message msg = new Message();
                    
                    msg.setData(JacksonUtils.toJson(checksum));
                    //拿到所有nacos节点地址
                    Collection<Member> sameSiteServers = memberManager.allMembers();
                    
                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                        return;
                    }
                    //将消息发送给除自身意外的所有nacos节点
                    for (Member server : sameSiteServers) {
                        if (server.getAddress().equals(NetUtils.localServer())) {
                            continue;
                        }
                        synchronizer.send(server.getAddress(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
                GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
    }

大致可以总结为将namespaceId对应的所有实例元数据信息,对于serviceName下所有实例信息,只有维持该serviceName心跳的节点才会对这元数据信息进行处理,将他们都加到一个checksum对象中,然后封装为Message对象中,最后发送给其他所有nacos节点。直到所有namespaceId都遍历结束。

1、集群环境维持每个service心跳的算法,对于一个服务类型会对他的serviceName进行hash,然后对集群节点数量求余,得到一个节点,该节点就是维持该服务类型所对应的所有实例。

2、节点之间同步服务实例数据就是基于1中选出来的节点,每个节点会向其他节点同步自己维持心跳的服务的所有实例

健康检查

nacos开启健康检查是在客户端启动时向服务端发起注册的时候
服务端接口
在这里插入图片描述
registerInstance方法
com.alibaba.nacos.naming.core.ServiceManager#registerInstance
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit

在这里插入图片描述
com.alibaba.nacos.naming.core.Service#init

 public void init() {
 //该方法中通过HealthCheckReactor.scheduleCheck(clientBeatCheckTask)调用了一个clientBeatCheckTask任务线程,进入到scheduleCheck方法中,发现该方法中开启了一个定时任务,这个任务是每隔5s就执行一次CientBeatCheckTask线程操作
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

看下定时任务


    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

看下run方法
ClientBeatCheckTask

 @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
            //大于15秒设置为false
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                                            //通知客户端操作
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                //大于30秒则删除
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }

ClientBeatCheckTask线程操作主要包括两点:
先遍历一次所有的实例对象,判断最后一次心跳发送的时间距离当前时间是否超过了设定的值(默认为15s),如果是,则将该实例的health属性改为false
再遍历一次所有的实例对象,判断最后一次心跳发送的时间距离当前时间是否超过了可删除时间的值(默认为30s),如果是,则将该实例从内存中删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/536756.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信号时域分析方法

主要参考&#xff1a; 时域分析——有量纲特征值含义一网打尽 信号时域分析方法的理解&#xff08;峰值因子、脉冲因子、裕度因子、峭度因子、波形因子和偏度等&#xff09; 重要笔记如下&#xff1a; 建议跟参考笔记同步看。 有量纲特征值8个——最大值、最小值、峰峰值、均值…

Unable to resolve resource vscode-vfs://github%2B7b2276223a312c22726566223a7

github无法访问&#xff1f;vscode 无法使用github登录同步? 改 hosts 吧 Unable to resolve resource vscode-vfs://github%2B7b2276223a312c22726566223a7一、无法访问 github.com &#xff1f; 想要去 github.com 上拿来主义&#xff0c;结果访问不了&#xff0c;或者 np…

Go基础篇:接口

目录 前言✨一、什么是接口&#xff1f;二、空接口 interface{}1、eface的定义2、需要注意的问题 三、非空接口1、iface的定义2、itab的定义3、itab缓存 前言✨ 前段时间忙着春招面试&#xff0c;现在也算告一段落&#xff0c;找到一家比较心仪的公司实习&#xff0c;开始慢慢回…

Linux防火墙之iptables(上)

目录 一、iptables防火墙的相关知识 1&#xff09;防火墙的概念 2&#xff09;iptables的简介 3&#xff09;netfilter/iptables 的关系 netfilter iptables 二、iptables中的四表五链 1 &#xff09;四表五链的关系 2&#xff09;iptables中的四表 3&#xff0…

0基础要怎么开始学习c++?

先放一张学习线路图&#xff1a; 对于初学者看到这张线路图可能有点晕。实际上耐心一点&#xff0c;有足够的毅力还是可以自学攻克C这门语言的。 一、C语言 基础内容就是两块&#xff1a;基础语法和一个小游戏案例 进阶一点也是两块内容&#xff1a;C高级和数据结构。包括一…

怎么画骑鹅旅行记思维导图?办公常备工具

骑鹅旅行记不仅是一本神奇的奇幻童话&#xff0c;更是一本寓教于乐的启蒙读物&#xff0c;引导着孩子们学习正确的价值观和道德观。在开始制作骑鹅旅行记思维导图之前&#xff0c;我们需要先整理好故事情节&#xff1a;小男孩尼尔斯因捉弄动物而被一个小精灵变成了拇指大小&…

中国社科院与美国杜兰大学金融管理硕士——与时间赛跑,充分利用每一分钟

不管你愿不愿意&#xff0c;时间总是在不经意间流去。林清玄在《和时间赛跑》中写道&#xff1a;“虽然我知道人永远跑不过时间&#xff0c;但是可以比原来快跑几步。那几步虽然很小很小&#xff0c;但作用却很大很大”。是的&#xff0c;我们需要与时间赛跑&#xff0c;充分利…

就业内推 | 上市公司招网工,最高25k*14薪,六险一金

01 锐捷网络 招聘岗位&#xff1a;网络工程师 职责描述&#xff1a; 1、承接本产品线&#xff08;无线或数通&#xff09;所有咨询、故障、网络变更等业务&#xff0c;响应内外部客户的业务响应需求&#xff0c;需要值班。 2、同时作为产品线技术力的核心&#xff0c;需要负责…

购票API接口商品详情信息API

购票网是一个在线购票平台&#xff0c;为音乐会、演唱会、话剧、体育比赛等各类娱乐活动提供门票销售服务。通过大麦网&#xff0c;用户可以轻松购买心仪的演出门票&#xff0c;并享受到良好的购票体验。 为了让更多用户了解到大麦网的商品详情&#xff0c;并能够方便地获取相…

内防泄密重要,还是外防窃密重要?

内防泄密是组织为防止内部敏感信息未经授权泄露所采取的各种管理与技术措施的总称。它主要针对内部人员的信息访问与操作行为进行管控,减少故意或疏忽泄密事件的发生几率。 内防泄密的工作&#xff0c;通常包括员工管理、权限管控、监控检查、分级保护、离岗管控、技术防护、事…

高赞热门,这是B站值得一看的商单作品

3月30日、5月11日&#xff0c;第十届中国网络视听大会和首届上海网络视听内容创作者大会先后开幕。 在现场&#xff0c;陈睿共发表了《科技与青年的双向奔赴》、《B站&#xff0c;高质量内容的沃土》两次主题演讲。陈睿分享道&#xff0c;“优质的内容和创作者&#xff0c;是能…

主动防御更安全 混沌工程助力企业系统稳定

数字经济时代背景下&#xff0c;各个行业迎来数字化转型浪潮&#xff0c;随着企业对于云计算的理解和实践不断深入&#xff0c;数据作为生产要素参与到市场分配当中&#xff0c;全面渗透金融、政务、医疗、教育、能源等各个领域&#xff0c;已成为企业发展乃至国家发展的重要战…

GPU机器docker环境离线安装

秋风阁(https://focus-wind.com/) 文章目录 docker 环境离线二进制安装下载二进制包解压.tgz包迁移文件到/usr/bin/目录下启动docker手动启动dockersystemctl启动dockerdocker.servicedocker.socketcontainerd.service设置开机重启启动docker服务 离线安装nvidia-docker&#x…

GPT逆向:高效解读小红书文案生成器的内部逻辑

文章目录 前言一、什么是小红书文案生成器二、具体步骤总结 前言 关注我的很多同学都会写爬虫。但如果想把爬虫写得好&#xff0c;那一定要掌握一些逆向技术&#xff0c;对网页的JavaScript和安卓App进行逆向&#xff0c;从而突破签名或者绕过反爬虫限制。 最近半年&#xff…

密码学基本原理和发展——古典密码学

密码技术最早起源于公元前404年的希腊&#xff0c;此后密码大致经历了古典密码、近代密码和现代密码三个阶段。 1.古典密码 古典密码&#xff08;公元前五世纪&#xff5e;19世纪末&#xff09;代表性的是滚桶密码、棋盘密码和凯撒密码。 1.1滚筒密码 滚筒密码原理为信息发送…

5月17号软件资讯更新合集....

Erlang/OTP 26 正式发布 Erlang/OTP 26.0 已正式发布。 Erlang 是一种通用的并发函数式程序设计语言。Erlang 也可以指 Erlang/OTP 的通称&#xff0c;开源电信平台 (OTP) 是 Erlang 的常用执行环境及一系列标准组件。 这是一个重要版本更新&#xff0c;包含许多新特性、改进…

Python带你实现批量自动点赞小程序

前言 大家早好、午好、晚好吖 ❤ ~欢迎光临本文章 所用知识点: 动态数据抓包 requests发送请求 json数据解析 开发环境: python 3.8 运行代码 pycharm 2022.3 辅助敲代码 requests 请求模块 &#xff0c;第三方&#xff0c;需安装 win R 输入cmd 输入安装命令 pip inst…

oa是什么意思?oa系统哪个好用?

一、oa是什么意思 oa&#xff08;Office Automation办公自动化&#xff09;是一种将智能化科技应用于企业管理中的应用系统。它可以通过电脑网络、互联网等技术手段&#xff0c;将企业的各种业务流程、各种业务数据进行集成和处理&#xff0c;将各种业务流程和各种业务数据统一…

centos安装kurento

前沿 本文章使用docker安装&#xff0c;为个人安装记录 一、安装kurento 1.拉取镜像 # 搜索kurento docker search kurento# 拉取 docker pull kurento/kurento-media-server# 查看镜像是否拉取成功 docker images2.运行kurento服务 # 运行 docker run -d --name kms -p 8…

【Python Mayavi】零基础也能轻松掌握的学习路线与参考资料

Mayavi是Python的一个科学可视化库&#xff0c;主要用于三维&#xff08;3D&#xff09;科学数据的可视化。Mayavi具有优秀的可视化效果、交互性良好、易于使用、能够输出高质量的图像和动画等特点。对于需要展示科学数据的学者和研究人员来说&#xff0c;Mayavi是一个非常好的…