从零开始读RocketMq源码(三)Broker存储Message流程解析

news2024/11/15 21:33:55

目录

前言

准备

消息载体CommitLog

文件持久化位置 

源码解析

broker消息对象MessageExtBrokerInner

异步存储message

CommitLog的真相

创建MappedFile文件

加入异步刷盘队列

Message异步存储MappedByteBuffer

总结


前言

在面试中我们经常会听到这样的回答,生产者将message发送给broker服务,然后消费者从broker中获取消息并消费,为了保证message在broker服务中不丢失,mq会对消息数据进行持久化到磁盘中。那么message到达broker服务后是如何进行存储并持久化到磁盘中的呢?这就是本篇要学习的内容。

准备

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

消息载体CommitLog

该对象是broker服务接收到message后进行存储的数据对象,一般就把存储消息的文件就称为commitLog文件也就是最终存储磁盘上的数据文件。

大致的message流向如图:

根据源码可以知道,一个commitLog文件最大存储1G数据,文件写满了,则会写入下一个文件中

文件持久化位置 

commitlog文件的持久化存放的位置是通过broker.conf配置文件中storePathCommitLog配置

storePathCommitLog = /Users/leonsh/rocketmqnamesrv/store/commitlog

最后生成的文件为这样

文件命名

查看上面图片可知文件的名称是一串数字20个0组成,因为文件名称是按照偏移量offset来命名的,

因为这是第一个文件所以offset为0,补全20位,所以文件名称为20个0

,以此类推第二个文件名称则为00000000001073741824

上面说过一个commitlog文件最大存储1G,而1G=1024*1024*1024=1073741824bit,这就是第二个文件的偏移量

源码解析

前面说到Producer发送message到broker后,broker会对接收的message请求进行处理

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:87
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)

上面的方法名中顾名思义就是处理请求的,并且所在的文件命名SendMessageProcessor也说明了该类的作用。那么我们就从该方法深入源码中

看方法引用位置我们会发现许多地方调用了该方法,先抛开前面broker如何接收的,反正最后消息会到达这里,从该方法开始就是broker处理message的核心流程也是本篇学习的重点

broker消息对象MessageExtBrokerInner

MessageExtBrokerInner该对象就是用来后续对message处理的封装

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
//获取请求对象中的消息体
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
    queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//初始化消息对象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
  • requestHeader 该对象就是在上一篇中讲到的发送message的消息请求头
  • 从请求头中获取设置的队列id,如果没有设置,则会从对应的topic中随机获取一个randomQueueId()
  • 从请求头中获取topic名称,通过名称再去获取broker中存储的topic对应的数据对象,深入源码会发现,broker中存储topic数据也是使用的map,ConcurrentMap<String, TopicConfig> topicConfigTable
  • 最后就是创建MessageExtBrokerInner对象并设值

异步存储message

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {
    //事务消息
    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    //普通消息
    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

或许大家和博主开始一样都有一个疑惑,我们生产者发送的是同步消息,为何到了broker却是异常存储呢?

1.其实生产者发送同步消息和broker异步存储都是相互独立互不干扰的,broker异步存储只是为了提高mq接收消息的写入性能吞吐量broker异步存储会将写入内存的message进行异步刷盘。

2.就算broker是异步存储但也不会立即返回结果给生产者,需要等待broker异步刷盘成功才会返回结果给生产者,通过broker提供的CompletableFuture机制实现。

什么,看完解释还是有点懵,有点抽象,我们继续向下深入源码,一步一步解开疑惑,我相信看完后面的解析同样会豁然开朗的!

CommitLog的真相

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

到这里,本文开头提到的commitLog对象终于出现了,查看该源码可知,commitlog对象中定义了一个MappedFileQueue对象这个对象又是做什么的,我们继续深入源码

//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:942
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

深入该方法,大概意思就是从MappedFileQueue对象中的CopyOnWriteArrayList<MappedFile> mappedFiles集合中取出里面的最后一个MappedFile对象,至此赢来大结局MappedFile对象才是最终映射到磁盘文件的,而CommitLog可以理解为MappedFile对象的外层封装。但落到磁盘上的文件我们依然称为commitLog文件

扩展:

CopyOnWriteArrayList 是 Java 中的一种线程安全的 List 实现,属于 java.util.concurrent 包

读操作:不需要加锁,直接操作底层数组,底层数组在写操作时是一个副本,读操作不会影响正在进行的写操作,能够保证高效的并发读性能。

写操作:会创建底层数组的一个新的副本,对这个副本进行修改, 修改完成后,新的副本会替换原来的数组

创建MappedFile文件

//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:1001
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    if (isCloseReadAhead()) {
        setFileReadMode(mappedFile, LibC.MADV_RANDOM);
    }
}

