highlight: arduino-light
4.3 延时消息
延迟消息对应的Topic是SCHEDULETOPICXXXX,注意就是SCHEDULETOPICXXXX,XXXX不是某某某的意思。
SCHEDULETOPICXXXX的队列名称是从2开始到17,对应的delayLevel为3到18,3对应10s,18对应2h,在类MessageStoreConfig中这样定义延时时间:
String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。
SCHEDULETOPICXXXX这个topic只对内部使用,对于consumer只能消费到自己所在的消费者组的重试topic的数据。
比如A是OrderConsumer组的一个消费者,OrderConsumer消费的主题Topic是Order。
那么消费者A在启动的时候,会订阅Order主题的同时,还会订阅%RETRY%_OrderConsumer。
即订阅Order主题的同时,还会订阅%RETRY%+消费者组的主题。
consumer消费失败的消息发回broker后总是先写到SCHEDULETOPICXXXX里面,然后schedule service在延迟时间到了以后会读取SCHEDULETOPICXXXX里面的数据然后重新发回到重试主题,consumer订阅了重试主题,所以会重新消费失败的数据,这样就完成了一个循环。
发送到重试消费Topic 是%RETRY% + 消费组名 注意是消费组名 我们思考一下为什么是消费者组名? A消费者组消费成功 B消费者组消费失败 如果发回原topic就有问题了,A又会消费一次
rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。
从这个过程也能看到,一个消费失败的消息体每次发回broker需要在commitLog里面存储两份。
topic为SCHEDULETOPICXXXX的一份这个主要是为schedule service控制延时用的。
topic为%RETRY%groupName的一份。
通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。
另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。
虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。
阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源 发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是 msg.setDelayTimeLevel(5) 代表延迟一分钟 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个 小时消费。
生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。
固定Level的含义是延迟是特定级别的,比如支持3秒、5秒的Level,那么用户只能发送3秒延迟或者5秒延迟,不能发送8秒延迟的消息。
消息队列RocketMQ的阿里云版本(收费版本)才支持到精确到秒级别的延迟消息(没有特定Level的限制)。
开源版本没有支持任意延迟的消息,我想可能有以下几个原因:
- 任意延迟的消息的需求不强烈
- 可能是一个比较有技术含量的点,不愿意开源
需求不强
对支持任意延迟的需求确实不强,因为:
- 延迟并不是MQ场景的核心功能,业务单独做一个替代方案的成本不大
- 业务上一般对延迟的需求都是固定的,比如下单后半小时check是否付款,发货后7天check是否收货
在我司,MQ上线一年多后才有业务方希望我能支持延迟消息,且不要求任意延迟,只要求和RocketMQ开源版本一致,支持一些业务上的级别即可。
不愿意开源
为了差异化(好云上卖钱),只能将开源版本的功能进行阉割,所以开源版本的RocketMQ变成了只支持特定Level的延迟。
既然业务有需求,我们肯定也要去支持。
任意延迟的消息难点在哪里?
首先,我们先划清楚定义和边界:在我们的系统范围内,支持任意延迟的消息指的是:
- 精度支持到秒级别
- 最大支持30天的延迟
本着对自己的高要求,我们并不满足于开源RocketMQ的18个Level的方案。那么,如果我们自己要去实现一个支持任意延迟的消息队列,难点在排序和消息存储。
消息要在服务端排序
任意延迟意味消息要在服务端排序
比如用户先发了一条延迟1分钟的消息,一秒后发了一条延迟3秒的消息,显然延迟3秒的消息需要先被投递出去。
那么服务端在收到消息后需要对消息进行排序后再投递出去。在MQ中,为了保证可靠性,消息是需要落盘的,且对性能和延
迟的要求,决定了在服务端对消息进行排序是完全不可接受的。
消息存储量太大
其次,目前MQ的方案中都是基于Write Ahead Log的方式实现的(RocketMQ、Kafka),日志文件会被过期删除,一般会保留最近一段时间的数据。
支持任意级别的延迟,那么需要保存最近30天的消息。阿里内部 1000+ 核心应用使用,每天流转几千亿条消息,经过双11交易、商品等核心链路真实场景的验证,稳定可靠。
考虑一下一天几千亿的消息,保存30天的话需要堆多少服务器,显然是无法做到的。
开源版本延迟消息如何做的?
虽然决定自己做,但是依旧需要先了解开源的实现,那么就只能看看RocketMQ开源版本中,支持18个Level是怎么实现的,希望能从中得到一些灵感。
上图是通过RocketMQ源码分析后简化一个实现原理方案示意图。
消息写入
在写入CommitLog之前,如果是延迟消息,替换掉消息的Topic和queueId(被替换为延迟消息特定的Topic,queueId则为延迟级别对应的id)
消息写入CommitLog之后,提交dispatchRequest到DispatchService
因为在第①步中Topic和QueueId被替换了,所以写入的ConsumeQueue实际上非真正消息应该所属的ConsumeQueue,而是写入到ScheduledConsumeQueue中(这个特定的Queue存放不会被消费)
Schedule过程中:
private String levelString = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { this.delayLevelTable.put(level, delayTimeMillis); }
每个level分配1个定时器,扫描所有延迟级别里面的延迟消息message。
如果存在延时消息。
如果当前消息不到消费的时间,则在countdown
毫秒后再执行任务。countdown是消息消费的时间-当前时间now。
如果当前消息到消费的时间,根据消息的物理偏移量和大小获取消息。把延迟消息message,发送到真正的topic对应的某个队列。
RocketMQ延迟消息的代码实战及原理分析 - 万猫学社 - 博客园 (cnblogs.com)
回顾一下这个方案,最大的优点就是没有了排序:
- 先发一条level是5s的消息,再发一条level是3s的消息,因为他们会属于不同的ScheduleQueue所以投递顺序能保持正确
- 如果先后发两条level相同的消息,那么他们的处于同一个ConsumeQueue且保持发送顺序
- 因为level数固定,每个level的有自己独立的定时器,开销也不会很大
- ScheduledConsumeQueue其实是一个普通的ConsumeQueue,所以可靠性等都可以按照原系统的M-S结构等得到保障
但是这个方案也有一些问题:
- 固定了Level,不够灵活,最多只能支持18个Level
- 业务是会变的,但是Level需要提前划分,不支持修改
- 如果要支持30天的延迟,CommitLog的量会很大,这块怎么处理没有看到
时间轮:TimeWheel
总结RocketMQ的方案,通过划分Level的方式,将排序操作转换为了O(1)的ConsumeQueue 的append操作。
我们去支持任意延迟的消息,必然也需要通过类似的方式避免掉排序。
此时我们想到了TimeWheel:Netty中也是用TimeWheel来优化I/O超时的操作。
4.3.1 启动消息消费者
java public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); // 订阅Topics consumer.subscribe("TestTopic", "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }
4.3.2 发送延时消息
java public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // 启动生产者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message ("TestTopic", ("Hello scheduled message " + i).getBytes()); // delayTimeLevel="1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间) message.setDelayTimeLevel(3); // 发送消息 producer.send(message); } // 关闭生产者 producer.shutdown(); } }
4.3.3 验证
您将会看到消息的消费比存储时间晚10秒
4.3.4 使用限制:1s-2h
md // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。