21 Nacos客户端本地缓存及故障转移

news2024/12/25 9:29:27

Nacos客户端本地缓存及故障转移

在Nacos本地缓存的时候有的时候必然会出现一些故障,这些故障就需要进行处理,涉及到的核心类为ServiceInfoHolder和FailoverReactor。
在这里插入图片描述
本地缓存有两方面,第一方面是从注册中心获得实例信息会缓存在内存当中,也就是通过Map的形式承载,这样查询操作都方便。第二方面便是通过磁盘文件的形式定时缓存起来,以备不时之需。

故障转移也分两方面,第一方面是故障转移的开关是通过文件来标记的;第二方面是当开启故障转移之后,当发生故障时,可以从故障转移备份的文件中来获得服务实例信息。

ServiceInfoHolder功能概述

ServiceInfoHolder类,顾名思义,服务信息的持有者。每次客户端从注册中心获取新的服务信息时都会调用该类,其中processServiceInfo方法来进行本地化处理,包括更新缓存服务、发布事件、更新本地文件等。

在这里插入图片描述
除了这些核心功能以外,该类在实例化的时候,还做了本地缓存目录初始化、故障转移初始化等操作,下面我们来分析。

ServiceInfo的本地内存缓存

ServiceInfo,注册服务的信息,其中包含了服务名称、分组名称、集群信息、实例列表信息,上次更新时间等,所以我们由此得出客户端从服务端注册中心获得到的信息在本地都以ServiceInfo作为承载者。
而ServiceInfoHolder类又持有了ServiceInfo,通过一个ConcurrentMap来储存.

// ServiceInfoHolder
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;

这就是Nacos客户端对服务端获取到的注册信息的第一层缓存,并且之前的课程中我们分析processServiceInfo方法时,我们已经看到,当服务信息变更时会第一时间更新ServiceInfoMap中的信息.

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
 	....
    //缓存服务信息
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // 判断注册的实例信息是否更改
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    ....
    return serviceInfo;
}

serviceInfoMap的使用就是这样,当变动实例向其中put最新数据即可。当使用实例时,根据key进行get操作即可。

serviceInfoMap在ServiceInfoHolder的构造方法中进行初始化,默认创建一个空的ConcurrentMap。但当配置了启动时从缓存文件读取信息时,则会从本地缓存进行加载。

public ServiceInfoHolder(String namespace, Properties properties) {
    initCacheDir(namespace, properties);
    // 启动时是否从缓存目录读取信息,默认false。
    if (isLoadCacheAtStart(properties)) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushEmptyProtection = isPushEmptyProtect(properties);
}

这里我们要注意一下,涉及到了本地缓存目录,在我们上节课的学习中我们知道,processServiceInfo方法中,当服务实例变更时,会看到通过DiskCache#write方法向该目录写入ServiceInfo信息。

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
   	.....
    // 服务实例已变更
    if (changed) {
        NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                           JacksonUtils.toJson(serviceInfo.getHosts()));
        // 添加实例变更事件InstancesChangeEvent,订阅者
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                                           serviceInfo.getClusters(), serviceInfo.getHosts()));
        // 记录Service本地文件
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

本地缓存目录

本地缓存目录cacheDir是ServiceInfoHolder的一个属性,用于指定本地缓存的根目录和故障转移的根目录。
在这里插入图片描述
在ServiceInfoHolder的构造方法中,初始化并且生成缓存目录
在这里插入图片描述
这个initCacheDir就不用了细看了,就是生成缓存目录的操作,默认路径:${user.home}/nacos/naming/public,也可以自定义,通过System.setProperty(“JM.SNAPSHOT.PATH”)自定义.
这里初始化完目录之后,故障转移信息也存储在该目录下。

private void initCacheDir(String namespace, Properties properties) {
    String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);

    String namingCacheRegistryDir = "";
    if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
        namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
    }

    if (!StringUtils.isBlank(jmSnapshotPath)) {
        cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
            + File.separator + FILE_PATH_NAMING + File.separator + namespace;
    } else {
        cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
            + File.separator + FILE_PATH_NAMING + File.separator + namespace;
    }
}

故障转移

在ServiceInfoHolder的构造方法中,还会初始化一个FailoverReactor类,同样是ServiceInfoHolder的成员变量。FailoverReactor的作用便是用来处理故障转移的。
在这里插入图片描述

public ServiceInfoHolder(String namespace, Properties properties) {
    ....
    // this为ServiceHolder当前对象,这里可以立即为两者相互持有对方的引用
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    .....
}

我们来看一下FailoverReactor的构造方法,FailoverReactor的构造方法基本上把它的功能都展示出来了:

