目录
一、消息存储概览
二、Broker接收消息
三、消息存储流程
1. DefaultMessageStore类
2. 存储流程
1):同步与异步存储
2):CommitLog异步存储消息
3):提交消息(Commit)
四、参考资料
一、消息存储概览
如下图所示,是消息从生产者发送消息到消费者消费消息的大致流程。
- step1:生产者发送消息到消息存储Broker端;
- step2:单一文件Commitlog存储所有主题消息,确保顺序写入,提高吞吐量;
- step3:消息通过堆外缓存,Commit消息写入文件内存映射,然后Flush写入磁盘;
- step4:消息Flush磁盘后,把消息转发到ConsumeQueue、IndexFile供消费者消费;
- step5:主题下消费队列内容相同,但是一个消费队列在同一时刻只能被一个消费者消费;
- step6:消费者根据集群/广播模式、PUSH/PULL模式来消费消息。
如何实现顺序存储的呢?通过org.apache.rocketmq.store.PutMessageLock接口,在消息追加文件内存映射时,加锁实现存储消息串行。
消息存储模式:同步、异步。默认异步存储,但是无论同步还是异步,最终执行存储方法是org.apache.rocketmq.store.CommitLog#asyncPutMessage(异步执行,提高存储效率),而同步需要等待存储结果才能返回。
本章主要介绍生产者发送消息,Broker如何接收消息,如何Commit写入文件内存映射,并没有介绍如何刷盘、转发到ConsumeQueue和IndexFile、HA主从同步等内容。
二、Broker接收消息
org.apache.rocketmq.broker.processor.SendMessageProcessor是生产者发送消息后,Broker接收消息的核心实现类。 发送消息请求码是RequestCode.SEND_MESSAGE。发送消息参考《RocketMQ5.0.0消息发送》。
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest不仅处理生产者发来的消息,同时还是处理消费端消费ACK的处理请求。其核心逻辑是sendMessage或sendBatchMessage处理方法,都是Broker端存储消息。
以下代码是org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage存储之前对消息的预处理。 核心逻辑如下:
- randomQueueId():发送时是否指定消费队列,若没有指定,则随机选择;
- handleRetryAndDLQ():消息是否延迟或重试消息,并处理;
- sendTransactionPrepareMessage变量:判定是否事务消息,true事务消息;
- 异步存储消息(默认):
事务消息存储:TransactionalMessageServiceImpl#asyncPrepareMessage
普通消息存储:DefaultMessageStore#asyncPutMessage
- 同步存储消息:
事务消息存储:TransactionalMessageServiceImpl#prepareMessage
普通消息存储:DefaultMessageStore#putMessage
/**
* 存储之前,对消息的处理
* step1:预发送处理,如:检查消息、主题是否符合规范
* step2:发送消息时,是否指定消费队列,若没有则随机选择
* step3:消息是否进入重试或延迟队列中(重试次数失败)
* step4:消息是否是事务消息,若是则存储为prepare消息
* step5:BrokerConfig#asyncSendEnable是否开启异步存储,默认开启true
* (异步存储、同步存储)
*/
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader,
final TopicQueueMappingContext mappingContext,
final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
// 预发送处理,如:检查消息、主题是否符合规范
final RemotingCommand response = preSend(ctx, request, requestHeader);
if (response.getCode() != -1) {
return response;
}
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
// 获取消息体
final byte[] body = request.getBody();
// 发送消息时,是否指定消费队列,若没有则随机选择
int queueIdInt = requestHeader.getQueueId();
// 获取主题配置属性
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) { // 队列ID不符合,则在写队列随机找个
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
// 消息扩展属性
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
// 消息是否进入重试或延迟队列中(重试次数失败)
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqKey == null || uniqKey.length() <= 0) {
uniqKey = MessageClientIDSetter.createUniqID();
oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
}
MessageAccessor.setProperties(msgInner, oriProps);
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
// 事务标签
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
// Broker禁止事务消息存储
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
long beginTimeMillis = this.brokerController.getMessageStore().now();
// 消息是否异步存储
if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) { // 事务prepare操作
asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
final int finalQueueIdInt = queueIdInt;
final MessageExtBrokerInner finalMsgInner = msgInner;
asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
RemotingCommand responseFuture =
handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}
sendMessageCallback.onComplete(sendMessageContext, response);
}, this.brokerController.getPutMessageFutureExecutor());
// Returns null to release the send message thread
return null;
}
// 消息同步存储
else {
PutMessageResult putMessageResult = null;
// 事务消息存储
if (sendTransactionPrepareMessage) {
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 同步存储消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
}
三、消息存储流程
1. DefaultMessageStore类
org.apache.rocketmq.store.DefaultMessageStore是消息存储实现类,也是存储模块最重要的一个类,其UML如下。
其关键属性如下代码所示。同步与异步存储的方法:
- 同步消息:单个消息putMessage()、批量消息putMessages()
- 异步消息:单个消息asyncPutMessage()、批量消息asyncPutMessages()
// Commitlog引用次数
public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(LOGGER);
// 存储配置属性
private final MessageStoreConfig messageStoreConfig;
// CommitLog(Commitlog文件存储实现类)
private final CommitLog commitLog;
// ConsumeQueue文件存储实现类
private final ConsumeQueueStore consumeQueueStore;
// 刷盘线程
private final FlushConsumeQueueService flushConsumeQueueService;
// 删除过期Commitlog文件服务
private final CleanCommitLogService cleanCommitLogService;
// 删除过期ConsumeQueue文件服务
private final CleanConsumeQueueService cleanConsumeQueueService;
// 矫正逻辑偏移量服务
private final CorrectLogicOffsetService correctLogicOffsetService;
// index文件实现类
private final IndexService indexService;
// MappedFile分配服务
private final AllocateMappedFileService allocateMappedFileService;
// 消息提交到Commitlog时消息转发,构建ConsumeQueue、index文件服务
private ReputMessageService reputMessageService;
// HA服务(主从同步服务)
private HAService haService;
// 存储状态服务
private final StoreStatsService storeStatsService;
// 堆内存缓存
private final TransientStorePool transientStorePool;
// Broker状态管理
private final BrokerStatsManager brokerStatsManager;
// 消息达到监听器(消息拉取长轮询模式)
private final MessageArrivingListener messageArrivingListener;
// Broker配置属性
private final BrokerConfig brokerConfig;
// 存储刷盘检查点
private StoreCheckpoint storeCheckpoint;
// 定时消息存储实现类
private TimerMessageStore timerMessageStore;
// 日志打印次数
private AtomicLong printTimes = new AtomicLong(0);
// Commitlog文件转发请求
private final LinkedList<CommitLogDispatcher> dispatcherList;
// 延迟消息的延迟级别
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
// 最大延迟级别
private int maxDelayLevel;
2. 存储流程
1):同步与异步存储
如下图所示是同步与异步存储的实现方法调用链,它们之间区别与联系:
联系:a. 同步存储实际是调用异步存储方法,即:DefaultMessageStore#asyncPutMessage;
b. 最终执行存储方法是org.apache.rocketmq.store.CommitLog#asyncPutMessage;
区别:同步存储需要等待存储结果waitForPutResult()
2):CommitLog异步存储消息
org.apache.rocketmq.store.CommitLog#asyncPutMessage是异步执行存储消息。如下代码所示,关键步骤如下:
- step1:mappedFileQueue队列中,获取可写入的Commitlog,即:从commitlog目录下获取当前写入的Commitlog;
- step2:获取当前写入Commitlog的偏移量 = 文件偏移量 + 该文件写入位置;
- step3:是否需要HA(主从Broker同步数据);
- step4:CommitLog.calMsgLength获取该消息的总长度(不定长,总长度存储在前4个字节);
- step5:加锁后(串行写),判定再次Commitlog文件没有或已被写满,则创建新的Commitlog文件;
- step6:Commit操作,即:消息缓存或直接(是否开启堆外内存池)追加到Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果,方法:{@link DefaultMappedFile#appendMessage};
- step7:执行同步或异步刷盘、HA主从同步复制等,方法: {@link CommitLog#handleDiskFlushAndHA}。
/**
* 执行存储消息
* step1:mappedFileQueue队列中,获取可写入的Commitlog,即:从commitlog目录下获取当前写入的Commitlog;
* step2:获取当前写入Commitlog的偏移量 = 文件偏移量 + 该文件写入位置;
* step3:是否需要HA(主从Broker同步数据);
* step4:CommitLog.calMsgLength获取该消息的总长度(不定长,总长度存储在前4个字节);
* step5:加锁后(串行写),判定再次Commitlog文件没有或已被写满,则创建新的Commitlog文件;
* step6:Commit操作,即:消息缓存或直接(是否开启堆外内存池)追加到Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
* {@link DefaultMappedFile#appendMessage}
* step7:执行同步或异步刷盘、HA主从同步复制等
* {@link CommitLog#handleDiskFlushAndHA}
* @param msg 消息
* @return 存储结果
*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(System.currentTimeMillis());
}
// Set the message body CRC (consider the most appropriate setting on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
updateMaxMessageSize(putMessageThreadLocal);
String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);
long elapsedTimeInLock = 0;
// mappedFileQueue队列中,获取可写入的Commitlog,即:从commitlog目录下获取当前写入的Commitlog
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 当前写入Commitlog的偏移量
long currOffset;
if (mappedFile == null) {
currOffset = 0;
} else {
// 当前写入Commitlog的偏移量 = 文件偏移量 + 该文件写入位置
currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}
// 是否需要HA
int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
boolean needHandleHA = needHandleHA(msg);
if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
}
if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
// -1 means all ack in SyncStateSet
needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
}
} else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
needAckNums = calcNeedAckNums(inSyncReplicas);
if (needAckNums > inSyncReplicas) {
// Tell the producer, don't have enough slaves to handle the send request
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
}
}
topicQueueLock.lock(topicQueueKey);
try {
boolean needAssignOffset = true;
if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
&& defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
needAssignOffset = false;
}
if (needAssignOffset) {
defaultMessageStore.assignOffset(msg, getMessageNum(msg));
}
/*
当前消息编码,计算byteBuf(字节缓存)长度, null则正常处理
CommitLog.calMsgLength获取该消息的总长度(不定长,总长度存储在前4个字节)
*/
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(beginLockTimestamp); // 消息存储时间戳,确保消息存储有序
}
// Commitlog文件没有或已被写满,则创建新的Commitlog文件
if (null == mappedFile || mappedFile.isFull()) {
// 创建新的Commitlog文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}
// Commitlog文件写入消息(Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果)
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
onCommitLogAppend(msg, result, mappedFile);
break;
case END_OF_FILE:
onCommitLogAppend(msg, result, mappedFile);
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
onCommitLogAppend(msg, result, mappedFile);
}
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
} finally {
topicQueueLock.unlock(topicQueueKey);
}
// putMessage加锁超时
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
/*
执行同步或异步刷盘、HA主从同步复制等
注意:MappedFile.appendMessage只是将消息追加到Commitlog文件内存映射buffer中,并没有刷写到磁盘,则返回结果
*/
return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}
3):提交消息(Commit)
org.apache.rocketmq.store.logfile.DefaultMappedFile#appendMessage是文件内存映射追加消息方法,目的是把堆外缓存池消息或直接Commit到文件内存映射,其调用链如下。
org.apache.rocketmq.store.logfile.DefaultMappedFile#appendMessagesInner是追加消息到文件内存映射的核心方法,如下代码所示。
/**
* Commit操作,即:消息缓存或直接(是否开启堆外内存池)追加到Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
* step1:当前Commitlog的写指针,判断文件是否写满;
* step2:slice():创建与原ByteBuffer共享的内存区,拥有独立的position、limit、capacity等指针,并设置position当前写指针
* step3:判断是否是批量消息,并追加消息到Commitlog文件内存映射buffer,并没有刷写到磁盘,则返回结果
*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
// 当前Commitlog的写指针
int currentPos = WROTE_POSITION_UPDATER.get(this);
// 未写满,则追加
if (currentPos < this.fileSize) {
/*
slice():创建与原ByteBuffer共享的内存区,拥有独立的position、limit、capacity等指针
并设置position当前写指针
*/
ByteBuffer byteBuffer = appendMessageBuffer().slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// 批量消息
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
// traditional batch message
// Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
}
// 单个消息
else if (messageExt instanceof MessageExtBrokerInner) {
// traditional single message or newly introduced inner-batch message
// Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
需要注意的是:appendMessageBuffer().slice(),创建与原ByteBuffer共享的内存区,拥有独立的position、limit、capacity等指针,并设置position当前写指针。创建的内存追加消息。
四、参考资料
Rocket Mq消息持久化_飞科-程序人生的博客-CSDN博客
百度安全验证
【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么? - N!CE波 - 博客园
RocketMQ5.0.0消息发送_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0路由中心NameServer_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客