# 消息中间件 RocketMQ 高级功能和源码分析(九)

news2025/1/23 6:02:48

消息中间件 RocketMQ 高级功能和源码分析(九)

一、消息中间件 RocketMQ 源码分析: 同步刷盘分析

1、刷盘机制

RocketMQ 的存储是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。

2、同步刷盘

消息追加到内存后,立即将数据刷写到磁盘文件

3、同步刷盘流程 示例图:

在这里插入图片描述

4、 代码:CommitLog#handleDiskFlush


//刷盘服务
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
    //封装刷盘请求
    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    //提交刷盘请求
    service.putRequest(request);
    //线程阻塞5秒,等待刷盘结束
    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    if (!flushOK) {
        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }

5、 GroupCommitRequest

在这里插入图片描述


long nextOffset;	//刷盘点偏移量
CountDownLatch countDownLatch = new CountDownLatch(1);	//倒计树锁存器
volatile boolean flushOK = false;	//刷盘结果;默认为false

6、 代码:GroupCommitService#run

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //线程等待10ms
            this.waitForRunning(10);
            //执行提交
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
	...
}

7、 代码:GroupCommitService#doCommit


private void doCommit() {
    //加锁
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            //遍历requestsRead
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
					//刷盘
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
				//唤醒发送消息客户端
                req.wakeupCustomer(flushOK);
            }
			
            //更新刷盘监测点
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {               CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
			
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

二、消息中间件 RocketMQ 源码分析:异步刷盘说明

1、异步刷盘

在消息追加到内存后,立即返回给消息发送端。如果开启 transientStorePoolEnable,RocketMQ 会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启 transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。

2、异步刷盘流程 示例图:

在这里插入图片描述

3、开启 transientStorePoolEnable 后异步刷盘步骤:


- 1. 将消息直接追加到 ByteBuffer(堆外内存)。
- 2. CommitRealTimeService 线程每隔 200ms 将 ByteBuffer 新追加内容提交到 MappedByteBuffer 中。
- 3. MappedByteBuffer在内存中追加提交的内容,wrotePosition 指针向后移动。
- 4. commit 操作成功返回,将committedPosition 位置恢复。
- 5. FlushRealTimeService 线程默认每 500ms 将 MappedByteBuffer 中新追加的内存刷写到磁盘。

4、 代码:CommitLog$CommitRealTimeService#run

提交线程工作机制


//间隔时间,默认200ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

//一次提交的至少页数
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

//两次真实提交的最大间隔,默认200ms
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

//上次提交间隔超过commitDataThoroughInterval,则忽略提交commitDataThoroughInterval参数,直接提交
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
    this.lastCommitTimestamp = begin;
    commitDataLeastPages = 0;
}

//执行提交操作,将待提交数据提交到物理文件的内存映射区
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
    this.lastCommitTimestamp = end; // result = false means some data committed.
    //now wake up flush thread.
    //唤醒刷盘线程
    flushCommitLogService.wakeup();
}

if (end - begin > 500) {
    log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);

5、 代码:CommitLog$FlushRealTimeService#run

刷盘线程工作机制


//表示await方法等待,默认false
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//线程执行时间间隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
//一次刷写任务至少包含页数
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
//两次真实刷写任务最大间隔
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
...
//距离上次提交间隔超过flushPhysicQueueThoroughInterval,则本次刷盘任务将忽略flushPhysicQueueLeastPages,直接提交
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    this.lastFlushTimestamp = currentTimeMillis;
    flushPhysicQueueLeastPages = 0;
    printFlushProgress = (printTimes++ % 10) == 0;
}
...
//执行一次刷盘前,先等待指定时间间隔
if (flushCommitLogTimed) {
    Thread.sleep(interval);
} else {
    this.waitForRunning(interval);
}
...
long begin = System.currentTimeMillis();
//刷写磁盘
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//更新存储监测点文件的时间戳
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

三、消息中间件 RocketMQ 源码分析:删除过期文件机制分析

