RocketMQ5.0.0消息存储<五>_文件过期删除机制

news2024/11/13 23:43:35

目录

一、概览

二、过期文件删除机制

三、参考资料


一、概览

        RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动时会加载commitlog、consumequeue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储Broker服务器上,所以需要删除己过期的文件。

        RocketMQ采用顺序写Commitlog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。若是更新消息时(如:更新消息的延迟重试次数),采用重新写入的方式,而不是直接更新原始消息

        RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,不会关注该文件的消息是否全部被消费。默认每个文件的过期时间为48小时,通过broker.conf配置文件中fileReservedTime参数来改变过期时间,单位为小时

二、过期文件删除机制

        Broker启动时,会创建Schedule定时任务并启动,方法org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask执行文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘等定时任务。如下代码所示,文件过期删除,每10s执行一次。方法调用链如下,看出CommitLog、ConsumeQueue过期文件删除共用一套删除机制,这里介绍Commitlog文件过期删除。

/**
 * 日程任务,如:文件过期删除、消息存储自检查、存储锁住、Checkpoint刷盘
 * 总入口:DefaultMessageStore#start(),即:Broker启动时,添加这些日程
 */
private void addScheduleTask() {

    // 文件过期删除,每10s执行一次
    this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
        @Override
        public void run2() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

    ......
}

// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
// 删除CommitLog、ConsumeQueue过期文件
private void cleanFilesPeriodically() {
    // CommitLog文件
    this.cleanCommitLogService.run();
    // ConsumeQueue文件
    this.cleanConsumeQueueService.run();
    // 更新逻辑偏移量
    this.correctLogicOffsetService.run();
}

        org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles是Commitlog文件过期删除的核心方法,其调用链及代码如下所示。

        注意,触发删除过期文件的三个条件(满足其一即可删除):

  • 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen);
  • 磁盘不足;
  • 人工删除,调用excuteDeleteFilesManualy方法手工触发过期文件删除。

/**
 * 删除过期Commitlog文件
 */
