RocketMQ 的延迟消息是其核心特性之一,允许消息在指定延迟时间后才被消费者消费。
定时消息生命周期
一、延迟消息的核心机制
- RocketMQ(
5.0之前
) 不支持任意时间精度的延迟,而是通过预定义的 延迟级别(Delay Level) 实现。默认支持 18 个延迟级别,对应延迟时间如下:
1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
消息发送时指定延迟级别,Broker 会将消息暂存到内部 Topic(SCHEDULE_TOPIC_XXXX
),这个topic下对应着这18个延迟级别的队列,到期后投递到目标 Topic。
- RocketMQ
5.0之后
支持任意时间精度的延迟,内部是通过时间轮算法实现的消息延迟功能。
定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。
RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。
二、使用方式
- 发送消息时设置延迟级别:
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 设置延迟级别为 2(对应 5s)
msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);
- 发送消息时设置延迟时间
//定时/延时消息发送
MessageBuilder messageBuilder = new MessageBuilderImpl();;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
三、源码实现分析
1. 消息存储阶段
-
CommitLog 存储
消息写入 CommitLog 前,Broker 检测delayTimeLevel
:- 若为延迟消息,替换 Topic 为
SCHEDULE_TOPIC_XXXX
,队列 ID 为delayLevel - 1
。 - 源码位置:
org.apache.rocketmq.store.CommitLog#putMessage
- 若为延迟消息,替换 Topic 为
-
延迟队列 Topic
每个延迟级别对应一个队列,例如 Level 2 存储在SCHEDULE_TOPIC_XXXX
的队列 1 中。
2. 延迟消息调度
-
ScheduleMessageService
核心调度服务,启动时初始化定时任务:public void start() { for (int i = 0; i < this.delayLevelTable.size(); i++) { int delayLevel = i + 1; this.scheduleExecutorService.schedule( new DeliverDelayedMessageTimerTask(delayLevel), this.firstDelayTime, TimeUnit.MILLISECONDS ); } }
- 每个延迟级别对应一个
DeliverDelayedMessageTimerTask
定时任务。 - 定时扫描对应队列,将到期消息投递到目标 Topic。
- 每个延迟级别对应一个
-
消息投递逻辑
- 计算消息的 到期时间(存储时间 + 延迟时间)。
- 若当前时间 >= 到期时间,从延迟队列拉取消息,恢复原始 Topic/QueueId,重新存入 CommitLog。
- 源码位置:
DeliverDelayedMessageTimerTask#executeOnTime
3. 消费者可见性
- 消息在延迟期间对消费者不可见,到期投递到原 Topic 后,消费者才能拉取。
四、关键配置
- Broker 配置
修改messageDelayLevel
参数可自定义延迟级别(需重启 Broker):messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
五、设计思考
-
固定延迟级别 vs 任意延迟
RocketMQ 采用固定级别避免为每个消息创建 Timer,减少资源消耗。定时任务按队列批量处理,效率更高。 -
性能优化
延迟消息的存储和普通消息共用 CommitLog,通过 Topic 和队列切换实现逻辑隔离,保证写入性能。
六、注意事项
- 延迟精度:受定时任务扫描间隔影响,可能存在微小误差。
- 消息重试:若消费失败,消息进入重试队列(非延迟队列)。
- 避免大量相同定时时刻的消息:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
通过上述机制,RocketMQ 以较低成本实现了高吞吐量的延迟消息功能,适用于订单超时、定时任务等场景。