因为broker是启动后首次存储数据,所以上面获取出来的mappedFile一定为空则进入if代码块

因此偏移量也是初始值0

生成MappedFile文件路径名称

//源码位置
//包名:org.apache.rocketmq.store
//文件:MappedFileQueue
//行数:345
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
                                                                                    + this.mappedFileSize);
  • this.storePath:该字段就是前面在broker.conf文件中配置的文件地址
  • File.separator:分隔符
  • UtilAll.offset2FileName(createOffset):生成20位数字组成的文件名称,当前createOffset=0。

为何会生成两个地址nextFilePathnextNextFilePath呢?

因为mq在生成当前需要使用的文件时同时生成下一个使用的文件,当第一个文件存储满后,直接使用下一个文件,减小了创建文件的开销,提高mq的性能。所以会同时生成2个文件

那么问题来了,为何本文开头生成的文件怎么只有一个?

我们查看源码提交记录可知,nextNextFilePath第二个文件是2021年9月才新增的

查看rocketMq在github上各个版本的发布时间,2021年9月并没有发布新版本,但是2021年10月发布了rocketmq-all-4.9.2

那么由此可得,rocketMq同时创建2个文件从版本4.9.2开始支持,之前的版本都只会创建1个文件

因为博主的broker服务是通过docker镜像启动的,但是查看镜像版本显示的确为最新版本

其实这只是rocketMq镜像的版本,而我们看的是镜像中使用rocketMq框架版本

执行命令查看镜像的详细信息

docker inspect apacherocketmq/rocketmq:latest

由此可得博主的rocketMq版本为:4.6.0,所以只会创建一个commitLog文件

加入异步刷盘队列

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:62
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//...
//加入队列触发异步刷盘操作
boolean offerOK = this.requestQueue.offer(nextReq);
//...
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
//...
boolean offerOK = this.requestQueue.offer(nextNextReq);
  • AllocateRequest:就是message异步存储请求最后的封装
  • this.requestTable:也是一个map对象 ConcurrentMap<String, AllocateRequest> requestTable;key为文件的路径,value则为AllocateRequest
  • this.requestQueue这是一个队列PriorityBlockingQueue<AllocateRequest> requestQueue队列元素为AllocateRequest

PriorityBlockingQueue是如何做到异步刷盘的呢?

该队列就是为broker实现异步存储核心,可能大家对这个队列比较陌生

它是Java 中 java.util.concurrent 包提供的一个线程安全的优先级队列。它基于优先级堆实现,能够保证元素按照自然顺序或者指定的比较器顺序进行排序

因为它是一个队列那么我们首先就会想到生产者消费者,那么就起到了异步解耦的作用

他有两个非常重要的方法:

  • offer(): 将一个元素插入到队列中
  • take(): 从队列中获取并移除元素 由于 PriorityBlockingQueue 是一个阻塞队列,如果队列为空,take 方法会一直阻塞直到有元素可用

总结:由上面我们知道offer()一般用于生产者调用,而take()则是消费者调用,当队列为空时消费者线程会一直阻塞,只要队列中存入对象,消费者就会感知到并消费。可以理解为消费者和生产者共享PriorityBlockingQueue对象

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:99
AllocateRequest result = this.requestTable.get(nextFilePath);
//...
//阻塞等待刷盘结果
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

上面源码的作用就是等待异步刷盘结果

  • 第一段就是取出之前存入的第一个请求对象AllocateRequest
  • 第二段则是判断异步刷盘是否完成,成功则返回,还没有处理完则一直阻塞,直到达到超时时间waitTimeOut

