什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。
不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,
虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现
private Queue creatQueue(String name){
// 创建一个 单活模式 队列
HashMap<String, Object> args=new HashMap<>();
args.put("x-single-active-consumer",true);
return new Queue(name,true,false,false,args);
}
创建之后,我们可以在控制台看到 消费者的激活状态
=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {
@Bean
public Queue queue15_0() {
return creatQueue(Message15.QUEUE_0);
}
@Bean
public Queue queue15_1() {
return creatQueue(Message15.QUEUE_1);
}
@Bean
public Queue queue15_2() {
return creatQueue(Message15.QUEUE_2);
}
@Bean
public Queue queue15_3() {
return creatQueue(Message15.QUEUE_3);
}
@Bean
public DirectExchange exchange15() {
// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
return new DirectExchange(Message15.EXCHANGE, true, false);
}
@Bean
public Binding binding15_0() {
return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");
}
@Bean
public Binding binding15_1() {
return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");
}
@Bean
public Binding binding15_2() {
return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");
}
@Bean
public Binding binding15_3() {
return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");
}
/**
* 创建一个 单活 模式的队列
* 注意 :
* <p>
* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除
*
* @param name
* @return queue
*/
private Queue creatQueue(String name) {
// 创建一个 单活模式 队列
HashMap<String, Object> args = new HashMap<>();
args.put("x-single-active-consumer", true);
return new Queue(name, true, false, false, args);
}
=================================》生产者
@Component
@Slf4j
public class Producer15 {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 这里的发送是 拟投递到多个队列中
*
* @param id 业务id
* @param msg 业务信息
*/
public void syncSend(int id, String msg) {
Message15 message = new Message15(id, msg);
rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);
}
/**
* 根据 id 取余来决定丢到那个队列中去
*
* @param id id
* @return routingKey
*/
private String getRoutingKey(int id) {
return String.valueOf(id % Message15.QUEUE_COUNT);
}
}
============================》消费者
/**
* 要想保证消息的顺序,每个队列只能有一个消费者
*
* @author 深漂码农@明哥
* @date 2024-03-18
*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {
@RabbitHandler
public void onMessage(Message15 message) throws InterruptedException {
log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
// 这里随机睡一会,模拟业务处理时候的耗时
long l = new Random(1000).nextLong();
TimeUnit.MILLISECONDS.sleep(l);
}
}
==============================》测试类
@Test
void mock() throws InterruptedException {
// 先启动这个测试类,模拟多个副本情况下,看如何消费
new CountDownLatch(1).await();
}
@Test
void syncSend() throws InterruptedException {
// 模拟每个队列中扔 10 个数据,看看效果
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 4; j++) {
producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");
}
}
TimeUnit.SECONDS.sleep(20);
}
}
ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。