private void deleteExpiredFiles() {
    // 删除文件计数
    int deleteCount = 0;
    // 文件保留时间,离最后一次修改多长时间后删除,默认48h(通过brker.conf配置文件参数:fileReservedTime)
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // 两次删除真实物理文件的间隔时间
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // 离第一次拒绝删除的最大保留时间(文件被其他线程所引用,拒绝删除),超出改时间后,强制删除
    int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    // 批量删除文件的最大数量
    int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();

    /*
     * 满足下列3条之一,则直接删除:
     * 1. 指定时间点删除文件(默认每天凌晨4点,通过brker.conf配置文件参数:deleteWhen)
     * 2. 磁盘不足
     * 3. 人工删除
     */
    boolean isTimeUp = this.isTimeToDelete(); // 指定时间点删除文件
    boolean isUsageExceedsThreshold = this.isSpaceToDelete();  // 磁盘不足
    boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0;  // 人工删除

    if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {

        if (isManualDelete) {
            this.manualDeleteFileSeveralTimes--;
        }

        // 是否可直接删除文件
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",
                fileReservedTime,
                isTimeUp,
                isUsageExceedsThreshold,
                manualDeleteFileSeveralTimes,
                cleanAtOnce,
                deleteFileBatchMax);

        fileReservedTime *= 60 * 60 * 1000;

        // 执行文件删除
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
        if (deleteCount > 0) {
            // If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFile
            if (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {
                if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
                    final long minPhyOffset = getMinPhyOffset();
                    ((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
                }
            }
        } else if (isUsageExceedsThreshold) {
            LOGGER.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

        org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceToDelete是判定磁盘不足的核心方法,代码如下。

/**
 * 磁盘不足,则直接删除文件
 * Commilog文件、ConsumeQueue文件所在磁盘分区使用率:
 *      > diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入
 *      > diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入
 *      < diskSpaceCleanForciblyRatio时,恢复磁盘可写
 * 磁盘最大使用量:超出则直接删除文件
 * @return
 */
private boolean isSpaceToDelete() {
    // 是否直接删除
    cleanImmediately = false;

    /*
        磁盘使用率处理:
        > diskSpaceWarningLevelRatio设置,默认0.9时,则设置磁盘不可写,拒绝新消息的写入
        > diskSpaceCleanForciblyRatio设置,默认0.85,则执行过期文件清除,但不会拒绝新消息的写入
        < diskSpaceCleanForciblyRatio时,恢复磁盘可写
     */
    // Commilog文件所在磁盘分区使用率
    String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    String[] storePaths = commitLogStorePath.trim().split(MixAll.MULTI_PATH_SPLITTER);
    Set<String> fullStorePath = new HashSet<>();
    double minPhysicRatio = 100;
    String minStorePath = null;
    for (String storePathPhysic : storePaths) {
        // Commitlog文件所在分区的磁盘使用率
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        if (minPhysicRatio > physicRatio) {
            minPhysicRatio = physicRatio;
            minStorePath = storePathPhysic;
        }
        // 系统参数-Drocketmq.broker.diskSpaceCleanForciblyRatio设置,默认0.85。磁盘分区使用率超过该阈值,则执行过期文件清除,但不会拒绝新消息的写入
        if (physicRatio > getDiskSpaceCleanForciblyRatio()) {
            fullStorePath.add(storePathPhysic);
        }
    }
    DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
    // 系统参数-Drocketmq.broker.diskSpaceWarningLevelRatio设置,默认0.9。磁盘分区使用率超过该阈值,则设置磁盘不可写,拒绝新消息的写入
    if (minPhysicRatio > getDiskSpaceWarningLevelRatio()) {
        boolean diskFull = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        if (diskFull) {
            DefaultMessageStore.LOGGER.error("physic disk maybe full soon " + minPhysicRatio +
                    ", so mark disk full, storePathPhysic=" + minStorePath);
        }

        cleanImmediately = true;
        return true;
    } else if (minPhysicRatio > getDiskSpaceCleanForciblyRatio()) {
        cleanImmediately = true;
        return true;
    } else {
        // 磁盘使用率 < diskSpaceCleanForciblyRatio时,恢复磁盘可写
        boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
        if (!diskOK) {
            DefaultMessageStore.LOGGER.info("physic disk space OK " + minPhysicRatio +
                    ", so mark disk ok, storePathPhysic=" + minStorePath);
        }
    }

    // ConsumeQueue文件所在磁盘分区使用率
    String storePathLogics = StorePathConfigHelper
            .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
    double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
    if (logicsRatio > getDiskSpaceWarningLevelRatio()) {
        boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        if (diskOK) {
            DefaultMessageStore.LOGGER.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
        }

        cleanImmediately = true;
        return true;
    } else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) {
        cleanImmediately = true;
        return true;
    } else {
        boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
        if (!diskOK) {
            DefaultMessageStore.LOGGER.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
        }
    }

    /*
        磁盘分区当前使用量,超出时则直接删除文件
     */
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    int replicasPerPartition = DefaultMessageStore.this.getMessageStoreConfig().getReplicasPerDiskPartition();
    // Only one commitLog in node
    if (replicasPerPartition <= 1) {
        if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
            DefaultMessageStore.LOGGER.info("commitLog disk maybe full soon, so reclaim space, " + minPhysicRatio);
            return true;
        }

        if (logicsRatio < 0 || logicsRatio > ratio) {
            DefaultMessageStore.LOGGER.info("consumeQueue disk maybe full soon, so reclaim space, " + logicsRatio);
            return true;
        }
        return false;
    } else {
        long majorFileSize = DefaultMessageStore.this.getMajorFileSize();
        long partitionLogicalSize = UtilAll.getDiskPartitionTotalSpace(minStorePath) / replicasPerPartition;
        double logicalRatio = 1.0 * majorFileSize / partitionLogicalSize;

        if (logicalRatio > DefaultMessageStore.this.getMessageStoreConfig().getLogicalDiskSpaceCleanForciblyThreshold()) {
            // if logical ratio exceeds 0.80, then clean immediately
            DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds logical disk space clean forcibly threshold {}, forcibly: {}",
                    logicalRatio, minPhysicRatio, cleanImmediately);
            cleanImmediately = true;
            return true;
        }

        boolean isUsageExceedsThreshold = logicalRatio > ratio;
        if (isUsageExceedsThreshold) {
            DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds clean threshold {}, forcibly: {}",
                    logicalRatio, ratio, cleanImmediately);
        }
        return isUsageExceedsThreshold;
    }
}