1、过期文件删除机制

由于 RocketMQ 操作 CommitLog、ConsumerQueue文 件是基于内存映射机制并在启动的时候回加载 CommitLog、ConsumerQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ 顺序写 CommitLog、ConsumerQueue 文件,所有写操作全部落在最后一个 CommitLog 或者 ConsumerQueue 文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ 清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。

2、 代码:DefaultMessageStore#addScheduleTask


private void addScheduleTask() {
	//每隔10s调度一次清除文件
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
	...
}

3、 代码:DefaultMessageStore#cleanFilesPeriodically


private void cleanFilesPeriodically() {
    //清除存储文件
    this.cleanCommitLogService.run();
    //清除消息消费队列文件
    this.cleanConsumeQueueService.run();
}

4、 代码:DefaultMessageStore#deleteExpiredFiles


private void deleteExpiredFiles() {
    //删除的数量
    int deleteCount = 0;
    //文件保留的时间
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    //删除物理文件的间隔
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    //线程被占用,第一次拒绝删除后能保留的最大时间,超过该时间,文件将被强制删除
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
	...执行删除逻辑
}else{
    ...无作为
}

5、删除文件操作的条件

    1. 指定删除文件的时间点,RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次删除过期文件操作,默认零晨4点
    1. 磁盘空间如果不充足,删除过期文件
    1. 预留,手工触发。

6、 代码:CleanCommitLogService#isSpaceToDelete

当磁盘空间不足时执行删除过期文件


private boolean isSpaceToDelete() {
    //磁盘分区的最大使用量
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
	//是否需要立即执行删除过期文件操作
    cleanImmediately = false;

    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        //当前CommitLog目录所在的磁盘分区的磁盘使用率
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        //diskSpaceWarningLevelRatio:磁盘使用率警告阈值,默认0.90
        if (physicRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }
			//diskSpaceCleanForciblyRatio:强制清除阈值,默认0.85
            cleanImmediately = true;
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
            DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
        }
    }

    if (physicRatio < 0 || physicRatio > ratio) {
        DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
        return true;
    }
}

7、 代码:MappedFileQueue#deleteExpiredFileByTime

执行文件销毁和删除


for (int i = 0; i < mfsLength; i++) {
    //遍历每隔文件
    MappedFile mappedFile = (MappedFile) mfs[i];
    //计算文件存活时间
    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
    //如果超过72小时,执行文件删除
    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
        if (mappedFile.destroy(intervalForcibly)) {
            files.add(mappedFile);
            deleteCount++;

            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                break;
            }

            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                try {
                    Thread.sleep(deleteFilesInterval);
                } catch (InterruptedException e) {
                }
            }
        } else {
            break;
        }
    } else {
        //avoid deleting files in the middle
        break;
    }
}

四、消息中间件 RocketMQ 源码分析: 消息存储总结

1、RocketMQ 的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash 索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。

单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

2、CommitLog,消息存储文件

RocketMQ 为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此 RocketMQ 为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时 RocketMQ 为消息实现了 Hash 索引,可以为消息设置索引键,根据所以能够快速从 CommitLog 文件中检索消息。

3、ReputMessageService 线程实时地将消息转发给消息消费队列文件与索引文件。abort 文件引入。

当消息达到 CommitLog 后,会通过 ReputMessageService 线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ 引入 abort 文件,记录 Broker 的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证 CommitLog 文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

4、RocketMQ 文件过期机制

RocketMQ 不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(八)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1846687.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

好用的便签是什么 电脑桌面上好用的便签

作为一名文字工作者&#xff0c;我经常需要在繁杂的思绪中捕捉灵感&#xff0c;记录下那些一闪而过的想法。在寻找一款适合电脑桌面的便签应用时&#xff0c;我偶然发现了敬业签便签软件简直是为我量身定制的&#xff0c;它不仅界面简洁&#xff0c;操作便捷&#xff0c;更重要…

前瞻展望,中国信通院即将发布“2024云计算十大关键词”