result.getCountDownLatch().await为何能做到阻塞等待结果呢?

进入AllocateRequest对象中可知,操作的是这个对象CountDownLatch countDownLatch = new CountDownLatch(1)

CountDownLatch或许大家不太熟悉,但ReentrantLock大家并不陌生吧,面试中经常问到,他们同属于java并发包JUC( java.util.concurrent 下的对象.

概念:它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。它是通过一个计数器实现的,该计数器初始化为一个给定的值。每当一个线程完成了它的一项操作后,这个计数器就递减。当计数器的值到达零时,等待在这个计数器上的线程将被唤醒并继续执行

总结:通过源码我们看到AllocateRequest被创建时里面属性CountDownLatch中计数器默认就是1所以需要一直等待被修改为0时才会继续执行后续逻辑,那就是等待异步刷盘完成。

Message异步存储MappedByteBuffer

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:155
AllocateRequest req = null;
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());

该源码就是对之前加入队列的AllocateRequest取出来,并执行后续的存储操作,可以说就是消费者消费的地方,我们可以结合源码上下文代码可以知道,所在的类的顶级继承类是Runnable,而上面代码所在方法就是被重写的run()方法调用,可以认为消费者是在单独的一个线程中执行的。

获取缓冲区

//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:607
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();

被操作的对象是MappedByteBuffer

MappedByteBuffer是什么?

是 Java NIO(New Input/Output)中的一个类,它允许将文件直接映射到内存中,从而提高文件的读写效率。RocketMQ 使用 MappedByteBuffer 来管理 CommitLog 文件,以实现高效的消息存储和检索通过将文件映射到内存,RocketMQ 可以直接操作内存数据,而无需频繁的磁盘 I/O 操作。

MappedByteBuffer也是mmap的一种实现方式

什么是mmap?

mmap(内存映射文件)是一种将文件内容映射到进程的地址空间的技术。这样一来,文件内容就可以像访问内存一样被读写,从而显著提高 I/O 操作的效率。

调用mappedByteBuffer.slice()方法的作用是什么?

用于创建一个新的缓冲区,该缓冲区与原始缓冲区共享相同的底层内存,但具有独立的位置、限制和标记。这在需要操作内存映射文件的某一部分时非常有用,而不影响整个映射文件的其他部分。

MappedByteBuffer有两大特点:

  • 延迟写入:数据写入 MappedByteBuffer 时,实际上是写入了内存中的映射区域,操作系统会在合适的时候将这些数据同步到磁盘,而不是立即进行磁盘 I/O 操作。
  • 强制刷新:为了确保数据的一致性和持久性,MappedByteBuffer 提供了 force() 方法,可以将内存中的修改强制刷新到磁盘
//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:611
byteBuffer.put((int) i, (byte) 0);
//...
mappedByteBuffer.force();

总结:那么在RocketMQ 中,MappedFile 类通过使用 MappedByteBuffer 来管理 CommitLog 文件,并且使用 slice() 方法来创建子缓冲区进行局部操作,通过延迟写入减少了频繁的磁盘 I/O 操作,定期调用 force() 方法,将内存中的数据同步到磁盘,减少数据丢失的风险。这样可以提高性能和灵活性,特别是在处理大量消息时。


内存数据的刷盘过程本篇就不在深究,只要知道是通过MappedByteBuffer对延迟写入配置相关策略,并在设定的时期将内存数据写入磁盘文件中就可以了


基于上面所有内容重新修改一版简易的流程图如下

总结

本篇涉及到的知识面比较广,在broker存储message中出现了许多我们在日常开发中并不常见但功能强大的对象,比如PriorityBlockingQueueCountDownLatchMappedByteBufferRocketMq正是合理的运用了他们,从而造就了rocketMq本身这款优秀的消息队列框架,这也是我们读源码所要学习的。下一篇我们将学习RocketMq的“大脑”NameServer!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1915507.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

31_JQuery一文读懂,JS的升级版