三、参考资料

RocketMQ文件刷盘机制与过期文件删除_fFee-ops的博客-CSDN博客_rocketmq设置过期时间

消息队列 - RocketMQ -- 过期文件的删除 - 个人文章 - SegmentFault 思否

RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客 

RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/355913.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023/02/18 ES6数组的解读

1 扩展运算符 扩展运算符&#xff08;spread&#xff09;是三个点&#xff08;…&#xff09;. 它好比 rest 参数的逆运算&#xff0c;将一个数组转为用逗号分隔的参数序列. console.log(...[1, 2, 3]) // 1 2 3console.log(1, ...[2, 3, 4], 5) // 1 2 3 4 5该运算符主要用于…

比较全面的HTTP和TCP网络传输的单工、全双工和半双工

文章目录单工、全双工、半双工1. 单工2. 半双工3. 全双工HTTP协议的工作模式TCP协议的工作模式本文参考&#xff1a; 图解网络传输单工、半双工、全双工 - 知乎 (zhihu.com) 问&#xff1a;HTTP是单工的还是双工的还是半双工的 - 简书 (jianshu.com) 关于TCP全双工模式的解释_忙…

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——InputFormat数据输入

3.1.1切片与MapTask并行度决定机制 1、问题引出 MapTask的并行度决定Map阶段的任务处理并发度&#xff0c;进而影响到整个Job的处理速度。 思考&#xff1a;1G的数据&#xff0c;启动8个MapTask&#xff0c;可以提高集群的并发处理能力。那么1K的数据&#xff0c;也启动8个M…

华为OD机试 - 机器人走迷宫(JS)

机器人走迷宫 题目 房间有X*Y的方格组成&#xff0c;例如下图为6*4的大小。每一个放个以坐标(x,y)描述。 机器人固定从方格(0,0)出发&#xff0c;只能向东或者向北前进&#xff0c; 出口固定为房间的最东北角&#xff0c;如下图的方格(5,3)。 用例保证机器人可以从入口走到出…

算法:(十四)动态规划

文章目录14.1 单序列问题面试题88&#xff1a;爬楼梯的最少成本面试题89&#xff1a;偷盗房屋面试题90:环形偷盗房屋面试题91&#xff1a;粉刷房子面试题92&#xff1a;反转字符面试题93&#xff1a;最长斐波那契数列面试题94&#xff1a;最少回文分割14.2 双序列问题面试题95&…

2023美国大学生数学建模竞赛A题详细公式和代码分享

目录 2023美赛A题翻译 1.1 建立一个数学模型&#xff0c;预测一个植物群落在各种不规则的天气周期中如何随时间变化。包括本该降水充足的干旱时期。该模型应考虑到干旱周期中不同物种之间的相互作用。 1.2就植物群落与大环境的长期相互作用&#xff0c;探讨你能从你的模型中…

【Python从入门到进阶】8、Python的输入输出

接上篇《7、运算符》 上一篇我们学习了Python的运算符部分&#xff0c;包括算数运算符、赋值运算符、比较运算符、逻辑运算符等。本篇我们来学习Python的输入和输出相关内容。 一、输出 其实输出的含义就是在控制台里打印一些东西&#xff0c;我们在之前已经做过很多的“prin…

社招前端必会手写面试题集锦

查找字符串中出现最多的字符和个数 例: abbcccddddd -> 字符最多的是d&#xff0c;出现了5次 let str "abcabcabcbbccccc"; let num 0; let char ;// 使其按照一定的次序排列 str str.split().sort().join(); // "aaabbbbbcccccccc"// 定义正则表达…

IDEA插件安装慢、超时、不成功问题如何解决?

目录 一、打开国内插件的节点IP地址 二、修改本地hosts文件 三、刷新DNS缓存 一、打开国内插件的节点IP地址 国内插件的节点IP地址查询: http://tool.chinaz.com/speedtest/plugins.jetbrains.com 在下方的检测结果中&#xff0c;找到一个解析时间最短的IP地址&#xff0c;解…

流程引擎之Activiti简介

背景Activiti 是一个开源架构的工作流引擎&#xff0c;基于 bpmn2.0 标准进行流程定义&#xff0c;其前身是 jBPM&#xff0c;Activiti 相对于 jBPM 更轻量&#xff0c;更易上手&#xff0c;且天然集成了 Spring。2010年 jBPM 创始人 Tom Baeyens 离开 JBoss&#xff0c;随之加…