人类对于未知领域的探索欲望&#xff0c;似乎总是无穷无尽&#xff0c;而探索欲反过来推动了技术的革新与进步。今年以来&#xff0c;AI大模型成为科技领域最为确定的趋势之一。在大模型开启的AI原生时代&#xff0c;AI原生正在重构云计算的演化逻辑和发展走向&#xff0c;MaaS…

含铬废水有哪些危害?含铬废水怎么处理呢?

铬化物可以通过消化道、呼吸道、皮肤和粘膜侵人人体&#xff0c;主要积聚在肝、肾、内分泌系统和肺部。毒理作用是影响体内物质氧化、还原和水解过程&#xff0c;与核酸、核蛋白结合影响组织中的磷含量。铬化合物具有致癌作用。 铬化合物以蒸汽和粉尘的方式进入人体组织中&…

gstreamer+qt5实现简易视频播放器

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、安装环境1.QT52.gstreamer 二、代码1.Windows实现 三、测试效果总结 前言 最近在研究mpp&#xff0c;通过gstreamer实现了硬解码&#xff0c;但是我在想我…

ip地址怎么写才是的对的?合法ip地址正确的格式

IP地址怎么写才是的对的&#xff1f;在互联网的世界里&#xff0c;IP地址就像是我们生活中的门牌号码&#xff0c;它是每个设备在网络中的唯一标识。正确的书写IP地址对于确保网络通信的顺畅至关重要。本文将带您了解合法IP地址的正确格式与书写规范&#xff0c;并深入探讨其在…

【MySQL】索引的原理及其使用