今日内容 零、 复习昨日 一、JQuery 零、 复习昨日 1 js数组的特点(长度,类型,方法) - js数组的长度不限 - 类型不限 - 提供很多方法2 js中和的区别 - 判断数值相等 - 判断数值和数据类型同时相等3 js表单事件的事件名(事件属性单词) - 获得焦点 onfocus - 失去焦点 onblur …

自媒体运营怎样引流客源?

不管是企业还是个人&#xff0c;越来越多都在做自媒体引流运营&#xff0c;那有什么引流客源的方式呢&#xff1f; 高质量内容&#xff1a;创作并分享有价值的内容&#xff0c;吸引目标受众&#xff0c;提升内容的分享和传播效果。 SEO优化&#xff1a;优化文章标题、关键词和…

React学习笔记01

一、学习资料 1.学习网课 黑马程序员前端React18入门到实战视频教程&#xff0c;从reacthooks核心基础到企业级项目开发实战&#xff08;B站评论、极客园项目等&#xff09;及大厂面试全通关_哔哩哔哩_bilibili 2.学习文档 快速入门 – React 中文文档 二、React 1.定义 …

如何在玩客云中安装小雅AList并实现使用手机平板远程连接听歌看电影

文章目录 前言1. 本地部署AList2. AList挂载网盘3. 部署小雅alist3.1 Token获取3.2 部署小雅3.3 挂载小雅alist到AList中 4. Cpolar内网穿透安装5. 创建公网地址6. 配置固定公网地址 前言 本文主要介绍如何在安装了CasaOS的玩客云主机中部署小雅AList&#xff0c;并在AList中挂…

【Vscode】显示多个文件 打开多个文件时实现标签栏多行显示

Vscode显示多个文件&VSCode打开多个文件时实现标签栏多行显示 写在最前面一、解决打开文件的时候只显示一个tab的办法解决办法如下&#xff1a; 二、文件标签栏多行显示设置步骤&#xff1a; &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2024每日百字篆刻时…

【中项第三版】系统集成项目管理工程师 | 第 11 章 规划过程组① | 11.1 - 11.2

前言 第 11 章对应的内容选择题和案例分析都会进行考查&#xff0c;这一章节属于10大管理的内容&#xff0c;学习要以教材为准。本章上午题分值预计在15分。 目录 11.1 制定项目管理计划 11.1.1 主要输入 11.1.2 主要输出 11.2 规划范围管理 11.2.1 主要输入 11.2.2 主…

比curl更直观的网站性能测试工具httpstat——筑梦之路

GitHub - davecheney/httpstat: Its like curl -v, with colours. wget https://raw.githubusercontent.com/reorx/httpstat/master/httpstat.pymv httpstat.py /usr/bin/httpstat #移动到环境变量路径chmod x /usr/bin/httpstat #添加可执行权限 exec bash #重置当前bash进…

算法训练营day27--122.买卖股票的最佳时机II +55. 跳跃游戏 +45.跳跃游戏 II+1005.K次取反后最大化的数组和

一、 122.买卖股票的最佳时机II 题目链接&#xff1a;https://leetcode.cn/problems/binary-search/description/ 文章讲解&#xff1a;https://www.programmercarl.com/0122.%E4%B9%B0%E5%8D%96%E8%82%A1%E7%A5%A8%E7%9A%84%E6%9C%80%E4%BD%B3%E6%97%B6%E6%9C%BAII.html 视频…

致远漏洞(登陆绕过+任意文件上传)

