延迟消息存储机制
概述
什么是延迟消息呢?延迟消息也叫定时消息,一般地,生产者在发送消息后,消费者希望在指定的一段时间后再消费。常规做法是,把信息存储在数据库中,使用定时任务扫描,符合条件的数据再发送给消费者。典型的业务场景春节买票30分钟内完成订单支付。
RocketMQ延迟消息是通过ScheduleMessageService类实现的
核心属性
- SCHEDULE_TOPIC:一个系统内置的Topic,用来保存所有定时消息。RocketMQ全部未执行的延迟消息保存在这个内部Topic中(现如今保存在TopicValidator中)
- FIRST_DELAY_TIME:第一次执行定时任务的延迟时间,默认为1000ms
- DELAY_FOR_A_WHILE:第二次及以后的定时任务检查间隔时间,默认为100ms
- DELAY_FOR_A_PERIOD:如果延迟消息到时间投递时却失败了,会在DELAY_FOR_A_PERIOD中设置的ms后重新尝试投递,默认为10 000ms
- delayLevelTable:保存延迟队列和延迟时间的映射关系
- offsetTable:保存延迟级别及相应的消费位点
- timer:用于执行定时任务,线程名叫ScheduleMessageTImerThread
核心方法
- queueId2DelayLevel():将queueid转化为延迟级别
- delayLevel2QueueId():将延迟级别转化为queueId
一个延迟级别保存在一个Queue中,延迟级别和Queue之间的转化关系为
queueId = delayLevel -1
-
updateOffset():更新延迟消息的Topic的消费位点
-
computeDeliverTimestamp():根据延迟级别和消息的存储时间计算该延迟消息的投递时间
-
start():启动延迟消息服务。启动第一次延迟消息投递的检查定时任务和持久化消费位点的定时任务
-
shutdown():关闭start()方法中启动的timer任务
-
load():加载延迟消息的消费位点信息和全部延迟级别信息,延迟级别可以通过messageDelayLevel字段进行设置,默认1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
-
parseDelayLevel();格式化所有延迟级别信息,并保存到内存中
-
DeliverDelayedMessageTimerTask内部类用于检查延迟消息是否可以投递,DeliverDelayedMessageTImerTask是TimerTask的一个扩展实现
延迟消息存储机制
在延迟消息的发送流程中,消息体中会设置一个delayTimeLevel,其他发送流程也是如此。Broker在接收延迟消息时会有几个地方单独处理再存储,其余过程和普通消息存储一致.
延迟消息在保存到CommitLog中的单独处理。CommitLog.putMessage()/asyncPutMessage
方法存储延迟消息的实现逻辑如图
- msg.getDelayTimeLevel()是发送消息时可以设置的延迟级别,如果该值大于0,则表示当前处理的消息是一个延迟消息,将对该消息做如下修改:
1.将原始Topic、queueId备份在消息的扩展字段中,全部的延迟消息都保存在SCHEDULE_TOPIC的Topic中
2.备份原始Topic、queueId为延迟消息的Topic、queueId。备份的目的是当消息到达投递时间时会恢复原始的Topic和queueId,继而被消费者拉取并消费
- 经过处理后,该消息会被正常保存到CommitLog中,然后创建ConsumeQueue和IndexFile两个索引。在创建ConsumeQueue时,从CommitLog中获取的消息内容会单独进行处理,单独处理的逻辑方法是CommitLog.checkMessageAndReturnSize().
有一个很精巧的设计:在CommitLog中查询出消息后,调用computeDeliverTimestamp()方法计算消息具体的投递时间,再将该时间保存在ConsumeQueue的tagCode中。
这样设计的好处是,不需要检查CommitLog大文件,在定时任务检查消息是否需要投递时,只需要检查ConsumeQueue中的tagCode(不再是Tag的Hash值,而是消息可以投递的时间,单位是ms),如果满足条件再通过查询CommitLog将消息投递出去即可,如果每次都查询CommitLog,那么可想而知,效率会很低