RocketMQ5.0–定时消息
一、定时消息概览
定时消息或延迟消息是指消息发送到Broker后,并不立即被消费而是要等到特定的时间后才能被消费。RocketMQ并不支持任意的时间精度延迟,只支持特定延迟时间的延迟消息。
消息延迟级别在Broker端通过MessageStoreConfig#messageDelayLevel配置,默认为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",其delayLevel=1则延迟1s,delayLevel=2则延迟5s,以此类推。解析该配置后,存放到org.apache.rocketmq.broker.schedule.ScheduleMessageService#delayLevelTable表中,格式为{1:1000,2:5000}。RocketMQ为每种延迟级别创建定时任务,这也是RocketMQ不支持任意时间延迟的原因。
注意,消息消费失败后,若是有延迟,也是和定时消息具有相同的逻辑。参考消息消费《RocketMQ5.0.0消息消费<三> _ 消息消费》。
org.apache.rocketmq.broker.schedule.ScheduleMessageService是定时消息实现类。消息存入commitlog文件之前需要判断消息的重试次数,如果大于0,则消息主题设置为SCHEDULE_TOPIC_XXXX,即:TopicValidator#RMQ_SYS_SCHEDULE_TOPIC属性。如下所示是该类的关键属性。
ScheduleMessageService方法的调用顺序:构造方法 -> load() -> start()方法。
// 第一次调度时延迟时间,默认1s
private static final long FIRST_DELAY_TIME = 1000L;
// 每一延迟级别调度一次后,则延迟该时间100ms再放入调度池
private static final long DELAY_FOR_A_WHILE = 100L;
// 发送异常后,则延迟该时间10s再放入调度池
private static final long DELAY_FOR_A_PERIOD = 10000L;
// 关闭时,等待5s
private static final long WAIT_FOR_SHUTDOWN = 5000L;
// 延迟睡10s
private static final long DELAY_FOR_A_SLEEP = 10L;
// 延迟级别表,解析MessageStoreConfig#messageDelayLevel后的数据结构{1:1000,2:5000}
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
// 延迟级别的消息消费进度,存储在{ROCKET_HOME}/store/config/delayOffset.json
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);
// 最大消息延迟级别
private int maxDelayLevel;
// 是否异步传送到调度池,默认关闭
private boolean enableAsyncDeliver = false;
下图所示,是定时消息实现流程图,步骤如下:
- step1:消息存入commitlog文件之前,如果发送消息的delayLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息队列ID为delayLevel-1;
- step2:消息经由commitlog异步转发到主题为SCHEDULE_TOPIC_XXXX,delayLevel - 1的消息消费队列;
- step3:定时任务Time每隔1s根据上次拉取偏移量从消费队列中取出所有消息;
- step4:根据消息的物理偏移量与消息大小从CommitLog中拉取消息;
- step5:根据消息属性重新创建消息,并恢复原始主题、原始消费队列,清除delayLevel属性,再存入commitlog文件;
- step6:转发到原始主题、原始消费队列,供消费者消费。
需要注意的是延迟级别delayLevel与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1。
二、定时消息实现机制
1. 提交消息前的处理
消息存储流程参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,其中DefaultMessageStore#asyncPutMessage执行异步存储消息时,执行存储消息钩子列表,如下代码所示。
// 异步存放消息,继续处理下一个请求;存储完成后,异步通知客户端
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
......
// 遍历存储消息钩子列表
for (PutMessageHook putMessageHook : putMessageHookList) {
/*
存储消息前验证消息格式规范;定时消息时修改为定时主题等
如:Broker停止工作、从Broker、是否写权限、主题太长、消息体太长等
*/
PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
// 为null时,则消息符合规范
if (handleResult != null) {
return CompletableFuture.completedFuture(handleResult);
}
}
......
}
org.apache.rocketmq.broker.util.HookUtils#handleScheduleMessage是commit操作前对定时消息处理的核心逻辑,如下代码所示。注意事项:
- 事务消息不能有延迟级别,若是延迟级别 > 0时,则修改为SCHEDULE_TOPIC_XXXX,消费队列ID为delayLevel -
1;原始主题及消费队列存储到扩展属性中。
/**
* 消息存储前,处理定时消息
* 方法入口:{@link DefaultMessageStore#asyncPutMessage(MessageExtBrokerInner)}
* step1:非事务消息时,检查定时消息
* step2:若是延迟级别 > 0时,则修改为SCHEDULE_TOPIC_XXXX,消费队列ID为delayLevel - 1;原始主题及消费队列存储到扩展属性中
* {@link HookUtils#transformDelayLevelMessage(BrokerController, MessageExtBrokerInner)}
* @param brokerController
* @param msg
* @return
*/
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 非事务消息时
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (!isRolledTimerMessage(msg)) {
// 检查定时消息
if (checkIfTimerMessage(msg)) {
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
//wheel timer is not enabled, reject the message
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
}
PutMessageResult tranformRes = transformTimerMessage(brokerController, msg);
if (null != tranformRes) {
return tranformRes;
}
}
}
// Delay Delivery,若是延迟级别 > 0时,则修改为SCHEDULE_TOPIC_XXXX,消费队列ID为delayLevel - 1
if (msg.getDelayTimeLevel() > 0) {
// 修改主题及消费队列ID
transformDelayLevelMessage(brokerController,msg);
}
}
return null;
}
定时消息commit操作后(消息提交到Commitlog文件内存映射),异步转发到延迟级别对应的消费队列中,下面介绍定时任务处理延迟消费队列。
2. 定时调度
1):load()方法
org.apache.rocketmq.broker.schedule.ScheduleMessageService#load完成延迟消费进度的加载且解析延迟级别字符串,如下代码所示。注意事项:
- 加载消费进度:加载延迟消费队列的消费进度文件,{ROCKET_HOME}/store/config/delayOffset.json文件,其格式:延迟级别:消费进度,如下实例:
{
"dataVersion":{
"counter":19,
"stateVersion":0,
"timestamp":1676598354088
},
"offsetTable":{3:17,12:0
}
}
- 解析配置:字符串MessageStoreConfig.messageDelayLevel转列表ScheduleMessageService#delayLevelTable。
/**
* step1:加载延迟级别的消息消费进度,{ROCKET_HOME}/store/config/delayOffset.json文件
* step2:解析MessageStoreConfig.messageDelayLevel转换为{@link ScheduleMessageService#delayLevelTable}
* step3:矫正延迟级别消费的偏移量
*/
@Override
public boolean load() {
// 加载延迟级别的消息消费进度文件
boolean result = super.load();
// 解析延迟级别
result = result && this.parseDelayLevel();
// 矫正延迟级别消费的偏移量
result = result && this.correctDelayOffset();
return result;
}
2):start()方法
org.apache.rocketmq.broker.schedule.ScheduleMessageService#start启动调度池,为每个延迟级别创建定时任务,注意事项:
- 延迟级别delayLevel与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1
- 创建定时任务DeliverDelayedMessageTimerTask线程:遍历延迟级别,并获取对应延迟队列的消费进度,创建定时任务。定时任务第一次启动时,默认延迟1s执行,第二次开始执行延迟级别对应的延迟时间。
- 每10s执行调度池任务持久化延迟队列的消费进度。
/**
* 启动调度池,为每个延迟级别创建定时任务
* step1:加载延迟消息消费进度;
* step2:遍历延迟级别,并获取对应的消费进度;
* step3:创建定时任务,并放入调度池(延迟级别与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1)
* step4:每10s执行调度池任务持久化延迟队列的消费进度(MessageStoreConfig.flushDelayOffsetInterval配置)
*/
public void start() {
if (started.compareAndSet(false, true)) {
// 加载延迟消息消费进度
this.load();
//
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历延迟级别
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
// 获取延迟级别对应的消费进度
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 是否异步传送到调度池,默认关闭
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
/*
创建Timer定时任务,并放入调度池
a. 定时任务第一次启动时,默认延迟1s执行,第二次开始执行对应的延迟时间
b. 延迟级别与延迟消费队列的映射关系:消费队列ID = 延迟级别 - 1
*/
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 每10s执行持久化延迟队列的消费进度(MessageStoreConfig.flushDelayOffsetInterval配置)
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (started.get()) {
// 持久化延迟队列的消费进度
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
3):定时调度任务
org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask是个定时调度任务线程,其下图是该线程run()的调用链。
org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup是调用任务的核心方法,代码如下。注意事项:
-
获取延迟消费队列:根据延迟级别(映射为延迟消费队列ID) + 延迟主题。
-
延迟时间是否到期:延迟时间deliverTimestamp = 延迟级别对应的延迟时间 + 消息存储时间戳,根据差值来判定是否到期countdown = deliverTimestamp - now:
countdown > 0时:说明没有到延迟时间,则;执行下一个调度任务(100ms后从currOffset偏移量开始执行定时任务) + 更新消费进度。
countdown <= 0时,说明到延迟时间,可以消费消息。 -
还原原始消息:根据偏移量从Commitlog获取延迟消息,后恢复到原始消息(原始topic、原始消费队列),清除延迟级别,保留消费次数。
-
同步syncDeliver或异步asyncDeliver处理:原始消息重新提交到Commitlog中,供消费者消费。
-
scheduleNextTimerTask(long offset, long delay):offset当前消费进度,delay(默认100ms)是定时任务100ms后从当前offset再次执行调度任务。
/**
* 定时消息调度核心方法
* 注意:消息消费失败返回ACK时,根据delayLevel > 0时,改变消息主题为SCHEDULE_TOPIC_XXXX,延迟消费队列ID = delayLevel -1
* 或
* 延迟消息(topic为:SCHEDULE_TOPIC_XXXX)写入Commitlog,进而转发到延迟消息队列(延迟消费队列ID = delayLevel -1)
* step1:根据延迟级别(映射为消费队列ID) + 延迟主题 获取 延迟消费队列
* step2:获取当前消费进度后的所有消息{@link ConsumeQueueInterface#iterateFrom(long)}
* step3:遍历消息,获取消息的偏移量、大小、Tag哈希码,为获取Commitlog完整消息准备
* step4:判断消息TAG的哈希码是否有效,计算当前延迟时间 = 延迟级别对应的延迟时间 + 消息存储时间戳
* {@link ScheduleMessageService#computeDeliverTimestamp(int, long)}
* step5:矫正延迟时间{@link DeliverDelayedMessageTimerTask#correctDeliverTimestamp(long, long)}
* step6:判定延迟是否到期:countdown = deliverTimestamp - now
* countdown > 0时:说明没有到延迟时间,则;执行下一个调度任务(100ms后从currOffset偏移量开始执行定时任务) + 更新消费进度
* countdown <= 0时,说明到延迟时间,可以消费消息
* step7:根据延迟消息的偏移量、大小从Commitlog获取完整延迟消息
* {@link MessageStore#lookMessageByOffset(long, int)}
* step8:延迟消息恢复到原始消息(原始topic、原始消费队列),清除延迟级别,保留消费次数
* {@link ScheduleMessageService#messageTimeup}
* step9:原始消息再次放入Commitlog,并转发到相应的原始消费队列,供消费者消费
* {@link DeliverDelayedMessageTimerTask#asyncDeliver} 和 {@link DeliverDelayedMessageTimerTask#syncDeliver}
*/
public void executeOnTimeup() {
// 根据延迟级别(映射为消费队列ID) + 延迟主题 获取 延迟消费队列
ConsumeQueueInterface cq =
ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 消费队列为null,说明没有该延迟级别的消费队列,忽略本次调度任务,创建下次调度任务
if (cq == null) {
// 下次调度任务
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
// 获取当前消费进度后的所有消息
ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
// 未找到消息,创建下次调度任务
if (bufferCQ == null) {
long resetOffset;
if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
this.offset, resetOffset, cq.getQueueId());
} else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
this.offset, resetOffset, cq.getQueueId());
} else {
resetOffset = this.offset;
}
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
long nextOffset = this.offset;
try {
while (bufferCQ.hasNext() && isStarted()) {
// 获取消息队列元素
CqUnit cqUnit = bufferCQ.next();
// 获取消息的偏移量、大小、Tag哈希码,为获取Commitlog完整消息准备
long offsetPy = cqUnit.getPos(); // 消息偏移量
int sizePy = cqUnit.getSize(); // 消息大小
long tagsCode = cqUnit.getTagsCode(); // 消息TAG的哈希码
// 消息TAG的哈希码是否有效
if (!cqUnit.isTagsCodeValid()) {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
// 获取当前消息的存储时间戳
long msgStoreTime = ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
// 计算当前延迟时间 = 延迟级别对应的延迟时间 + 消息存储时间戳
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
long now = System.currentTimeMillis();
// 矫正延迟时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// 计算下条消息的偏移量
long currOffset = cqUnit.getQueueOffset();
assert cqUnit.getBatchNum() == 1;
nextOffset = currOffset + cqUnit.getBatchNum();
// 定时消息的到期时间,> 0时:说明没有到延迟时间;<= 0时,说明到延迟时间,可以消费消息
long countdown = deliverTimestamp - now;
// > 0时:说明没有到延迟时间
if (countdown > 0) {
// 没有到延迟时间,执行下一个调度任务(100ms后从currOffset偏移量开始执行定时任务)
this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
// 更新该延迟级别对应消费队列的消费进度
ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
return;
}
// 从Commitlog获取完整消息(延迟消息)
MessageExt msgExt = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
/*
* 从Commitlog获取的延迟消息转为之前的原始消息
* 清除延迟级别属性;恢复原先的消息topic、消费队列;消费次数不会丢失
*/
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
boolean deliverSuc;
// 异步传送到Commitlog
if (ScheduleMessageService.this.enableAsyncDeliver) {
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
}
// 同步传送到Commitlog
else {
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
}
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
// 下次调度任务
public void scheduleNextTimerTask(long offset, long delay) {
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
}
三、参考资料
https://blog.csdn.net/yunqiinsight/article/details/126284555