漏洞复现 1.获得cookie POST /seeyon/thirdpartyController.do HTTP/1.1 Host: 192.168.1.9 User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0 Accept: text/html,application/xhtmlxml,application/xml;q0.9,*/*;q0.8 Accept-Langua…

Linux系统之lscpu命令的基本使用

Linux系统之lscpu命令的基本使用 一、lscpu命令介绍二、lscpu命令的使用帮助2.1 命令格式2.2 命令选项2.3 使用帮助 三、lscpu命令的基本使用3.1 查看lscpu版本3.2 直接使用lspcu命令3.3 可解析的格式打印cpu信息3.4 可扩展格式打印cpu信息 四、lscpu命令使用注意事项 一、lscp…

【分布式系统】Ceph块存储系统之RBD接口

目录 一.服务端操作 1.创建一个名为 rbd-xy101 的专门用于 RBD 的存储池 2.将存储池转换为 RBD 模式 3.初始化存储池 4.创建镜像 5.管理镜像 6.Linux客户端使用 6.1.在管理节点创建并授权一个用户可访问指定的 RBD 存储池 6.2.修改RBD镜像特性&#xff0c;CentOS7默认…

【进阶篇-Day7:JAVA中Date、LocalDate等时间API的介绍】

目录 1、概述2、JDK8(-) 时间类2.1 Date类&#xff1a;&#xff08;1&#xff09;构造方法&#xff1a;&#xff08;2&#xff09;常用成员方法&#xff1a; 2.2 SimpleDateFormat类&#xff1a;2.3 总结&#xff1a;2.4 Calendar类介绍&#xff1a; 3、JDK8() 时间类3.1 日历类…

【计算机毕业设计】基于Springboot的足球青训俱乐部管理系统【源码+lw+部署文档】

包含论文源码的压缩包较大&#xff0c;请私信或者加我的绿色小软件获取 免责声明&#xff1a;资料部分来源于合法的互联网渠道收集和整理&#xff0c;部分自己学习积累成果&#xff0c;供大家学习参考与交流。收取的费用仅用于收集和整理资料耗费时间的酬劳。 本人尊重原创作者…

蚁剑编码器编写——php木马免杀

蚁剑编码器编写——php木马免杀 我的想法是 木马要先免杀&#xff0c;能够落地&#xff0c;再去考虑流量层面的问题 举几个例子演示一下 命令执行与代码执行是有比较大的区别&#xff0c;蚁剑执行的是php代码&#xff0c;而system&#xff0c;proc_open,passthru,exec,shell_…

CSS学习碎碎念之卡片展示

效果展示&#xff1a; 代码展示 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>图片展示</title…

【Spring成神之路】老兄,来一杯Spring AOP源码吗?

文章目录 一、引言二、Spring AOP的使用三、Spring AOP的组件3.1 Pointcut源码3.2 Advice源码3.3 Advisor源码3.4 Aspect源码 四、Spring AOP源码刨析4.1 configureAutoProxyCreator源码解析4.2 parsePointcut源码解析4.3 parseAdvisor源码解析4.4 parseAspect源码解析4.5 小总…

【题目/算法训练】:单调队列单调栈

&#x1f680; 前言&#xff1a; 【算法】单调队列&&单调栈 可以在看完这篇文章后&#xff0c;再来写下面的题目 一、绝对差不超过限制的最长连续子数组 思路&#xff1a; 1&#xff09; 就相当于滑动窗口&#xff0c;维护滑动窗口内的两个值&#xff0c;一个是最大值…

溶解氧(DO)理论指南(3)

转载自梅特勒官网资料&#xff0c;仅用于学习交流&#xff0c;侵权则删&#xff01; 溶解氧理论指南 设备操作3.1 DO电极准备3.2 DO电极校准3.3 进行DO测量3.4 转换单位3.5 维护和储存 设备操作 本章总结了 DO电极日常使用的一些建议。它们基于普遍接受的操作规则。 3.1 DO电…

【数据结构——链表的深度探索】从实现到应用,保姆级攻略

【数据结构——链表深度探索】从实现到应用&#xff0c;保姆级攻略 &#x1f341;1. 链表的介绍&#x1f341;2. 链表的实现&#x1f341;2.1 单向链表&#x1f341;2.1.1 size()&#x1f341;2.1.2 display()&#x1f341;2.1.3 contains(int key)&#x1f341;2.1.4 addFirst…

本地部署,强大的面部修复与增强网络CodeFormer

目录 什么是 CodeFormer&#xff1f; 技术原理 主要功能 应用场景 本地部署 运行结果 结语 Tip&#xff1a; 在图像处理和计算机视觉领域&#xff0c;面部修复和增强一直是一个备受关注的研究方向。近年来&#xff0c;深度学习技术的飞速发展为这一领域带来了诸多突破性…