深入浅出消息队列----【顺序消息的实现原理】
- 何为顺序
- 发消息的顺序性
- 存储消息的顺序性
- 消费消息的顺序性
- 顺序消息消费的三把锁
- 第一把锁:分布式锁
- 第二把锁:Synchronized
- 第三把锁:ReentrantLock
本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】
何为顺序
- 因果顺序
- 时间顺序
发消息的顺序性
在明白什么是顺序后,我们需要保证消息在发送阶段的顺序就符合因果或时间顺序。
首先,我们需要保证单个生产者来发送顺序消息。
现在生产环境基本上都是多服务器部署,如果有多个生产者分布在不同的服务中,都往同一个 Topic 发送相关的顺序消息,那么我们压根无法保证消息的顺序性。
即使在因果上他们产生的顺序是对的,但是最终发送到 Broker 这个过程的顺序是无法把控的(可能产生消息的时间早,但是实际发送的时间晚,还有网络上的传输顺序也是不可预测的)。
保证单个生产者后,我们还需保证单个生产者内对顺序消息是单线程(串行)发送的。
RocketMQ 的生产者是支持多线程发送消息的(线程安全),因此在使用上如果我们利用多线程提高发送顺序消息的速率,理论上就无法保证绝对的顺序。
比如消息-1和消息-2都发往顺序 Topic-A,消息-1比消息-2在顺序上领先,他也的确比消息-2更早产生,消息-1也更早被线程-A 处理,随后消息-2才产生并且被线程-B处理。
如图所示:
但是即使在这种情况下,都无法保证消息-1 一定比消息-2 早发送到 Broker 上,因为线程会被调度,可能线程-A 执行一半就停了,线程-B 还在执行,这样一来先启动的并不一定先执行完成,这就是多线程的不确定性。
也就是多线程是无法保证顺序的,因此在发送的时候,需要单线程串行发送有关的顺序消息。
所以对生产者来说,发送顺序消息需要保证两点:单个生产者和串行发送。
存储消息的顺序性
保证了发送的顺序性之后,我们来看看 Broker 存储消息的顺序性要如何保证。
在存储上我们知晓消息是按照时间顺序追加写入到 commitlog 中的,且会被分发到 consumeQueue 中。
同消费组内,一个 consumeQueue 仅会被一个消费者消费,且这个消费者会按照 consumeQueue 内存储的顺序来消费消息,因此我们仅需让相关的顺序消息都分配到同一个 consumeQueue 即可,这样存储上就是有序的。
好比之前举例过的订单情况,同一笔订单相关的创建、支付、发货都发往同一个队列即可:
那如何让相关的顺序消息投递到同一个 consumeQueu 中?
发送顺序消息的时候指定队列就行了。
通过前面的学习可以知晓 RocketMQ 有队列的概念,对应到 Kafka 就是分区的概念,因此发消息的时候指定队列即可让相关的顺序消息被分配盗同一个 consumeQueu 中,以此来保证存储上的顺序。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer)arg; // 订单号
int index = id % mqs.size(); // 对队列数取余
return mqs.get(index); // 选择队列
}
}, orderId);
就像官方示例的代码,利用 MessageQueueSelector 来选择队列,select 方法里面的 mqs 就像 Topic 下所有队列,通过 orderId 取余队列数使得一样的订单都会被发往相同的队列,这样就保证分区顺序消息。
当然,这样的操作部保证所有订单的顺序,仅保证同一笔订单相关消息的顺序。
如果我们要保证所有订单处理的顺序,那么直接写死选择一个队列即可,也就是让所有的订单相关消息都发往同一个队列,这样叫全局顺序消息。
全局顺序消息的性能比较低,因为只能发往一个队列,这个并发度已经限制死了,像前面说的因为在集群模式下,统一消费组内,一个队列只能对应有一个消费者来消费。
而分区顺序消息的并发度取决于 Topic 下的队列数,因此如果想提升性能仅需增加队列数即可,不同队列之间可以并发处理顺序消息,互不影响,性能较好。
一般在业务上,我们很少用到全局顺序消息,一般而言分区顺序消息就够用了。
消费消息的顺序性
存储上如果已经保证了顺序,那么消费者只要老老实实的按照拉取到的消息顺序一条一条消费,这样就保证消费的顺序性,但是实际上还是有很多细节的。
首先消费者需要保证单线程消费顺序消息,如果消费者是多线程消费消息,那么道理同生产者多线程发送消息一样,无法保证消息的顺序性。
并且还需要考虑异常的情况,也就是消费失败场景的处理。
我们之前了解到正常消息如果消费失败会进行重试,重试 16 次后会进入死信队列,后续人工介入处理,不会堵着后面的消息。
而对于顺序消息来说,如果前置消息消费失败了,后续的消息能正常消费吗?
也就说重试多次后,能直接进入死信队列吗?跳过前置消息,后面的消息能正常消费吗?
很多情况下前面的消息消费失败,后续的消息是无法正常消费的,如果在逻辑上没有做处理很容易造成脏数据。
还是拿订单来举例:比如创建订单的消息消费失败了,但是随后的支付消息消费成功,也就是成功把用户的钱给扣了,但订单实际上没生成,用户一看钱都扣了,订单却没有…
所以对于顺序消息来说,消费失败其实是比较难处理的,RocketMQ 对顺序消息的默认实现是重试次数时 Integer.MAX_VALUE 次。
这样不就一直重试堵着后面的消息?那也不行啊,一直堵着业务上不就阻塞了吗?
因此顺序消息的处理在业务上可能需要支持相关联的消息都直接失败,然后可以在另外的地方持久化保存这些消息,待后续修复可以重新消费。
也就是不抛错,先记着不处理,不堵着后续无关的消息,后面可以人工介入或者其他方式去处理这些失败的消息。
还有重平衡机制需要考虑,简单来说就是消费者的负载均衡,反正在顺序消息场景下因为重平衡机制,可能会导致两个消费者消费同一个队列的消息,不仅导致重复消费,也可能使得顺序不一致。
因此需要一定的机制来避免这种情况的发送。
顺序消息消费的三把锁
在消费场景,RocketMQ 利用三把锁来尽可能地保证消息的消费顺序性。
很巧的是这三把锁,覆盖了我们常见的三把锁:分布式锁、Synchronized、ReentrantLock。
第一把锁:分布式锁
普通的消息消费用的是 RocketMQ 提供的 MessageListenerConcurrently,来实现并发消费。
而顺序消费用的是 MessageListenerOrderly 来保证顺序消费,RocketMQ 默认已经提供了一个实现类 ConsumeMessageOrderlyService。
这个 service 在启动的时候就会向 Broker 申请当前消费者负责的队列锁,会将自己的消费组、自己的客户端ID、以及负责的队列发往 Broker,Broker 就把对应的队列与这个消费者绑定,将这个关系存储在了本地。
这样一来,别的消费者如果想消费对应的队列也得来加分布式锁,如果发现已经被别的消费者绑定了,那么就无法拉取消息消费。
因此这个分布式锁保证了同一个消费组内,一个队列只会被分配给一个消费者。
对了,这个锁在 Broker 会过期,消费者会定时(默认每 20s)的去续这个锁来保证分布式锁的拥有。
第二把锁:Synchronized
第一把分布式锁保证了一个队列只会被分配给一个消费者,那么第二把本地锁保证了同一时刻只有一个线程去消费这个队列。
因为实际上 MessageListenerOrderly 拉取到消息后,也是丢给线程池并发消费的,因此需要有把锁来保证一个队列只会被一个线程消费。
第三把锁:ReentrantLock
线程获取到 Synchronized 锁后,要开始处理消费消息了。
但是在真正开始消费消息之前,会先获取 ProcessQueue 的 consumeLock,这个 lock 是一把 ReentrantLock。
那么为什么需要这把锁呢?从名字来看是表明正在消费消息的消费锁。
前面已经提到 RocketMQ 会有重平衡机制,当前的队列-1 此时属于消费者-1 负责,但重平衡后可能会分配给同一个消费组内的另一个消费者-2 负责。
而发生重平衡的时候,很可能当前的消费者-1 正在消费这个队列的数据,但是还没消费完,可能消费了一半且还未向 Broker 提交消费点位,这种情况不能直接将这个正在消费的队列重平衡给另一个消费者,因为会产生重复消费。
这把锁更像一个标记位,来表明当前这个队列还有消息在消费,无法重平衡,等待下一次重平衡。
具体在真的发生队列平衡时,当前的消费者会先尝试获取这个队列对应的 ProcessQueue 的 consumeLock,如果获取失败说明该队列正在被消费,次队列重平衡失败,待下次重平衡再试。
如果获取 consumeLock 成功,表示当前没有正在消费消息,于是安心的去 Broker 解除分布式锁,这样一来新的消费者就能接手当前的队列了,消息也不会重复消费。
已经有一个 Synchronized 来保证一个队列只会被一个线程消费了吗,通过这个锁不能判断正在消费吗?为什么要加一个 consumeLock?