目录
消息可靠性
生产者消息确认
第一步:修改application.yml配置文件信息
第二步:定义发送者确认confirm回调方法
第三步:创建消息发送者回执return回调方法(确保消息从交换机到消息队列)
总结:
消息持久化
消费者消息确认
SpringAMQP则允许配置三种确认模式:
auto问题
消费者是失败重试
本地重试
失败策略
总结
消息队列在使用时,有以下的问题需要考虑
- 消息可靠性问题(一个消息至少被消费一次)
- 延迟消息问题
- 高可用问题
- 消息堆积问题
消息可靠性
消息从发送,到消费者接收,会经理多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
这个可以保证消息发送者成功发送消息到交换机以及消息队列中
有两种返回结果:
第一种:确认消息是否发送到交换机--》publisher-confirm
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
第二种:确认消息是否成功从交换机路由到对应的消息队列中--》publisher-return
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
第一步:修改application.yml配置文件信息
spring:
rabbitmq:
host: 192.168.230.100 # rabbitMQ的ip地址
port: 5672 # 端口
username: hhh
password: 1234
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:
simple
:同步等待confirm结果,直到超时correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback回调方法;false:则直接丢弃消息
第二步:定义发送者确认confirm回调方法
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取rabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//设置发送者确认回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 自定义的消息
* @param ack ack确认,true为发送到交换机成功
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if(ack){
log.info("消息到达交换机");
}else {
log.error("消息没有到交换机,原因为:{}",s);
}
}
});
}
}
发送消息:
@Test
public void test01() throws InterruptedException {
String routingKey="simple";
String message="hello spring";
rabbitTemplate.convertAndSend("mqAd.topic",routingKey,message);
Thread.sleep(2000); //让主线程休眠2s,返回回调方法还没执行,主线程就关闭
}
发送失败触发消息确认的回调方法,因为我现在没有创建maAd.topic交换机,所以无法发送消息到交换机中
创建交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("mqAd.topic");
}
第三步:创建消息发送者回执return回调方法(确保消息从交换机到消息队列)
//设置消息发送者回执回调方法,交换机路由消息队列错误才会回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// *
// *
// * @param message 返回的信息
// * @param i 回复的状态码
// * @param s 回复内容
// * @param s1 交换机
// * @param s2 路由
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.error("路由定位失败,{},{},{},{},{}",message,i,s,s1,s2);
}
});
这时候没有对应的消息队列,会触发消息回执return 回调方法
路由定位失败,(Body:'hello spring' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),312,NO_ROUTE,mqAd.topic,simple
创建消息队列并绑定
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("mqAd.topic");
}
@Bean
public Queue simpleQueue(){
return new Queue("mqAd.simple.queue");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(simpleQueue()).to(topicExchange()).with("simple");
}
总结:
1.消息确认confirm回调方法触发:不论消息是否成功到达交换机都会触发,成功返回的是ack,失败返回的是nack
2.消息回执return回调方法触发:只有交换机路由消息队列失败时才会触发
消息持久化
防止mq宕机,然后消息队列里面的消息全部丢失
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
实际上默认交换机和队列,消息都是持久化的
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("mqAd.topic",true,false);
}
@Bean
public Queue simpleQueue(){
return new Queue("mqAd.simple.queue",true);
持久化标识:
消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: 192.168.230.100 # rabbitMQ的ip地址
port: 5672 # 端口
#addresses: 192.168.150.101:8071, 192.168.150.101:8072, 192.168.150.101:8073
username: hhh
password: 1234
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: auto
auto问题
auto会有一种问题,就会监听方法出现异常,消息队列的消息就无法正常处理,那么spring会自动发消息到mq说明这个消息没有被正常消费,不要删除,然后mq又会把消息重新返回队列,而监听方法又会监听到这个消息队列的信息,然后监听方法又出现异常,消息又回到消息队列,这就在项目中进行无限次的循环重试
消费者是失败重试
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 4 # 执行次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
失败-->1s后重试1次-->2秒后重试第二次-->4秒后重试第三次
每次失败的等待时长是之前的2倍数 ,执行次数包括第一次失败
注意:重试次数结束之后还没有成功,消息队列的消息就会被删除
失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
声明处理错误消息的交换机和消息队列,并且声明一个RepublishMessageRecoverer bean,告诉spring使用的是这种失败策略
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
//设置消息失败处理策略
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
重复处理三次之后还是错误,就把这个消息发送到 消息错误交换机中
总结
如何确保RabbitMQ消息的可靠性?
- 开启生产者确认,回执机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理