前言:
上一篇我们分享了 Spring Boot 整合 RocketMQ 完成普通消息发送的过程,本篇我们来分享一下 RocketMQ 顺序消息的发送。
RocketMQ 系列文章传送门
RocketMQ 的介绍及核心概念讲解
Spring Boot 整合 RocketMQ 之普通消息
Spring Boot 整合 RocketMQ 之定时/延时消息
RocketMQ 顺序消息的使用场景
典型场景一:撮合交易
以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。
典型场景二:数据实时增量同步
以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。
RocketMQ 如何保证消息的顺序性?
Producer 保证消息有序性
RocketMQ 只能保证一个队列中的消息是有顺序的,不同队列之间的消息是无法保证顺序的,因此我们要想使用 RocketMQ 完成顺序消息的功能,就必须保证消息发送到同一个队列中,RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。
如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至 RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
- 相同消息组的消息按照先后顺序被存储在同一个队列。
- 不同消息组的消息可以混合在同一个队列中,且不保证连续。
举例如下:
如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。
Consumer 保证消息有序性
- RocketMQ 提供了两种消费模式,有序消费模式 ConsumeMode.ORDERLY 和并发消费模式ConsumeMode.CONCURRENTLY,因此我们的消费端也要使用有序消费模式。
- RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
了解了 RocketMQ 的顺序消息原理后,接下来我们进入实战部分。
RocketMQ 实现顺序消息发送
RocketMQTemplate 给我们提供了 SendOrderly 方法,来实现发送顺序消息,主要分为三类,如下:
- syncSendOrderly:发送同步顺序消息,支持超时设置。
- asyncSendOrderly:发送异步顺序消息,支持超时设置。
- sendOneWayOrderly:发送单向超时消息。
顺序消息生产者代码如下:
package com.order.service.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName: OneWayMessageProducer
* @Author: zhangyong
* @Date: 2024/9/27 17:27
* @Description: 顺序消息发送者
*/
@Slf4j
@Component
public class OrderlyMessageProducer {
@Autowired
private RocketMQTemplate rocketMqTemplate;
/**
* @date 2024/10/12 15:13
* @description 单向顺序消息
*/
public void sendOneWayOrderly(){
rocketMqTemplate.sendOneWayOrderly("orderly-topic", MessageBuilder.withPayload("单向顺序消息,订单编号:666666 创建").build(), "666666");
rocketMqTemplate.sendOneWayOrderly("orderly-topic", MessageBuilder.withPayload("单向顺序消息,订单编号:666666 支付").build(), "666666");
rocketMqTemplate.sendOneWayOrderly("orderly-topic", MessageBuilder.withPayload("单向顺序消息,订单编号:666666 确认收货").build(), "666666");
}
/**
* @date 2024/10/11 15:45
* @description 同步顺序消息
*/
public void syncSendOrderly() {
//hashKey 用来计算决定消息发送到哪个队列 一般是订单 ID 等信息 这里我们模拟订单 ID 发送
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(), "666666");
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(), "666666");
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(), "666666");
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(), "888888");
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(), "888888");
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(), "888888");
}
/**
* @date 2024/10/11 15:52
* @description 发送异步顺序消息
*/
public void asyncSendOrderly() throws InterruptedException {
//hashKey 用来计算决定消息发送到哪个队列 一般是订单 ID 等信息 这里我们模拟订单 ID 发送
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:666666 创建").build(), "666666", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:666666 创建,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:666666 创建,异步消息发送失败");
}
});
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:666666 支付").build(), "666666", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:666666 支付,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:666666 支付,异步消息发送失败");
}
});
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:666666 确认收货").build(), "666666", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:666666 确认收货,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:666666 确认收货,异步消息发送失败");
}
});
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:888888 创建").build(), "888888", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:888888 创建,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:888888 创建,异步消息发送失败");
}
});
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:888888 支付").build(), "888888", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:888888 支付,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:888888 支付,异步消息发送失败");
}
});
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:888888 确认收货").build(), "888888", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:888888 确认收货,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:888888 确认收货,异步消息发送失败");
}
});
}
/**
* @date 2024/10/11 16:10
* @description 同步顺序超时消息
*/
public void syncSendOrderlyTimeOut() {
//hashKey 用来计算决定消息发送到哪个队列 一般是订单 ID 等信息 这里我们模拟订单 ID 发送
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:666666 创建").build(), "666666",200);
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:666666 支付").build(), "666666",200);
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:666666 确认收货").build(), "666666",200);
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:888888 创建").build(), "888888",200);
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:888888 支付").build(), "888888",200);
rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:888888 确认收货").build(), "888888",200);
}
/**
* @date 2024/10/11 15:52
* @description 发送异步顺序超时消息
*/
public void asyncSendOrderlyTimeOut() throws InterruptedException {
//hashKey 用来计算决定消息发送到哪个队列 一般是订单 ID 等信息 这里我们模拟订单 ID 发送
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:666666 创建").build(), "666666", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:666666 创建,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:666666 创建,异步消息发送失败");
}
},200);
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:666666 支付").build(), "666666", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:666666 支付,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:666666 支付,异步消息发送失败");
}
},200);
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:666666 确认收货").build(), "666666", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:666666 确认收货,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:666666 确认收货,异步消息发送失败");
}
},200);
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:888888 创建").build(), "888888", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:888888 创建,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:888888 创建,异步消息发送失败");
}
},200);
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:888888 支付").build(), "888888", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:888888 支付,异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:888888 支付,异步超时消息发送失败");
}
},200);
Thread.sleep(200);
rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:888888 确认收货").build(), "888888", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单编号:888888 确认收货,异步超时消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("订单编号:888888 确认收货,异步超时消息发送失败");
}
},200);
}
}
生产者代码分析:
我们在代码中分别模拟了单向顺序消息、同步顺序消息、异步顺序消息、同步带超时顺序消息、异步带超时顺序消息的发送,这里提一席一下异步顺序消息代码中的 sleep,因为异步消息是非阻塞的,这里我们又是模拟的业务,因此使用 sleep 操作来保证消费者消息发送的顺序性。
顺序消息消费者代码如下:
顺序消息的消费者代码和普通消息的的消费者代码一样,并没有什么特殊之处。
package com.order.service.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group", topic = "one-way-topic")
public class OneWayMessageCousumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("单向消息消费成功:{}", message);
}
}
单向顺序消息验证:
单向顺序消息只是在单向消息的基础上加了一个顺序,单向顺序消息也是不关注消息发送结果的,。
2024-10-12 19:26:54.680 INFO 29852 --- [MessageThread_1] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:单向顺序消息,订单编号:666666 创建
2024-10-12 19:26:54.682 INFO 29852 --- [MessageThread_1] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:单向顺序消息,订单编号:666666 支付
2024-10-12 19:26:54.682 INFO 29852 --- [MessageThread_1] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:单向顺序消息,订单编号:666666 确认收货
消息是按照创建、支付、确认收货的流程消费的,符合预期。
同步顺序消息验证:
同步顺序消息和同步消息一样,是阻塞的。
2024-10-12 19:27:58.806 INFO 29852 --- [MessageThread_2] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步顺序消息,订单编号:666666 创建
2024-10-12 19:27:58.808 INFO 29852 --- [MessageThread_3] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步顺序消息,订单编号:666666 支付
2024-10-12 19:27:58.810 INFO 29852 --- [MessageThread_4] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步顺序消息,订单编号:666666 确认收货
2024-10-12 19:27:58.812 INFO 29852 --- [MessageThread_5] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步顺序消息,订单编号:888888 创建
2024-10-12 19:27:58.814 INFO 29852 --- [MessageThread_6] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步顺序消息,订单编号:888888 支付
2024-10-12 19:27:58.817 INFO 29852 --- [MessageThread_7] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步顺序消息,订单编号:888888 确认收货
消息是按照创建、支付、确认收货的流程消费的,同时模拟的两个订单的消息顺序也没有错乱,符合预期。
异步顺序消息验证:
单向顺序消息只是在单向消息的基础上加了一个顺序,单向顺序消息也是不关注消息发送结果的。
2024-10-12 19:33:35.681 INFO 29852 --- [ublicExecutor_1] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:666666 创建,异步消息发送成功
2024-10-12 19:33:35.682 INFO 29852 --- [MessageThread_8] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步顺序消息,订单编号:666666 创建
2024-10-12 19:33:35.881 INFO 29852 --- [ublicExecutor_2] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:666666 支付,异步消息发送成功
2024-10-12 19:33:35.882 INFO 29852 --- [MessageThread_9] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步顺序消息,订单编号:666666 支付
2024-10-12 19:33:36.082 INFO 29852 --- [ublicExecutor_3] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:666666 确认收货,异步消息发送成功
2024-10-12 19:33:36.082 INFO 29852 --- [essageThread_10] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步顺序消息,订单编号:666666 确认收货
2024-10-12 19:33:36.282 INFO 29852 --- [ublicExecutor_4] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:888888 创建,异步消息发送成功
2024-10-12 19:33:36.282 INFO 29852 --- [essageThread_11] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步顺序消息,订单编号:888888 创建
2024-10-12 19:33:36.483 INFO 29852 --- [ublicExecutor_5] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:888888 支付,异步消息发送成功
2024-10-12 19:33:36.484 INFO 29852 --- [essageThread_12] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步顺序消息,订单编号:888888 支付
2024-10-12 19:33:36.683 INFO 29852 --- [ublicExecutor_6] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:888888 确认收货,异步消息发送成功
2024-10-12 19:33:36.685 INFO 29852 --- [essageThread_13] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步顺序消息,订单编号:888888 确认收货
消息是按照创建、支付、确认收货的流程消费的,同时模拟的两个订单的消息顺序也没有错乱,符合预期。
同步带超时顺序消息验证:
同步带超时顺序消息在保证顺序的同时,可以设置一个超时时间。
2024-10-12 20:05:48.819 INFO 29984 --- [MessageThread_6] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步带超时顺序消息,订单编号:666666 创建
2024-10-12 20:05:48.820 INFO 29984 --- [MessageThread_7] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步带超时顺序消息,订单编号:666666 支付
2024-10-12 20:05:48.822 INFO 29984 --- [MessageThread_8] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步带超时顺序消息,订单编号:666666 确认收货
2024-10-12 20:05:48.824 INFO 29984 --- [MessageThread_9] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步带超时顺序消息,订单编号:888888 创建
2024-10-12 20:05:48.826 INFO 29984 --- [essageThread_10] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步带超时顺序消息,订单编号:888888 支付
2024-10-12 20:05:48.829 INFO 29984 --- [essageThread_11] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:同步带超时顺序消息,订单编号:888888 确认收货
消息是按照创建、支付、确认收货的流程消费的,同时模拟的两个订单的消息顺序也没有错乱,符合预期。
异步带超时顺序消息验证:
异步带超时顺序消息在保证顺序的同时,可以设置一个超时时间。
2024-10-12 20:08:13.126 INFO 29984 --- [ublicExecutor_1] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:666666 创建,异步消息发送成功
2024-10-12 20:08:13.127 INFO 29984 --- [essageThread_12] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步带超时顺序消息,订单编号:666666 创建
2024-10-12 20:08:13.327 INFO 29984 --- [ublicExecutor_2] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:666666 支付,异步消息发送成功
2024-10-12 20:08:13.327 INFO 29984 --- [essageThread_13] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步带超时顺序消息,订单编号:666666 支付
2024-10-12 20:08:13.528 INFO 29984 --- [ublicExecutor_3] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:666666 确认收货,异步消息发送成功
2024-10-12 20:08:13.529 INFO 29984 --- [essageThread_14] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步带超时顺序消息,订单编号:666666 确认收货
2024-10-12 20:08:13.729 INFO 29984 --- [ublicExecutor_4] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:888888 创建,异步消息发送成功
2024-10-12 20:08:13.729 INFO 29984 --- [essageThread_15] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步带超时顺序消息,订单编号:888888 创建
2024-10-12 20:08:13.929 INFO 29984 --- [ublicExecutor_5] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:888888 支付,异步消息发送成功
2024-10-12 20:08:13.930 INFO 29984 --- [essageThread_16] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步带超时顺序消息,订单编号:888888 支付
2024-10-12 20:08:14.130 INFO 29984 --- [ublicExecutor_6] c.o.s.r.producer.OrderlyMessageProducer : 订单编号:888888 确认收货,异步超时消息发送成功
2024-10-12 20:08:14.130 INFO 29984 --- [essageThread_17] c.o.s.r.consumer.OrderlyMessageConsumer : 顺序消息消费成功:异步带超时顺序消息,订单编号:888888 确认收货
RocketMQ 消费端保证消息顺序性揭秘
前面我们分析了消费端 Consumer 保持顺序性,需要使用有序消费模式 ConsumeMode.ORDERLY(不要使用并发消费模式ConsumeMode.CONCURRENTLY),使用有序消费模式只是一个简单的总结,我们想一下在生产环境我们几乎都是多节点部署,那是否每个节点都会去消费顺序消息呢?
如果每个节点都去消费顺序消息是无法保证消息顺序的,RocketMQ 底层使用了分布式锁,消费端在初始化的时候,会为自己申请一个分布式锁,只有锁获取成功了,才可以进行消费,而在接下来的消费过程中才是使用单线程消费队列中的消息,这一步也是需要加锁的,通过加锁来保证一个队列只有一个线程来消费,你以为这就完了吗?为了保证消息的顺序,还要对处理的队列加锁,这次加锁是为了确保在重平衡的过程中消息不会被重复消费。
重平衡问题
当我们的消费者集群重新加入了节点的时候,这个时候就会发生重平衡,重平衡之前的队列可能属于 A 节点来消费,假设重平衡之后该队列属于 B 节点来消费,此时节点 A 就需要把自己加在 Broker 上的锁解锁,让节点 B 来申请锁,如果此时节点 A 拉取了消息正在消费但是还没有提交 ACK,而节点 B 马上又去拉取消息消费,此时就可能会出现重复消费的情况,那如何避免这种情况呢,这时就需要通过处理队列上的锁来判断了,只有当处理队列上的锁释放了,其他节点才可以正常消费该消息,这样就避免了消息重复消费。
下面附上一张极客某专栏的一张流程图,过程描述的比较详细,可供参考:
RocketMQ 顺序消息的问题?
通过上面的介绍,我们可以很明显的发现一个问题,那就是顺序消息的效率较低,顺序消息的实现是通过多次加锁来实现的,这会明显降低消息吞吐率,同时如果前面的消息阻塞了,会导致后面所有的消息阻塞,因此非必要不要使用 RocketMQ 的顺序消息。
如有不正确的地方欢迎各位指出纠正。