📝个人主页:五敷有你
🔥系列专栏:MQ
⛺️稳中求进,晒太阳
消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
2.数据持久化
为了提高性能,默认情况下,MQ的数据都是再内存存储的临时数据,重启后就会消失,为了保证数据的可靠性,必须配置数据持久化,包括:
-
交换机持久化
-
队列持久化
-
消息持久化
我们以控制台界面为例来说明。
2.1.交换机持久化
在控制台的Exchanges
页面,添加交换机时可以配置交换机的Durability
参数:
设置为Durable
就是持久化模式,Transient
就是临时模式。
2.2.队列持久化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability
参数:
除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。
2.3.消息持久化
在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties
:
说明:
在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
代码层次实现:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "msg.queue",durable ="true"),
exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),
key = "msg"
))
public void listenMsg(String jsonStr){
log.info("收到消息{}", jsonStr);
Map<String,Object> map = JSONUtil.toBean(jsonStr, Map.class);
JSONObject object=new JSONObject(map);
String actionName =object.getString(Action.ACTION);
Action action = getAction(actionName);
action.doMessage(getWebSocketManager(),object);
}
3.消费者的可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:
-
消息投递的过程中出现了网络故障
-
消费者接收到消息后突然宕机
-
消费者接收到消息后,因处理不当导致异常
RabbitMQ如何得知消费者的处理状态呢?
3.1消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
-
ack:成功处理消息,RabbitMQ从队列中删除该消息
-
nack:消息处理失败,RabbitMQ需要再次投递消息
-
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
-
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
测试下面代码验证: 在none情况下,异常产生后,消息队列中的消息被删除了
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "msg.queue",durable ="true"),
exchange = @Exchange(name = "msg.topic",type = ExchangeTypes.TOPIC,durable = "true"),
key = "msg"
))
public void listenMsg(String jsonStr){
log.info("收到消息{}", jsonStr);
throw new RuntimeException("异常");
}
-
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活 -
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:-
如果是业务异常,会自动返回
nack
; -
如果是消息处理或校验异常,自动返回
reject
;
-
测试下面代码验证: 在auto情况下,异常产生后,消息一直在被重复投递,
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked
(未确定状态):放行以后,由于抛出的是业务异常,所以Spring返回ack
,最终消息恢复至Ready
状态,并且没有被RabbitMQ删除: 这个一直Unack的状态。当我们把配置改为auto
时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
如果是消息转换异常,spring会返回reject
刚刚发现,当消费者出现异常后,消息会不断的requeue(重入队)到队列,再重新发送给消费者,如果消费者执行依然出错,消息会再次投递到队列,直到处理成功为止。
极端情况下,消费之一直无法执行成功,那么消息requeue就会无限循环,导致mq的处理消息飙升,带来不必要的压力。
3.2.失败重试机制
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
-
消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
-
本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException
异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
试了三次,失败就停下来了。
结论:
-
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring会返回reject,消息会被丢弃
3.2失败处理策略
本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
-
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式 -
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队 -
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
package com.aqiuo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public TopicExchange errorMessageExchange(){
return new TopicExchange("error.topic",true,false);
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}