一、什么是顺序消息
投递消息的顺序和消费消息的顺序一致。
比如生产者按顺序投递了1/2/3/4/5 这 5 条消息,那么消费的时候也必须按照1到5的顺序消费这些消息。
二、顺序消息如何实现?(2种方案)
- 方案1:生产者串行发送+同一个队列+消费者单线程消费
- 方案2:生产方消息带上连续递增编号+同一个队列+消费按编号顺序消费消息
2.1 方案1:生产者串行发送+同一个队列+消费者单线程消费
2.1.1 生产者串行发送消息
- 使用分布式锁,让消息投递串行执行
- 确保顺序消息到达同一个队列
2.1.2 消费者单线程消费
- 消费者这边只能有一个线程,拉取一个消费一个,消费完成后再拉取下一个消费
2.2 方案2:生产方消息带上连续递增编号+同一个队列+消费按编号顺序消费消息
2.2.1 生产方消息带上连续递增编号
- 顺序消息携带连续递增的编号,从1开始,连续递增,比如发送了3条消息,编号分别是1、2、3,后面再投递消息,编号就从4开始了
- 确保顺序消息到达同一个队列
2.2.2 消费方按照编号顺序消费消息
- 消费方需要记录消息消费的位置:当前轮到哪个编号的消息了
- 收到消息后,将消息的编号和当前消费的位置对比下,是不是轮着这条消息消费了,如果是则进行消费,如果不是,则排队等待,等待前一个到达后,且消费成功后,将自己唤醒进行消费
这里举个例子,如下
-
生产者发送了编号为看1、2、3 的3条消息
-
到达的消费端顺序刚好相反,3先到,发现没有轮到自己,会进行排队
-
然后2到了,发现也没有轮到自己,也会排队
-
然后过了一会1到了,发现轮到自己了,然后1被消费了
-
1消费后,会唤醒下一个编号的2进行消费
-
2消费完了,会唤醒下一个编号的3进行消费。
本文我们会落地方案2。
三、代码
-- 创建订单表
drop table if exists t_order_lesson034;
create table if not exists t_order_lesson034
(
id varchar(32) not null primary key comment '订单id',
goods varchar(100) not null comment '商品',
price decimal(12, 2) comment '订单金额'
) comment '订单表';
-- 创建本地消息表
drop table if exists t_msg_lesson034;
create table if not exists t_msg_lesson034
(
id varchar(32) not null primary key comment '消息id',
exchange varchar(100) comment '交换机',
routing_key varchar(100) comment '路由key',
body_json text not null comment '消息体,json格式',
status smallint not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',
expect_send_time datetime not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',
actual_send_time datetime comment '消息实际投递时间',
create_time datetime comment '创建时间',
fail_msg text comment 'status=2 时,记录消息投递失败的原因',
fail_count int not null default 0 comment '已投递失败次数',
send_retry smallint not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',
next_retry_time datetime comment '投递失败后,下次重试时间',
update_time datetime comment '最近更新时间',
key idx_status (status)
) comment '本地消息表';
-- 创建消息和消费者关联表
drop table if exists t_msg_consume_lesson034;
create table if not exists t_msg_consume_lesson034
(
id varchar(32) not null primary key comment '消息id',
producer varchar(100) not null comment '生产者名称',
producer_bus_id varchar(100) not null comment '生产者这边消息的唯一标识',
consumer_class_name varchar(300) not null comment '消费者完整类名',
queue_name varchar(100) not null comment '队列名称',
body_json text not null comment '消息体,json格式',
status smallint not null default 0 comment '消息状态,0:待消费,1:消费成功,2:消费失败',
create_time datetime comment '创建时间',
fail_msg text comment 'status=2 时,记录消息消费失败的原因',
fail_count int not null default 0 comment '已投递失败次数',
consume_retry smallint not null default 1 comment '消费失败后,是否还需要重试?1:是,0:否',
next_retry_time datetime comment '投递失败后,下次重试时间',
update_time datetime comment '最近更新时间',
key idx_status (status),
unique uq_msg (producer, producer_bus_id, consumer_class_name)
) comment '消息和消费者关联表';
drop table if exists t_msg_consume_log_lesson034;
create table if not exists t_msg_consume_log_lesson034
(
id varchar(32) not null primary key comment '消息id',
msg_consume_id varchar(32) not null comment '消息和消费者关联记录',
status smallint not null default 0 comment '消费状态,1:消费成功,2:消费失败',
create_time datetime comment '创建时间',
fail_msg text comment 'status=2 时,记录消息消费失败的原因',
key idx_msg_consume_id (msg_consume_id)
) comment '消息消费日志';
-- 幂等辅助表
drop table if exists t_idempotent_lesson034;
create table if not exists t_idempotent_lesson034
(
id varchar(50) primary key comment 'id,主键',
idempotent_key varchar(500) not null comment '需要确保幂等的key',
unique key uq_idempotent_key (idempotent_key)
) comment '幂等辅助表';
-- 顺序消息编号生成器
drop table if exists t_sequential_msg_number_generator_lesson034;
create table if not exists t_sequential_msg_number_generator_lesson034
(
id varchar(50) primary key comment 'id,主键',
group_id varchar(256) not null comment '组id',
numbering bigint not null comment '消息编号',
version bigint not null default 0 comment '版本号,每次更新+1',
unique key uq_group_id (group_id)
) comment '顺序消息排队表';
-- 顺序消息消费信息表,(group_id、queue_name)中的消息消费到哪里了?
drop table if exists t_sequential_msg_consume_position_lesson034;
create table if not exists t_sequential_msg_consume_position_lesson034
(
id varchar(50) primary key comment 'id,主键',
group_id varchar(256) not null comment '组id',
queue_name varchar(100) not null comment '队列名称',
consume_numbering bigint default 0 not null comment '当前消费位置的编号',
version bigint not null default 0 comment '版本号,每次更新+1',
unique key uq_group_queue (group_id, queue_name)
) comment '顺序消息消费信息表';
-- 顺序消息排队表
drop table if exists t_sequential_msg_queue_lesson034;
create table if not exists t_sequential_msg_queue_lesson034
(
id varchar(50) primary key comment 'id,主键',
group_id varchar(256) not null comment '组id',
numbering bigint not null comment '消息编号',
queue_name varchar(100) not null comment '队列名称',
msg_json text not null comment '消息json格式',
create_time datetime comment '创建时间',
unique key uq_group_number_queue (group_id, numbering, queue_name)
) comment '顺序消息排队表';
3.1 发送顺序消息
如下模拟发送订单相关的5条顺序消息
com.itsoku.lesson034.controller.TestController#sendSequential
@PostMapping("/sendSequential")
public Result<Void> sendSequential() {
String orderId = IdUtil.fastSimpleUUID();
List<String> list = Arrays.asList("订单创建消息",
"订单支付消息",
"订单已发货",
"买家确认收货",
"订单已完成");
for (String type : list) {
msgSender.sendSequentialWithBody(orderId,
RabbitMQConfiguration.Order.EXCHANGE,
RabbitMQConfiguration.Order.ROUTING_KEY,
OrderMsg.builder().orderId(orderId).type(type).build());
}
return ResultUtils.success();
}
3.2 消息按顺序消费
3.2.1 消费者拉取消息时,先把消息连同顺序的编号保存在表t_sequential_msg_queue里
3.2.2 从t_sequential_msg_queue表里取出消息进行消费
有个疑问:
- 为什么加锁失败要去触发重试?比如消费者1,2,3,4,5分别拿到了编号为1,2,3,4,5的消息,1,2,3,4,5都已经入库了,这时候只需要一个加锁成功的线程就能消费全部消息了,因为有个循环。
- 假如有5个线程,那么每次加锁势必会有至多4个线程获取不到锁,那么消息会重新投递4次,需要这样做吗?
- 比如消费者1,2,3,4,5分别拿到了编号为1,2,3,4,5的消息,2,3,4,5都已经入库了,编号为1的消息还没有入库。这时候消费者2获得了锁,1,3,4,5都还没有获得锁,但消息队列表里编号最小的是2,所以比较编号的时候没轮到消费者2,直接break退出循环了。