文章目录
- 意外发现
- 设计方案
- 时间轮
- 定时消息存储
- 具体实现
- 流程图
- 流程步骤
意外发现
无意间从官方的最新的客户端代码中看到下面的Example:
感兴趣的可以看看这个介绍:https://rocketmq.apache.org/docs/featureBehavior/02delaymessage
生产者:
// Send delay messages.
MessageBuilder messageBuilder = null;
// Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time.
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
可以看到一句:setDeliveryTimestamp(deliverTimeStamp),也就是说可以支持任意时刻的延迟消息了???
消费者:
// Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
RocketMQ5.X版本新增了Proxy模块,从配置中可以看到,延迟消息方案目前默认还是通过按level来指定的,也就是说,可以选择不按level来执行了!!!
下面一起看看新版本是怎么实现任意时刻延迟消息的。
设计方案
时间轮
首先,RocketMQ对任意时刻延迟消息的支持,是基于主流的方案——时间轮做的,时间轮,对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。
时间轮的每一格设计如下:
定时消息存储
定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息:
名称 | 大小 | 备注 |
---|---|---|
size | 4B | 保存记录的大小 |
prev_pos | 8B | 前一条记录的位置 |
current_time | 8B | 当前时间 |
magic | 4B | magic value |
delayed_time | 4B | 该条记录的定时时间 |
offset_real | 8B | 该条消息在commitLog中的位置 |
size_real | 4B | 该条消息在commitLog中的大小 |
hash_topic | 4B | 该条消息topic的hash code |
varbody | 8B | 存储可变的body,暂时没有为空 |
具体实现
流程图
流程步骤
从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下:
-
针对放置定时消息的service,每50ms从commitLog读取指定主题(rmq_sys_wheel_timer)的定时消息。
a. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。
org.apache.rocketmq.store.timer.TimerMessageStore.TimerEnqueueGetService#run
org.apache.rocketmq.store.timer.TimerMessageStore#enqueue
b. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。
org.apache.rocketmq.store.timer.TimerMessageStore.TimerEnqueuePutService#run
org.apache.rocketmq.store.timer.TimerMessageStore#doEnqueue
-
针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。
a. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。
org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeueGetService#run
org.apache.rocketmq.store.timer.TimerMessageStore#dequeue
b. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。
org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeueGetMessageService#run
c. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。
org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeuePutMessageService#run
org.apache.rocketmq.store.timer.TimerMessageStore#convertMessage
消息转换,更改真实Topic
投递消息
org.apache.rocketmq.store.timer.TimerMessageStore#doPut
消息投递到真实Topic后,其实就变成了一条“正常的消息”了,消费者就能正常消费了,以上就是对RocketMQ 5.0中延迟消息的变更做的分析,参考了部分官方的资料,后续会使用5.0版本,实际做一些演练,目前对于这个新特性,官方并没有大肆宣扬,也不知道具体有哪些限制,所以还需要做一些实践,踩踩坑。