1. 持有ServiceInfoHolder的引用
2. 拼接故障目录:${user.home}/nacos/naming/public/failover,其中public也有可能是其他的自定义命名空间
3. 初始化executorService(执行者服务)
4. init方法:通过executorService开启多个定时任务执行
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
    // 持有ServiceInfoHolder的引用
    this.serviceInfoHolder = serviceInfoHolder;
    // 拼接故障目录:${user.home}/nacos/naming/public/failover
    this.failoverDir = cacheDir + FAILOVER_DIR;
    // 初始化executorService
    this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // 守护线程模式运行
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.failover");
            return thread;
        }
    });
    // 其他初始化操作,通过executorService开启多个定时任务执行
    this.init();
}

init方法执行

在这个方法中开启了三个定时任务,这三个任务其实都是FailoverReactor的内部类:

1. 初始化立即执行,执行间隔5秒,执行任务SwitchRefresher
2. 初始化延迟30分钟执行,执行间隔24小时,执行任务DiskFileWriter
3. 初始化立即执行,执行间隔10秒,执行核心操作为DiskFileWriter
public void init() {
	// 初始化立即执行,执行间隔5秒,执行任务SwitchRefresher
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
	// 初始化延迟30分钟执行,执行间隔24小时,执行任务DiskFileWriter
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

    // backup file on startup if failover directory is empty.
    // 如果故障目录为空,启动时立即执行,立即备份文件
    // 初始化立即执行,执行间隔10秒,执行核心操作为DiskFileWriter
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}

这里我们先看DiskFileWriter,这里的逻辑不难,就是获取ServiceInfo中缓存的ServiceInfo,判断是否满足写入磁盘,如果条件满足,就将其写入拼接的故障目录,因为后两个定时任务执行的都是DiskFileWriter,但是第三个定时任务是有前置判断的,只要文件不存在就会立即执行把文件写入到本地磁盘中。

class DiskFileWriter extends TimerTask {

    @Override
    public void run() {
        Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
                .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
                continue;
            }
			// 将缓存写入磁盘
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}

接下来,我们再来看第一个定时任务SwitchRefresher的核心实现,具体逻辑如下:

1. 如果故障转移文件不存在,则直接返回(文件开关)
2. 比较文件修改时间,如果已经修改,则获取故障转移文件中的内容。
3. 故障转移文件中存储了0和1标识。0表示关闭,1表示开启。
4. 当为开启状态时,执行线程FailoverFileReader。
class SwitchRefresher implements Runnable {

    long lastModifiedMillis = 0L;

    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            // 文件不存在则退出
            if (!switchFile.exists()) {
                switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
                return;
            }

            long modified = switchFile.lastModified();
			
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                // 获取故障转移文件内容
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                                                                    Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    String[] lines = failover.split(DiskCache.getLineSeparator());

                    for (String line : lines) {
                        String line1 = line.trim();
                        // 1 表示开启故障转移模式
                        if (IS_FAILOVER_MODE.equals(line1)) {
                            switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
                            NAMING_LOGGER.info("failover-mode is on");
                            new FailoverFileReader().run();
                        // 0 表示关闭故障转移模式
                        } else if (NO_FAILOVER_MODE.equals(line1)) {
                            switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                            NAMING_LOGGER.info("failover-mode is off");
                        }
                    }
                } else {
                    switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                }
            }

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}

FailoverFileReader

顾名思义,故障转移文件读取,基本操作就是读取failover目录存储的备份服务信息文件内容,然后转换成ServiceInfo,并且将所有的ServiceInfo储存在FailoverReactor的ServiceMap属性中。

​ 流程如下:

1. 读取failover目录下的所有文件,进行遍历处理
2. 如果文件不存在跳过
3. 如果文件是故障转移开关标志文件跳过
4. 读取文件中的备份内容,转换为ServiceInfo对象
5. 将ServiceInfo对象放入到domMap中
6. 最后判断domMap不为空,赋值给serviceMap
class FailoverFileReader implements Runnable {