文章目录 什么叫索引减少磁盘IO次数缓存池(Buffer Pool&#xff09;MySQL的页页内目录页目录 正确理解索引结构为什么Innodb的索引是B树结构各种存储引擎支持的索引聚簇索引和非聚簇索引索引类型 关于索引的操作创建主键索引唯一索引的创建普通索引的创建查看索引删除索引 什么…

6月21日(周五)AH股总结:沪指失守3000点,恒生科技指数跌近2%,多只沪深300ETF午后量能显著放大

内容提要 沪指全天围绕3000点关口来回拉锯&#xff0c;收盘跌破3000点。白酒及光刻机概念集体走低&#xff0c;中芯国际港股跌超2%。CRO医药概念及水利股逆势走强。 A股低开低走 沪指全天围绕3000点关口来回拉锯&#xff0c;收盘跌破3000点&#xff0c;跌0.24%。深成指跌0.04…

如何在 MySQL 中创建和使用事务?

目录 1. 环境准备 2. 创建事务 3. 事务执行 4. 事务撤消 5. 总结 事务是数据库区别于文件系统的重要特征之一&#xff0c;当我们有了事务就会让数据库始终保持一致&#xff0c;同时我们还能通过事务机制恢复到某个时间点&#xff0c;这样可以保证已提交到数据库的修改不会…

【Linux】基础IO_2

文章目录 六、基础I/O2. 系统文件I/O磁盘的存储结构 未完待续 六、基础I/O 2. 系统文件I/O 磁盘的存储结构 系统中不是所有对文件都是打开的状态&#xff0c;大部分的文件都是没有被打开的。这些文件一般都被存储在磁盘当中。磁盘通过柱面&#xff0c;扇面&#xff0c;扇区确…

JS 【详解】树的遍历(含深度优先遍历和广度优先遍历的算法实现)

用 js 描述树 let tree [{label:a,children:[{label:b,children:[{label:d},{label:e}]},{label:c,children:[{label:f}]}]} ]使用数组是因为树的节点有顺序 深度优先遍历 从根节点出发&#xff0c;优先遍历最深的节点 遍历顺序为 abdecf function DFS(tree) {tree.forEach(…

c++编译器优化不显示拷贝构造函数

一.错误情景&#xff08;无法打印拷贝函数&#xff09; #include<iostream> using namespace std;class person { public:person(){cout << "person默认构造函数调用" << endl;}person(int age){cout << "有参构造函数调用" <…

IIS代理配置-反向代理

前后端分离项目&#xff0c;前端在开发中使用proxy代理解决跨域问题&#xff0c;打包之后无效。 未配置前无法访问 部署环境为windows IIS&#xff0c;要在iis设置反向代理 安装代理模块 需要在iis中实现代理&#xff0c;需要安装Application Request Routing Cache和URL重…

在Verilog HDL中使用任务(task)

代码&#xff1a; sort4.v module sort4(ra,rb,rc,rd,a,b,c,d);output[3:0] ra,rb,rc,rd;input[3:0] a,b,c,d;reg[3:0] ra,rb,rc,rd;reg[3:0] va,vb,vc,vd;always (a or b or c or d)begin{va,vb,vc,vd}{a,b,c,d};sort2(va,vc); //va 与vc互换。sort2(vb,vd);…

为什么在React中一定要使用setState来更新数据?

为什么在React中一定要使用setState来更新数据&#xff1f; 因为我们修改了State后&#xff0c;我们希望看到React更具最新的State来重新渲染界面&#xff0c;但是这种方式修改的React并不知道数据发送的改变&#xff1b; 在React中并没有使用像Vue3或者Vue2中的数据劫持来监…

时间?空间?复杂度??

1.什么是时间复杂度和空间复杂度&#xff1f; 1.1算法效率 算法效率分析分为两种&#xff1a;第一种是时间效率&#xff0c;第二种是空间效率。时间效率被称为时间复杂度&#xff0c;而空间效率被称为空间复杂度。 时间复杂度主要衡量的是一个算法的运行速度&#xff0c;而空…

爱眼小妙招:台灯怎么选?学生如何正确使用台灯?

视力是心灵的窗户&#xff0c;尤其对于儿童来说更为重要。然而&#xff0c;随着现代生活方式的改变&#xff0c;孩子们面临越来越多的视力挑战。据统计&#xff0c;在近视学生中&#xff0c;近10%的人患有高度近视&#xff0c;而这一比例随年级的增加而逐渐上升。从幼儿园的小小…

Redis-数据类型-Bit的基本操作-getbit-setbit-Bitmap

文章目录 0、Bitmaps&#xff08;位图&#xff09;1、查看redis是否启动2、通过客户端连接redis3、切换到db7数据库4、设置&#xff08;或覆盖&#xff09;一个键&#xff08;key&#xff09;的值&#xff08;value&#xff09;5、获取存储在给定键&#xff08;key&#xff09;…

解锁空间数据奥秘:ArcGIS Pro与Python双剑合璧,处理表格数据、矢量数据、栅格数据、点云数据、GPS数据、多维数据以及遥感云平台数据等

ArcGISPro提供了用户友好的图形界面&#xff0c;适合初学者快速上手进行数据处理和分析。它拥有丰富的工具和功能&#xff0c;支持各种数据格式的处理和分析&#xff0c;适用于各种规模的数据处理任务。ArcGISPro在地理信息系统&#xff08;GIS&#xff09;领域拥有广泛的应用&…

鸿蒙Harmony角落里的知识:从ECMA规范到ArkTS接口(二)

上篇介绍了typedArray.slice方法&#xff0c;鸿蒙Harmony角落里的知识&#xff1a;从ECMA规范到ArkTS接口&#xff08;一&#xff09;本文介绍一个返回结果和参数和slice非常类似的函数&#xff1a;TypedArray.prototype.subarray。 ECMA对TypedArray.prototype.subarray接口的…

【启明智显产品介绍】Model3C工业级HMI芯片详解专题(三)通信接口

Model3C 是一款基于 RISC-V 的高性能、国产自主、工业级高清显示与智能控制 MCU, 集成了内置以太网控制器&#xff0c;配备2路CAN、4路UART、5组GPIO、2路SPI等多种通信接口&#xff0c;能够轻松与各种显示设备连接&#xff0c;实现快速数据传输和稳定通信&#xff0c;可以与各…