一、主要内容
本文将实现一个MQ延迟消息的通用方案。
方案不依赖于MQ中间件,依靠MySQL和DelayQueue解决,不管大家用的是什么MQ,具体是RocketMQ、RabbitMQ还是kafka,本文这个方案你都可以拿去直接使用,可以轻松实现任意时间的延迟消息投递。
二、涉及技术点
- SpringBoot2.7
- MyBatisPlus
- MySQL
- 线程池
- java中的延迟队列:DelayQueue
- 分布式锁
三、延迟消息常见的使用场景
- 订单超时处理
比如下单后15分钟,未支付,则自动取消订单,回退库存。
可以采用延迟队列实现:下单的时候可以投递一条延迟15分钟的消息,15分钟后消息将被消费。
- 消息消费失败重试
比如MQ消息消费失败后,可以延迟一段时间再次消费。
可以采用延迟消息实现:消费失败,可以投递一条延迟消息,触发再次消费
- 其他任意需要延迟处理的业务
业务中需要延迟处理的场景,都可以使用延迟消息来搞定。
四、延迟消息常见的实现方案
方案1:MySQL + job定时轮询
由于延迟消息的时间不确定,若要达到实时性很高的效果,也就是说消息的延迟时间是不知道的,那就需要轮询每一秒才能确保消息在指定的延迟时间被处理,这就要求job需要每秒查询一次db中待投递的消息。
这种方案访问db的频率比较高,对数据库造成了一定的压力。
方案2:RabbitMQ 中的TTL+死信队列
rabbitmq中可以使用TTL消息 + 死信队列实现,也可以安装延时插件。
此方案对中间件有依赖,不同的MQ实现是不一样的,若换成其他的MQ,方案要重新实现
方案3:MySQL + job定时轮询 + DelayQueue
可以对方案1进行改进,引入java中的 DelayQueue。
job可以采用1分钟执行一次,每次拉取未来2分钟内需要投递的消息,将其丢到java自带的 DelayQueue 这个延迟队列工具类中去处理,这样便能做到实时性很高的投递效果,且对db的压力也降低了很多。
这种方案对db也没什么压力,实时性非常高,且对MQ没有依赖,这样不管切换什么MQ,这种方案都不需要改动。
本文将落地该方案。
需要一张本地消息表(t_msg)
这张表用来暂存事务消息和延迟消息
create table if not exists t_msg
(
id varchar(32) not null primary key comment '消息id',
body_json text not null comment '消息体,json格式',
status smallint not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',
expect_send_time datetime not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',
actual_send_time datetime comment '消息实际投递时间',
create_time datetime comment '创建时间',
fail_msg text comment 'status=2 时,记录消息投递失败的原因',
fail_count int not null default 0 comment '已投递失败次数',
send_retry smallint not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',
next_retry_time datetime comment '投递失败后,下次重试时间',
update_time datetime comment '最近更新时间',
key idx_status (status)
) comment '本地消息表';
五、代码落地
将延迟消息保存到t_msg
投递事务消息 or 2分钟内的延时消息
每分钟执行一次Job,查出2分钟内应该被投递的延迟消息 和 2分钟内应该重新投递的上次投递失败的事务消息,再放到延时队列里
/**
* 每分钟执行一次
*/
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)
public void sendRetry() {
/**
* 查询出需要重试的消息(状态为0 and 期望投递时间 <= 当前时间 + 2分钟) || (投递失败的 and 需要重试 and 下次重试时间小于等于当前时间 + 2分钟)
* select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟))
*/
LocalDateTime time = LocalDateTime.now().plusMinutes(2);
LambdaQueryWrapper<MsgPO> qw = Wrappers.lambdaQuery(MsgPO.class)
.and(query -> query.and(lq ->
lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus())
.le(MsgPO::getExpectSendTime, time))
.or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus())
.eq(MsgPO::getSendRetry, 1)
.le(MsgPO::getNextRetryTime, time)));
qw.orderByAsc(MsgPO::getId);
//先获取最小的一条记录的id
MsgPO minMsgPo = this.msgService.findOne(qw);
if (minMsgPo == null) {
return;
}
this.msgSender.sendRetry(minMsgPo);
String minMsgId = minMsgPo.getId();
//循环中继续向后找出id>minMsgId的所有记录,然后投递重试
while (true) {
//select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟)) and id>#{minMsgId}
qw = Wrappers.lambdaQuery(MsgPO.class)
.and(query -> query.and(lq ->
lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus()).le(MsgPO::getExpectSendTime, time))
.or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus()).eq(MsgPO::getSendRetry, 1).le(MsgPO::getNextRetryTime, time)));
qw.gt(MsgPO::getId, minMsgId);
qw.orderByAsc(MsgPO::getId);
Page<MsgPO> page = new Page<>();
page.setCurrent(1);
page.setSize(500);
this.msgService.page(page, qw);
//如果查询出来的为空 || 当前服务要停止了(stop=true),则退出循环
if (CollUtils.isEmpty(page.getRecords()) || stop) {
break;
}
//投递重试
for (MsgPO msgPO : page.getRecords()) {
this.msgSender.sendRetry(msgPO);
}
// minMsgId = 当前列表最后一条消息的id
minMsgId = page.getRecords().get(page.getRecords().size() - 1).getId();
}
}