Broker读写分离机制
在RocketMQ中,有两处地方使用了"读写分离"机制
-
Broker Master-Slave读写分离:写消息到Master Broker,从Slave Broker读取消息。Broker配置为Slave Broker读取消息。
Broker配置为slaveReadEnable=True(默认False),消息占用内存百分比配置为accessMessageInMemoryMaxRatio=40(默认40) -
Broker Direct Memory-Page Cache读写分离:写消息到Direct Memory(直接内存,简称DM),从操作系统的PageCache中读取消息。
Master Broker配置读写分离开关为transientStorePoolEnable=True(默认False),写入DM存储数量,配置transientStorePoolSize至少大于0(默认为5,建议不修改),刷盘类型配置为flushDiskType=FlushDiskType.ASYNC_FLUSH,即异步刷盘
读写分离能够最大限度地提供吞吐量,同时会增加数据不一致的风险
Master-Slave读写分离机制。
通常Master提供读写处理,如果Master负载较高就从Slave读取
第一步:Broker在处理Pull消息时,计算下次是否从Slave拉取消息,是通过DefaultMessageStore.getMessage()方法实现的,
diff>memory 表示没有拉取的消息比分配的内存大,如果diff > memory的值为True,则说明此时Master Broker内存繁忙,应该选择从Slave拉取消息
- maxOffsetPy:表示当前Master Broker存储的所有消息的最大物理位点
- maxPhyOffsetPulling:表示拉取的最大消息位点
- diff:是上面两者的差值,表示还有多少消息没有拉取
- StoreUtil.TOTAL_PYHSICAL_MEMORY_SIZE:表示当前Master Broker全部的物理内存
- memory:Broker认为可使用的最大内存,该值可以通过accessMessageInMemoryMaxRatio配置项决定,默认accessMessageInMemoryMaxRatio=40,如果物理内存为100MB,那么memory=40MB
第二步:通知客户端下次从哪个Broker拉取消息。在消费者Pull消息返回结果时,根据第一步设置的suggestPullingFromSlave值返回给消费者,该过程通过PullMessageProcessor.processRequest()方法实现。
通过查看以上代码,我们直到要想从Slave读取消息,需要设置slaveReadEnable=True,此时会根据第一步返回的suggestPullingFromSlave值告诉客户端下次可以从哪个Broker拉取消息。suggestPullingFromSlave=1表示从Slave拉取,suggestPullingFromSlave=0表示从Master拉取。
Direct Memory-Page Cache的读写分离机制
以上逻辑通过MappedFile.appendMessagesInner()方法来实现,核心代码如图
这段代码中,writeBuffer表示从DM中申请的缓存;mappedByteBuffer表示从PageCache中申请的缓存,如果Broker设置transientStorePoolEnable=true,并且异步刷盘,则存储层DefaultMessageStore在
初始化会调用TransientStorePool.init()方法(按照配置的Buffer个数)初始化writeBuffer
初始化writeBuffer后,当生产者将消息发送到Broker时,Broker将消息写入writeBuffer,然后被异步转存服务不断地从DM中Commit到Page中,消费者此时从哪儿读取数据呢?消费者拉取消息的实现在MappedFile.selectMappedBuffer()方法中
从代码中可以看到,消费者始终从mappedByteBuffer(即Pagecache)读取消息。