    @Override
    public void run() {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

        BufferedReader reader = null;
        try {

            File cacheDir = new File(failoverDir);
            if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                throw new IllegalStateException("failed to create cache dir: " + failoverDir);
            }

            File[] files = cacheDir.listFiles();
            if (files == null) {
                return;
            }

            for (File file : files) {
                if (!file.isFile()) {
                    continue;
                }
				// 如果是故障转移标志文件,则跳过
                if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
                    continue;
                }

                ServiceInfo dom = new ServiceInfo(file.getName());

                try {
                    String dataString = ConcurrentDiskUtil
                        .getFileContent(file, Charset.defaultCharset().toString());
                    reader = new BufferedReader(new StringReader(dataString));

                    String json;
                    if ((json = reader.readLine()) != null) {
                        try {
                            dom = JacksonUtils.toObj(json, ServiceInfo.class);
                        } catch (Exception e) {
                            NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
                        }
                    }

                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
                } finally {
                    try {
                        if (reader != null) {
                            reader.close();
                        }
                    } catch (Exception e) {
                        //ignore
                    }
                }
                if (!CollectionUtils.isEmpty(dom.getHosts())) {
                    domMap.put(dom.getKey(), dom);
                }
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }
		
        // 读入缓存
        if (domMap.size() > 0) {
            serviceMap = domMap;
        }
    }
}

但是这里还有一个问题就是serviceMap是哪里用到的,这个其实是我们之前读取实例时候用到的getServiceInfo方法。

其实这里就是一旦开启故障转移就会先调用failoverReactor.getService方法,此方法便是从serviceMap中获取ServiceInfo

public ServiceInfo getService(String key) {
    ServiceInfo serviceInfo = serviceMap.get(key);

    if (serviceInfo == null) {
        serviceInfo = new ServiceInfo();
        serviceInfo.setName(key);
    }

    return serviceInfo;
}

调用serviceMap方法getServiceInfo方法就在ServiceInfoHolder中

// ServiceInfoHolder
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    return serviceInfoMap.get(key);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AGV机器人出圈:助力产线物流自动化

随着开年档电影《流浪地球2》的热映&#xff0c;里面的四足仿生机器人机械狗“笨笨”、可穿戴的外骨骼机器人等“黑科技”&#xff0c;都让人对机器人的魅力刮目相看&#xff0c;机器人成功“出圈”了&#xff0c;随着智能技术的发展与进步&#xff0c;我们常见的机器人种类越来…

Linux命令之sed

sed&#xff0c;Stream Editor&#xff08;字符流编辑器&#xff09;的缩写&#xff0c;简称流编辑器&#xff0c;是操作、过滤、转换文本内容的工具。 常用功能包括结合正则表达式对文件实现快速的增删改查。 工作原理 sed有2个空间来缓存数据&#xff0c;paattern space&am…

Qt交叉编译环境搭建

环境及版本&#xff1a;Deepin 20.3 Qt 5.12.9 arm编译工具 gcc-linaro-7.5.0-2019.12-x86_64_arm-linux-gnueabihf.tar.xz 1.下载Qt源码&#xff1a;qt-everywhere-src-5.12.9.tar.xz&#xff0c;并解压 2.下载arm编译工具&#xff1a; gcc-linaro-7.5.0-2019.12-x86_64_arm…

央企集团是怎么设置信息化、数字化部门的?

在数字经济大潮中&#xff0c;数字化转型已不是企业的“选修课”&#xff0c;而是关乎企业生存和长远发展的“必修课”。在企业数字化转型中&#xff0c;国有企业特别是中央企业普遍将数字化转型战略作为“十四五”时期业务规划的重要内容之一&#xff0c;数字化能力也成为衡量…

代码随想录【Day31】| 455. 分发饼干、376. 摆动序列、53. 最大子数组和

455. 分发饼干 题目链接 题目描述&#xff1a; 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff…

用Docker搭建yolov5开发环境

拉取镜像 sudo docker pull pytorch/pytorch:latest 创建容器 sudo docker run -it -d --gpus "device0" pytorch/pytorch bash 查看所有容器 sudo docker ps -a 查看运行中的容器 sudo docker ps 进入容器 docker start -i 容器ID 将依赖包全都导入到requiremen…

如何将图数据库应用于电影智能推荐

导读 电影&#xff0c;是一种结合视觉与听觉的现代艺术。如今&#xff0c;电影已不单是人们娱乐消遣的生活方式&#xff0c;也逐渐成为国家文化软实力的重要标志之一。据有关数据统计&#xff0c;2021年中国影视行业市场规模达2349亿元&#xff0c;同比增长23.2%&#xff0c;预…

java--IO

IO1.文件流2.常用的文件操作&#xff08;1&#xff09;根据路径构建一个File对象&#xff08;2&#xff09;根据父目录文件子路径构建&#xff08;3&#xff09;根据父目录子路径构建&#xff08;4&#xff09;获取文件相关信息&#xff08;5&#xff09;目录的操作和文件的删除…

计算机图形学07:有效边表法的多边形扫描转换

