深入浅出消息队列----【延迟消息的实现原理】
- 粗说 RocketMQ 的设计
- 细说 RocketMQ 的设计
- 这样实现是否有什么问题?
本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】
粗说 RocketMQ 的设计
RocketMQ 约定了一些延迟时间,即生产者无法灵活的自定义延迟时间,而是固定的几个延迟时间来供生产者选择。
这样延迟消息就有了统一归类和约束,便于管理和调配。
虽说归类了延迟消息,但是同一个延迟 level 的延迟消息共用一个闹钟也是无法满足需求的。
所以变成专门雇一个“人”,每个“人”管一个 level 的延迟消息,定时查看是否有到期的消息,如果到了立马让消息给消费者消费。
至于复用 commitlog 这一套的问题,专门搞个存放延迟消息的 Topic,延迟消息先发往这个 Topic,消费者并不会订阅这个 Topic,因此此时消费者无法消费到这个消息。
等到延迟消息到达时间后,Broker 将这个延迟消息发往原 Topic,此时消费者就能从原 Topic 消费到这条消息!
也就是说 Broker 自己建立一个专门 Topic 用来存放延迟消息,此时延迟消息的存储能复用 commitlog 这一套模型,消息也会被分发到 consumerQueue。
不同的延迟 level 的消息回存放到这个 Topic 不同的队列中,也就是说这个 Topic 一个有 18 个队列对应 18 个 level。
然后会有一个定时线程去每个队列按序检查消息是否都到时间了,如果到了就发到消息原先的 Topic 中。
细说 RocketMQ 的设计
延迟消息的发送很简单,仅需设置一个 delayTimeLevel 即可:
Message message = new Message("TestTopic",("Hello scheduled message" + i).getBytes());
message.setDelayTimeLevel(3);
producer.send(message);
Broker 收到这个消息后,一看 delayTimeLevel 设置了值,那么就知道它是一个延迟消息,于是乎直接来个偷梁换柱!
把消息的原 Topic 和对应队列 ID 保存在消息扩展属性里面。
然后把这条消息的 Topic 设置成 SCHEDULE_TOPIC_XXXX
,没错 Topic 的名字就是 SCHEDULE_TOPIC_XXXX
哈,后面就是 XXXX
!
并且根据消息的 Level 选择 SCHEDULE_TOPIC_XXXX
下对应的队列。
这样一来延迟消息就存储好了。
然后 Broker 起了一个定时线程池,里面一共有 18 个核心线程,这个线程池的任务就是定时调度 SCHEDULE_TOPIC_XXXX
下的每个队列的消息,一旦有到期的消息,就分发到原 Topic 供消费者消费。
具体的做法是在初始时,每个队列都会对应被创建一个任务扔到线程池中,这些任务的内容就是根据传入的队列 ID,得到对应的 consumeQueue,当然还有对应的 offset。
Broker 会定时保存 SCHEDULE_TOPIC_XXXX
下 consumeQueue 的消费 offset。
得到 consumeQueue 和 offset,对应的就能获取延时消息,这时候将延迟时间跟当前时间对比,就能判断是否到期。
如果到期了,就从消息扩展属性里面获取原 Topic 和对应队列 ID,然后投递到原队列中。
上面的图表就是这个意思,这里再贴一下:
然后再代码上的实现是立马新建一个任务扔到线程池中,延迟时间是 1000ms,任务的入参会塞入更新后的 offset,这样线程就会继续消费后面的消息,如此往复循环。
当然,如果拿到的对应延迟消息还未到时间,那么 offset 不变,也立马新建一个任务塞入到线程池中,这样 1000,s 后又会来看这个消息是否到期。
可以看到,整个延迟消息设计就加了一个线程池,很巧妙地复用了正常消息的 commitlog 和 consumeQueue 的存储机制,且利用发布订阅的特性,改变了消息的 Topic 来使得消费者无法消费到未到时间的消息。
到时间了又投递回原 Topic 使得消费者可以消费到到期的消息,非常 nice!
这样实现是否有什么问题?
从实现层面来看,大大减少了延迟消息开发的复杂度,但是这样的实现对延迟时间来说是不准的。
首先,同一个延迟 level 的消息都是入同一个队列,然后上一个延迟消息处理完之后继续处理下一个,如果同一时刻有大量的同一个 level 的延迟消息产生,那么它们都堆积在一个队列里面,一个一个处理,这样一来即使后面的消息到时间了也得排队等着。
这样的机制就做不到非常实时。
并且从 SHEDULE_TOPIC_XXXX
分发至原 Topic 之后,假设原 Topic 本身就已经有很多消息堆积了,那么等消费者消费到这条消息的时候,时间也有大大的延迟。
当然,本身在大流量下对时间的把控是无法做到很准确的,不论是什么方法,都会有延迟,无非是延迟精度多少的问题。
有一种比较好的定时结构就是时间轮了。