前言
这篇笔记,主要记录producer在通过netty发送了请求之后,在broker这边是如何处理的消息的
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
这里是broker的nettyServer端接收客户端发送消息的入口,不解释为什么会是这个类了
在内部调用的时候,调用链是这样的
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest(在这里会判断当前消息是批量消息/单条消息)
org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage(在这里会区分当前消息是普通消息/事务消息)
org.apache.rocketmq.store.DefaultMessageStore#putMessage
org.apache.rocketmq.store.CommitLog#putMessage
因为这中间链路的逻辑,看的不多,所以暂时先只是把调用链路梳理出来,先看核心逻辑
CommitLog#putMessage
延迟消息的判断
上面的截图,是在broker接收到消息,真正开始处理时,做的第一个比较重要的判断逻辑,如果当前消息是延迟消息,会把当前业务上指定的topic和计算出来的queueId进行替换,替换成对应延迟级别的topic和queueId
需要注意的是:这里将业务上指定的topic和queueId暂时存入到了msg的property属性中,在后面延迟消息到期之后,会从这个property属性中取出来,然后再放到topic和queueId上
mappedFile.appendMessage()
这是第二个关键点:将消息追加到mappedFile中,我们通常说,rocketmq使用零拷贝,提高响应速度,我觉得应该就体现在这里
这里append的细节,还没有仔细看,所以暂时先不展开说,总之,我认为这里是把消息追加到内存中
接着来看最后两个关键的点:文件刷盘和主从同步
文件刷盘机制
在文件刷盘机制,分为异步和同步
所谓的同步刷盘,就是master节点在收到消息之后,在写到内存中之后,需要持久化到磁盘文件上,才会返回给producer处理完成
异步刷盘是指:master节点接收到消息之后,在写入到内存中之后,就返回给producer处理完成;然后通过异步线程将消息从内存写入到磁盘文件,在下面的截图中可以看到,对于异步刷盘,只是去唤醒了刷盘线程,对于主线程来说,就结束了,就会继续下一个操作
if判断逻辑中是同步刷盘的逻辑,else中是异步刷盘的逻辑
异步刷盘的逻辑中,也细分为了两类,一种是开启了transientStorePoolEnable, 一种是未开启transientStorePoolEnable
//异步刷盘且未开启TransientStorePool的处理逻辑
org.apache.rocketmq.store.CommitLog.FlushRealTimeService
//异步刷盘且开启TransientStorePool的处理逻辑
org.apache.rocketmq.store.CommitLog.CommitRealTimeService
// 同步刷盘处理逻辑
org.apache.rocketmq.store.CommitLog.GroupCommitService
这三个service,都是间接实现了Runnable接口,所以其核心逻辑,都在其run方法中
同步刷盘
在同步刷盘的时候
- 初始化service对象:GroupCommitService
- 构建request对象:GroupCommitRequest
- 把request对象放入到list集合中
- request.waitForFlush等待刷盘完成
其中在第三步,把请求放到list集合中之后,会唤醒刷盘线程进行刷盘,具体细节,我们来看下
这里的waitPoint是在什么用到的呢?在下面run方法中的waitForRunning()
所以,对于同步刷盘来说,在调用putRequest()的时候,会唤醒刷盘线程进行同步刷盘
接着来看刷盘的逻辑
这里可以看到,核心逻辑是在doCommit()方法中
在这里,有几重逻辑处理
1.从requestRead中取出来当前写磁盘的请求
2.判断当前要写磁盘的数据,是否已经在被刷到磁盘中
3.如果未被持久化,就进行刷盘的操作
这里flush的逻辑就不看了,底层就是调用mappedByteBuffer.force()方法强制将内存中的数据写入到磁盘
异步刷盘-FlushRealTimeService
异步刷盘的时候,会判断当前是否有配置定时刷盘以及刷盘周期,然后再判断,当上次刷盘时间超过最大刷盘时间之后,也会触发刷盘的逻辑
然后在刷盘的时候,底层调用的是:fileChannel.force() 或者是mappedBuyeBuffer.force()
CommitRealTimeService异步刷盘的逻辑和FlushRealTimeService的类似,只是底层在持久化的时候,CommitRealTimeService是通过调用fileChannel.write()方法进行写数据的
主从同步
handleHA这个方法,暂时还没有看明白主从同步的细节,待后面再补充
总结
对于broker在接收到消息之后,会经过一系列的判断,比如:当前消息类型是事务消息?批量消息?普通消息?,一些合法性的校验就不说了
然后在校验之后,真正开始进行处理的时候,会先处理延迟消息,将延迟消息的topic和queuId进行一层替换
然后再进行持久化、写磁盘的操作
最后会进行主从节点数据的同步