大数据之-Nifi-监控nifi处理数据的状态信息_处理数据的状态栏_组件统计_公告BUG信息---大数据之Nifi工作笔记0010

首先可以看到状态栏,可以提供活动现场的数量,排队统计信息,等等 可以看到在面板的最上面有一行状态栏,就是 就是具体的状态信息 然后组件统计就是具体的处理器的统计信息 可以看到这里pickup这个getfile处理器,可以看到in这里,说了文件的输入个数,以及大小 out是输出个数以及大…

Linux进程学习【二】

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f38a;每篇一句&#xff1a; 图片来源 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 Perseverance is not a long race; it is many short races one after another…

java黑马头条 day5自媒体文章审核 敏感词过滤算法DFA 集成RabbitMQ实现自动审核

自动审核流程介绍 做为内容类产品&#xff0c;内容安全非常重要&#xff0c;所以需要进行对自媒体用户发布的文章进行审核以后才能到app端展示给用户。2 WmNews 中status 代表自媒体文章的状态 status字段&#xff1a;0 草稿 1 待审核 2 审核失败 3 人工审核 4 人工审核通过 …

Mac上动态切换JDK

起因&#xff1a; 因为甲骨文的JDK8并没有适配Mac M1芯片&#xff0c;新版的17倒是有适配的&#xff0c;11这些不清楚。往常开发可以使用Zulu这些厂商提供的JDK8。 但是在本地起一些服务&#xff0c;例如Nacos时候&#xff0c;还是会出现不兼容导致起不来&#xff01;虽然Nac…

【网络原理6】数据链路层协议——以太网

数据链路层负责的是相邻两个网络节点之间的数据以帧为单位进行传输。 具体关于数据链路层的介绍&#xff0c;已经在这一篇文章当中提到了。 初识网络&#xff1a;IP、端口、网络协议、TCP-IP五层模型_革凡成圣211的博客-CSDN博客TCP/IP五层协议详解https://blog.csdn.net/weix…

Python每日一练(20230219)

目录 1. 循环随机取数组直到得出指定数字&#xff1f; 2. 旋转链表 3. 区间和的个数 1. 循环随机取数组直到得出指定数字&#xff1f; 举个例子&#xff1a; 随机数字范围&#xff1a;0~100 每组数字量&#xff1a;6&#xff08;s1,s2,s3,s4,s5,s6&#xff09; 第二轮开始随…

高级前端一面面试题集锦

详细说明 Event loop 众所周知 JS 是门非阻塞单线程语言&#xff0c;因为在最初 JS 就是为了和浏览器交互而诞生的。如果 JS 是门多线程的语言话&#xff0c;我们在多个线程中处理 DOM 就可能会发生问题&#xff08;一个线程中新加节点&#xff0c;另一个线程中删除节点&#…

[LeetCode周赛复盘] 第 98 场双周赛20230219

[LeetCode周赛复盘] 第 98 场双周赛20230219 一、本周周赛总结二、 [Easy] 6359. 替换一个数字后的最大差值1. 题目描述2. 思路分析3. 代码实现三、[Medium] 6361. 修改两个元素的最小分数1. 题目描述2. 思路分析3. 代码实现四、[Medium] 6360. 最小无法得到的或值1. 题目描述2…

将镭神C32激光雷达的PointXYZ数据转化为PointXYZIR格式 - 附代码

之前遇到过“镭神32线激光雷达ROS下运行fromRosMsg()报错 Failed to find match for field “intensity“ 问题”&#xff0c; 当时确定了是镭神C32雷达缺少相应字段&#xff0c;并记录博客【学习记录】镭神32线激光雷达ROS下运行fromRosMsg()报错 Failed to find match for fi…

如何正确使用chatgpt,让chatgpt回答优质内容?

我们以chatgpt写一篇文章为例。大家都知道&#xff0c;如果直接让chatgpt写某篇文章&#xff0c;他的回答总是简洁明了的&#xff0c;因为它定位就是聊天&#xff0c;而不是会像“舔狗”一样写一篇小作文。 并且他的回答&#xff0c;总是固定格式的&#xff0c;只要稍微了解ch…