前言
在rocketmq的文件中,除了commitLog文件,还有两个重要的文件,分别是indexFile文件和consumerQueue文件,这篇笔记主要记录这两个文件的数据是怎么写进去的
consumeQueue文件中对应topic下的一个queue,在consumeQueue文件中,存放了三部分数据:分别是当前消息在commitLog中的offset、以及消息的大小、tagCode信息
indexFile文件暂时没有看懂细节,只是从官网和网上的博客来看,在根据key进行快速查找msg信息时,会用到indexFile文件,关于indexFile文件的细节,暂时先空着,后面学习了之后,再补充
入口
在前面broker启动的那篇博客中,有讲到过,在启动过程中,有一个service,是用来同步这两个文件的,我们从这里开始看起
/**
* setReputFromOffset:是设置当前要从哪里开始同步消息信息到indexFile和consumerQueue文件
* 启动reputMessageService,更新consumerQueue文件
* 这里启动的异步线程,是为了监听commitLog文件,然后根据文件中写入的数据,更新consumerQueue和indexFiLe
*/
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
可以看到,这里会每隔1ms,就去尝试执行一次doReput()方法
执行
在这个逻辑中,分为了几个比较重要的逻辑
1、判断当前开始同步的位置是否正确
2、for循环,开始同步commitLog中的消息
3、尝试从commitLog中指定的位置开始获取数据
4、开始处理消息,进行同步,这个逻辑在getData下面的if逻辑中,因为代码太长,就先暂时折叠起来了
从指定offset获取消息
commitLog.getData()
拉取消息的逻辑,这里看起来,就是根据offset,从mappedFile中获取数据;这里在调用mappedFile.selectMappedBuffer()方法的时候,方法内部会获取到一个endPos, 这里入参的pos 是开启位置的pos;endPos是如何获取的,暂时没有细看;总之这个方法执行之后,会获取到要同步的消息信息
处理消息
这里就是上面我说的先暂时折叠起来的if逻辑
这里大致有三个步骤
- 构建dispatchRequest对象
- 调用doDispatch()方法进行消息分发,真正同步到consumeQueue和indexFile,就是在这个方法中处理的
- 更新offset
consumeQueue
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
这里可以看到,在准备往consumeQueue中写 数据的时候,只有几个字段
- commitLogOffset
- msgSize
- tagsCode:这个需要注意下,对于延迟消息,tagCode存放的是消息的到期时间
- consumeQueueOffset
在这里,会把要写入到consumeQueue中的数据,先写入到byteBuffer中
接着会调用appendMessage方法,在这个方法中,就是通过fileChannel.write()把数据写入到文件中
indexFile
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch
indexFile这里暂时先缺失吧,这里看了下,确实没有太看得懂,只看到里面会调用putKey(),类似于hashmap这种方式,然后内部又是写byteBuffer,又是写indexHeader, 这些等先学习下相关的理论知识 再看下源码