作者&#xff1a;非妃是公主 专栏&#xff1a;《计算机图形学》 博客地址&#xff1a;https://blog.csdn.net/myf_666 个性签&#xff1a;顺境不惰&#xff0c;逆境不馁&#xff0c;以心制境&#xff0c;万事可成。——曾国藩 文章目录专栏推荐专栏系列文章序一、算法原理二、…

Git 企业级分支提交流程

Git 企业级分支提交流程 首先在本地分支hfdev上进行开发&#xff0c;开发后要经过测试。 如果测试通过了&#xff0c;那么久可以合并到本地分支develop&#xff0c;合并之后hfdev和development应该完全一样。 git add 文件 git commit -m ‘注释’ git checkout develop //切换…

svn使用

一、SVN概述 1.1为什么需要SVN版本控制软件 1.2解决之道 SCM&#xff1a;软件配置管理 所谓的软件配置管理实际就是对软件源代码进行控制与管理 CVS&#xff1a;元老级产品 VSS&#xff1a;入门级产品 ClearCase&#xff1a;IBM公司提供技术支持&#xff0c;中坚级产品 1.…

【无标题】开发板设置系统时间

开发板设置系统时间环境查看系统时间查看硬件时间设置系统时间设置RTC时间时钟包括硬件时钟和系统时钟&#xff0c;系统时钟就是linux系统显示的时间&#xff0c;用命令 date可以显示当前系统时间&#xff1b;硬件时钟就是硬件自身的时间了。它们两者没有关系的&#xff0c;但是…

如何利用Power Virtual Agents机器人远程打开电脑中的应用

今天我们来介绍如何利用Power Virtual Agents来远程控制电脑。我们的设计思路是在聊天机器人里输入触发短语后打开自己电脑中的题库软件。 首先&#xff0c;进入已经创建好的聊天机器人编辑界面。 新建一个主题后&#xff0c;在“新建主题”中添加“触发短语”。 添加节点后&a…

C++代码优化(3):条款13~17

"野性袒露着灵魂纯粹"条款13:以对象管理资源(1)什么是资源&#xff1f;C中最常使用的资源就是动态内存分配&#xff0c;在系统编程层面上&#xff0c;文件描述符(fd)、互斥锁(mutex)、套接字网络socket……不管是哪一种资源&#xff0c;重要的是&#xff0c;你不使用…

CEC2014:鱼鹰优化算法(Osprey optimization algorithm,OOA)求解CEC2014(提供MATLAB代码

一、鱼鹰优化算法简介 鱼鹰优化算法&#xff08;Osprey optimization algorithm&#xff0c;OOA&#xff09;由Mohammad Dehghani 和 Pavel Trojovsk于2023年提出&#xff0c;其模拟鱼鹰的捕食行为。 鱼鹰是鹰形目、鹗科、鹗属的仅有的一种中型猛禽。雌雄相似。体长51-64厘米…

Spark 任务调度机制

1.Spark任务提交流程 Spark YARN-Cluster模式下的任务提交流程&#xff0c;如下图所示&#xff1a; 图YARN-Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程&#xff1a; 图Spark任务提交时序图 提交一个Spark应用程序&#xff0c;首…

mysql数据库之存储过程

一、存储过程简介。 存储过程是事先经过编译并存储在数据库中的一段sql语句的集合&#xff0c;调用存储过程可以简化应用开发人员的很多工作&#xff0c;减少数据在数据库和应用服务器之间的传输&#xff0c;对于提高数据处理的效率是也有好处的。 存储过程思想上很简单&…

Mysql常见面试题总结

1、什么是存储引擎 存储引擎指定了表的类型&#xff0c;即如何存储和索引数据&#xff0c;是否支持事务&#xff0c;同时存储引擎也决定了表在计算机中的存储方式。 2、查看数据库支持哪些存储引擎使用什么命令&#xff1f; -- 查看数据库支持的存储引擎 show engines; 或者 …

百趣代谢组学分享,关于儿童Graves病相关的新环境物质的鉴定

代谢组学文章标题&#xff1a;Identification of Novel Environmental Substances Relevant to Pediatric Graves’ Disease 发表期刊&#xff1a;Frontiers in endocrinology 影响因子&#xff1a;6.055 作者单位&#xff1a;苏州大学附属儿童医院 百趣提供服务&#xf…

外贸建站多少钱才能达到预期效果?

外贸建站多少钱才能达到预期效果&#xff1f;这是每个外贸企业都会问的问题。作为一个做外贸建站多年的人&#xff0c;我有一些个人的操盘感想。 首先&#xff0c;我认为外贸建站的投资是非常必要的。 因为在现代社会&#xff0c;网站已经成为外贸企业开展业务的必要工具之一…