1.RocketMq 存储概要设计
RocketMQ主要存储的文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件,存储路径为${ROCKET_HOME}/store
,默认在当前用户目录下的store目录:
store目录下的文件如上所示,分别有:checkpoint、commitlog、config、consumequeue、index、lock,下面一一介绍一下RocketMQ主要的存储文件夹:
- commitlog: 消息存储目录,消息实际存储地方,所有的topic消息都会被存储在ComitLog文件,消息内容会被定期清除,默认大小为1G,可见MessageStoreConfig属性mappedFileSizeCommitLog:
- private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
- config: 运行期间的一些配置信息,主要包括下列信息:
- consumerFilter.json:主题消息过滤信息
- consumerOffset.json:集群消费模式消息消费进度
- delayOffset.json:延时消息队列拉取进度
- subscriptionGroup.json:消息消费组配置信息
- topics.json:topic配置属性
- consumequeue: 消息消费队列的存储目录,存储索引映射文件,一个queue一个文件,记录每个topic下的queue的offset在消息文件的偏移量
- index: 消息索引文件的存储目录
- abort: 如果存在abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出之前删除,同时RocketMq也是利用该文件来确定CommitLog文件和Index文件,consumequeue文件是否一致
- checkpoint: 文件检测点,存储commitlog文件,index索引文件,consumequeue最后一次刷盘时间戳
1.1 数据流向
进入RocketMQ存储剖析之前,先看一下RocketMQ数据流向,如图所示:
生产者推将消息推送给对应的Broker之后,Broker会将消息内容持久化到对应的commitLog文件,然后ReputMessageService线程会定时以CommitLog文件为基础来更新ConsumeQueue文件和index文件;作为消费者,会根据ConsumeQueue的offset信息,拉取对应queue的数据消费,所以,当消息生产者提交的消息存储在Commitlog文件中,ConsumeQueue、IndexFile需要及时更新,否则消息无法及时被消费
2.RocketMq数据文件职责
基于上面的介绍,我们大致了解到了RocketMQ整个存储设计所关联到的文件结构以及文件的数据流向,下面我们再来介绍核心数据文件的内容结构以及每个文件所承担的核心能力,而对于具体的源码实现,博主会放到后续文章,正在排版中,感兴趣的朋友可关注一下,后续会持续更新RocketMq的相关内容。
2.1 CommitLog文件
CommitLog文件的消息组织格式如下:
每条消息的前四个字节存储该消息的总长度,然后是固定长度的其它属性,RocketMq其它属性因涉及种类比较多,将近20种属性类型,此处只简单介绍下一些核心属性:
- MAGICCODE:魔数,4字节。固定值0xdaa320a7
- QUEUEID:消息消费队列ID,4字节
- SYSFLAG:消息系统Flag,例如是否压缩、是否是事务消息等,4字节
- BodyLength:消息体长度,4字节
- Topic:主题,长度为TopicLength中存储的值
2.1.1 核心能力
在RocketMq的实现中,commitlog对应的数据结构为org.apache.rocketmq.store.CommitLog,其提供的核心能力有:
- 根据偏移量查找消息
- 文件刷盘
- 文件恢复
2.2 ConsumerQueue
到这里,我们已经知道,RoceketMq会将所有消息存储在commitlog目录下面,虽说目录下面会分文件,但是一个Broker实例有多少个Topic,每个Topic下面会有多少条消息我们也不知道,所以为了加快消息检索速度,RocketMq设计了消息消费队列文件(Consumequeue),该文件可以看成是Commitlog关于消息消费的“索引”文件,consumequeue的第一级目录为消息主题,第二级目录为主题的消息队列,如下所示:
为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,每一个Consumequeue条目不会存储消息的全量信息,其存储格式如下:
核心能力
- 根据逻辑偏移量,时间戳查找消息内容
- ConsumerQueue的构建与恢复
对比与Kafka,此处的设计,是否是RocketMq 优于Kafka的设计,了解Kafka的童鞋可留下评论与博主一起学习交流一下
2.3 Index索引文件
RocketMQ的index又称之为Hash索引,其主要是为消息建立索引机制,既然是Hash索引,那必然存在Hash槽和Hash冲突问题,下面我们先看下RocketMQ的Hash索引的文件结构,然后再讲述RocketMQ是怎么解决Hash冲突的:
从图中可以看出,IndexFile总共包含IndexHeader、Hash槽、Hash条目(数据):
- IndexHeader头部,包含40个字节,记录该IndexFile的统计信息,其结构如下
- beginTimestamp:该索引文件中包含消息的最小存储时间
- endTimestamp:该索引文件中包含消息的最大存储时间
- beginPhyoffset:该索引文件中包含消息的最小物理偏移量(commitlog文件偏移量)
- endPhyoffset:该索引文件中包含消息的最大物理偏移量(commitlog文件偏移量)
- hashslotCount:hashslot个数,并不是hash槽使用的个数,在这里意义不大。
- indexCount:Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储
- Hash槽,一个IndexFile默认包含500万个Hash槽,每个Hash槽存储的是落在该Hash槽的hashcode最新的Index的索引
- Index条目列表,默认一个索引文件包含2000万个条目,每一个Index条目结构如下:
- hashcode:key的hashcode
- phyoffset:消息对应的物理偏移量
- preIndexNo:该条目的前一条记录的Index索引,当出现hash冲突时,构建的链表结构
对于RocketMQ如何解决Hash冲突关键点在于Hash槽中存储的是该HashCode所对应的最新的Index条目的下标,新的Index条目的最后4个字节存储该HashCode上一个条目的Index下标,至于Hash冲突解决的详细实现以及Index文件数据写入过程,此处,博主这里不再详细介绍,具体的实现在后续的源码实现章节再详细介绍org.apache.rocketmq.store.index.IndexFile#putKey
整个代码实现,其实并不难,就跟通常解决Hash冲突的办法类似相同
2.4 checkpoint 文件
checkpoint的作用是记录Comitlog、ConsumeQueue、Index文件的刷盘时间点,文件固定长度为4k,其中只用该文件的前面24个字节,其存储格式如图所示:
- physicMsgTimestamp:commitlog文件刷盘时间点。
- logicsMsgTimestamp:消息消费队列文件刷盘时间点。
- indexMsgTimestamp:索引文件刷盘时间点
3. 总结
RocketMQ是一款高性能的消息中间件,存储部分的设计是核心,存储的核心是IO访问性能,基于对于RocketMQ的兴趣,博主后续会将整个学习过程,不断记录,感兴趣的朋友可关注留下评论,一起学习RocketMQ整个优秀的设计,下图是串联整个消息的存储过程,因涉及到知识点比较多,还在不断细化中