1. 问题
2. 延时任务
2.1 什么是延时任务
在当前时间往后延迟多少时间执行的任务
2.1.1 和定时任务区别
- 定时任务有明确的触发时间,延时任务没有
- 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
- 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务
2.2 延时队列使用场景
- 订单在十分钟之内未支付则自动取消。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 账单在一周内未支付,则自动结算。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
如美团点餐,超时时间
2.3 常见方案
2.3.1 数据库轮询
该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作
优点
代码简单,复杂度小
缺点
- 对服务器内存消耗大
- 存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
2.3.1 JDK的延迟队列
该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
优点
效率高,任务触发时间延迟低。
缺点
- 服务器重启后,数据全部消失,怕宕机
- 集群扩展相当麻烦
- 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
- 代码复杂度较高
2.3.3 netty时间轮算法
时间轮算法可以类比于时钟,如图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick
这样可以看出定时轮由3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)
优点
效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。
缺点
- 服务器重启后,数据全部消失,怕宕机
- 集群扩展相当麻烦
- 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
2.3.4 使用消息队列
可以采用RabbitMQ的延时队列,RabbitMQ具有以下两个特性,可以实现延迟队列
- RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为
dead letter
- RabbitMQ的Queue可以配置
x-dead-letter-exchange
和x-dead-letter-routing-key
(可选)两个参数,用来控制队列内出现了dead letter
,则按照这两个参数重新路由。
优点
高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。
缺点
本身的易用度要依赖于RabbitMq的运维,因为要引用RabbitMq,所以复杂度和成本变高
2.4 延时队列
RabbitMQ中没有对消息延迟进行实现,但是可以通过TTL以及死信路由来实现消息延迟。
还有一种使用官方自带的插件, 插件的方式参考: 跳转
2.4.1 TTL(消息过期时间)
在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——
TTL(Time To Live)
。
TTL
是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
,单位是毫秒,换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
2.4.1.1 配置队列TTL
一种是在创建队列的时候设置队列的“x-message-ttl”属性
@Bean
public Queue taxiOverQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-message-ttl", 30000);
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}
这样所有被投递到该队列的消息都最多不会存活超过30s,如果没有任何处理,消息会被丢弃,如果配置有死信队列,超时的消息会被投递到死信队列
2.5 死信队列
2.5.1 什么是死信队列
顾名思义就是无法被消费的消息
一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
2.5.2 死信队列使用场景
RabbitMQ中的死信交换器(dead letter exchange)可以接收下面三种场景中的消息:
- 消费者对消息使用了
basicReject
或者basicNack
回复,并且requeue
参数设置为false
,即不再将该消息重新在消费者间进行投递 - 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置
TTL
属性 - 队列中的消息已经超过其设置的最大消息个数
2.5.3 死信队列如何使用
死信交换器不是默认的设置,这里是被投递消息被拒绝后的一个可选行为,是在创建队列的时进行声明的,往往用在对问题消息的诊断上。
死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作,在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange
。
2.5.4 相关代码
@Bean
public Queue taxiOverQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}
2.6 延迟消息处理
2.6.1 延迟消息实现
在创建队列的时候配置死信交换器并设置队列的“x-message-ttl”属性
@Bean
public Queue taxiDeadQueue() {
return new Queue(TAXI_DEAD_QUEUE,true);
}
@Bean
public Queue taxiOverQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);
// x-message-ttl 声明队列的TTL
args.put("x-message-ttl", 30000);
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}
这样所有被投递到该队列的消息都最多不会存活超过30s,超时后的消息会被投递到死信交换器
3. RabbitMQ消息可靠性保障
消息的可靠性投递是使用消息中间件不可避免的问题
从上面的图可以看到,消息的投递有三个对象参与:
- 生产者
- broker
- 消费者
3.1 生产者保证
生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种
- 失败通知
- 发送方确认
3.1.1 RabbitMQ流程
生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理, 但是在某些情况下,如果在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,这个时候就需要失败通知。
不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布操作不返回任何消息给生产者。
3.1.2 失败通知
如果出现消息无法投递到队列会出现失败通知
可以启动失败通知,在原生编程中在发送消息时设置mandatory
标志,即可开启故障检测模式。
注意:它只会通知失败,而不会通知成功,如果消息正确路由到队列,则发布者不会受到任何通知,带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失
3.1.2.1 实现方式
spring配置
spring:
rabbitmq:
# 消息在未被队列收到的情况下返回
publisher-returns: true
关键代码,注意需要发送者实现
ReturnCallback
接口方可实现失败通知
/**
* 失败通知
* 队列投递错误应答
* 只有投递队列错误才会应答
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//消息体为空直接返回
if (null == message) {
return;
}
TaxiBO taxiBO = JSON.parseObject(message.getBody(), TaxiBO.class);
if (null != taxiBO) {
//删除rediskey
redisHelper.handelAccountTaxi(taxiBO.getAccountId());
//记录错误日志
recordErrorMessage(taxiBO, replyText, exchange, routingKey, message, replyCode);
}
}
3.1.2.2 遇到的问题问题
如果消息正确路由到队列,则发布者不会受到任何通知,带来的问题是无法确保发布消息一定是成功的,因为路由到队列的消息可能会丢失
3.1.3 发送发确认
发送方确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答,生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障
rabbitmq消息发送分为两个阶段:
- 将消息发送到broker,即发送到exchage交换机
- 消息通过交换机exchange被路由到队列queue
一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知
注意:发送发确认只有出现RabbitMQ内部错误无法投递才会出现发送发确认失败。
发送方确认模式需要分两种情况下列来看
3.1.3.1 不可路由
当前消息到达交换器后对于发送者确认是成功的
首先当RabbitMQ交换器不可路由时,消息也根本不会投递到队列中,所以这里只管到交换器的路径,当消息成功送到交换器后,就会进行确认操作。
另外在这过程中,生产者收到了确认消息后,那么因为消息无法路由,所以该消息也是无效的,无法投递到队列,所以一般情况下这里会结合失败通知来一同使用,这里一般会进行设置mandatory
模式,失败则会调用addReturnListener监听器来进行处理。
发送方确认模式的另一种情况肯定就是消息可以进行路由
3.1.3.2 可以路由
只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现失败
可以路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。
3.1.3.3 使用方式
spring配置
spring:
rabbitmq:
# 开启消息确认机制
publisher-confirm-type: correlated
关键代码,注意需要发送者实现
ConfirmCallback
接口方可实现失败通知
/**
* 发送发确认
* 交换器投递后的应答
* 正常异常都会进行调用
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//只有异常的数据才需要处理
if (!ack) {
//关联数据为空直接返回
if (correlationData == null) {
return;
}
//检查返回消息是否为null
if (null != correlationData.getReturnedMessage()) {
TaxiBO taxiBO = JSON.parseObject(correlationData.getReturnedMessage().getBody(), TaxiBO.class);
//处理消息还原用户未打车状态
redisHelper.handelAccountTaxi(taxiBO.getAccountId());
//获取交换器
String exchange = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_EXCHANGE");
//获取队列信息
String routingKey = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_ROUTING_KEY");
//获取当前的消息体
Message message = correlationData.getReturnedMessage();
//记录错误日志
recordErrorMessage(taxiBO, cause, exchange, routingKey, message, -1);
}
}
}
3.1.4 Broker丢失消息
如何在mq挂掉重启之后还能保证消息是存在的?
开启RabbitMQ的持久化,也即消息写入后会持久化到磁盘,此时即使mq挂掉了,重启之后也会自动读取之前存储的额数据
3.1.4.1 持久化队列
@Bean
public Queue queue(){
return new Queue(queueName,true);
}
3.1.4.2 持久化交换器
@Bean
DirectExchange directExchange() {
return new DirectExchange(exchangeName,true,false);
}
3.1.4.3 发送持久化消息
发送消息时,设置消息的deliveryMode=2
注意:如果使用SpringBoot的话,发送消息时自动设置deliveryMode=2,不需要人工再去设置
3.1.4.4 Broker总结
失败通知和发送方确认结合使用, 确保消息发送成功
3.2 消费方消息可靠性
3.2.1 消费者手动确认
RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。
3.2.1.1 配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ack
3.2.1.2 参数介绍
acknowledge-mode: manual就表示开启手动ack,该配置项的其他两个值分别是none和auto
- auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack
- manual: 手动ack,用户必须手动提交ack或者nack
- none: 没有ack机制
默认值是auto,如果将ack的模式设置为auto,此时如果消费者执行异常的话,就相当于执行了nack方法,消息会被放置到队列头部,消息会被无限期的执行,从而导致后续的消息无法消费。
3.3.1.3 消费者实现
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value = RabbitConfig.TAXI_DEAD_QUEUE, durable = "true"),
exchange = @Exchange(value = RabbitConfig.TAXI_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.TAXI_DEAD_KEY)
})
@RabbitHandler
public void processOrder(Message massage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
TaxiBO taxiBO = JSON.parseObject(massage.getBody(), TaxiBO.class);
try {
//开始处理订单
logger.info("处理超时订单,订单详细信息:" + taxiBO.toString());
taxiService.taxiTimeout(taxiBO);
//手动确认机制 参数二: 是否批量进行确认
channel.basicAck(tag, false);
} catch (Exception e) {
e.printStackTrace();
}
}
3.3 业务可靠性分析
3.3.1 消息丢失
结合上面尽量减少消息的丢失, 如果丢失可以写入失败日志, 对业务进行回滚操作
3.3.2 幂等性校验
使用redis进行幂等性校验, 对key设置有效期, 或者MessageId入库
3.3.3 数据回滚
虽然无需做到消息完全不丢失以及消息的幂等性,但是需要考虑如果出现问题,需要将插入Redis的的key值回滚掉,防止影响业务正常判断
3.3.4 限流QOS
因为RabbitMQ是消息推送的模式, 大量消息服务器可能崩溃, 设置QOS解决
spring:
rabbitmq:
host: 192.168.153.130
port: 5672
username: guest
password: guest
#virtual-host:
listener:
simple:
prefetch: 2 # 代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息