RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点,实现了异步通讯等一些优点,但是在消息的传递中引入了MQ Broker必然会带来一些其他问题,比如如何保证消息在传输过程中可靠性(即不让数据丢失,发送一次消息就会被消费一次)?这篇博客将详细从生产者,MQ Broker以及消费者的角度讲解如何保证消息的可靠性!
1,消息丢失的情况
1.1 消息传递流程图如下
Producer -> exchange ->queue -> Consumer(其中exchange和queue属于MQ Broker的组件)
1.2 消息可能丢失的情况
- 生产者给交换机exchange的过程中发生数据丢失;
- 交换机exchange路由给队列queue的过程中发生数据丢失;
- 消息到达MQ的一瞬间,MQ发生了宕机的情况造成数据丢失;
- 消费者从队列queue中取出消息进行消费的一瞬间消费者宕机了造成数据丢失。
2,生产者确认机制
生产者确认机制主要是站在生产者的角度来保证消息的可靠性,针对的是生产者给交换机发送消息以及交换机给队列发送消息的过程中数据丢失的情况!
2.1 书写配置信息
# 配置日志信息
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
cn.itcast: debug
spring:
rabbitmq:
host: 123.207.72.43 # rabbitMQ的ip地址
port: 5672 # 端口
username: admin
password: 123
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
#消息发送失败时执行returnCallback回调函数
template:
mandatory: true
- publisher-confirm-type:表示开启publisher-confirm;这个参数有两种类型,分别是correlated和simple(correlated代表异步等待回调,类似于js中发送的ajax请求的回调函数,MQ返回结果时会执行定义的confirmCallback函数;simple代表同步等待confirm结果直到超时);
- publisher-returns:表示开启publish-return功能,同样是基于callback机制,不过是定义returnCallback;
- template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
2.2 定义return回调机制
我们使用的是SpringBoot来整合的RabbitMQ,所以不论是return回调还是confim回调都是用rabbittemplate对象进行定义的。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//记录日志
log.error("消息发送队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
replyCode,replyText,exchange,routingKey,message.toString());
//如果需要的话进行消息的重发
});
}
}
注意:
- 一个RabbitTemplate只能配置一个ReturnCallback,所以需要在项目启动的时候进行定义,这样rabbitTemplate就是全局唯一的了(也可以采用PostConstruct注解中的init方法进行定义);
- ApplicationContextAware是Spring创建完Bean工厂之后的通知方法,当Spring创建完Bean工厂之后就可以在Spring容器中拿到RabbitTemplate对象了;
- 配置ReturnCallback时可以采用匿名内部类的方法简化代码,如果消息发送失败可以根据需要进行消息重发操作。
2.3 定义confirm回调机制
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同,可以通过测试方法进行定义。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws InterruptedException {
//1.准备消息
String message = "hello spring amqp";
//2.准备CorrelationData
//2.1 消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//2.2 准备准备ConfirmCallback
correlationData.getFuture().addCallback(confirm -> {
if (confirm.isAck()) {
log.debug("消息成功投递到交换机!消息ID:{}", correlationData.getId());
} else {
log.error("消息投递到交换机上失败!消息ID:{}", correlationData.getId());
//重发消息
}
}, throwable -> {
//记录日志
log.error("发送消息失败!",throwable);
//重发消息
});
//3.发送消息
rabbitTemplate.convertAndSend("amq.topic","a.simple.hello",message,correlationData);
//加上休眠时间 避免mq连接直接关闭
Thread.sleep(1000);
}
注意:
- 生产者给交换机发送的消息数据很多的,为了区分每个消息的归属,每个消息都要附属上一个ID信息,可以采用UUID的方式生成唯一身份标识;
- 在发送消息的时候需要增加一个correlation变量,这个变量记录了两个东西(1.每个消息的ID 2.定义的cinfirm回调机制);
- 加上线程休眠的操作是为了避免消息发送到交换机之后mq的连接直接关闭,这样会导致返回ack的错误。
3,消息持久化
消息持久化是站在MQ Broker的角度来保证消息的可靠性的,将交换机、队列以及消息设置成持久化的从而避免MQ宕机造成消息的丢失!
3.1 交换机持久化
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct",true,false);
}
第二个参数设置成true就是让就交换机是可持久化的,第三个参数是是否自动删除,一般设为false;
3.2 队列持久化
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
durable的意思就是可持久化的,传入队列名称然后进行build操作,这样创建的队列就是一个可持久化的队列;
3.3 消息持久化
将交换机和队列设置为持久化的之后重启MQ服务器之后消息依然会丢失,因为发送的消息不是可持久化的,所以也需要将消息设置成可持久化的
4,消费者消息确认
消费者消息确认是站在消费者的角度来保证消息可靠性的,消息者处理完一条消息之后需要给MQ Broker返回一条ACK表示消息处理完成!
4.1 三种确认模式
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack;
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。
4.2 none模式的演示
1.修改消费者工程中的配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack
2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
//这里模拟一个异常
System.out.println(1 / 0);
log.info("消费者处理消息成功!");
}
3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发
抛出异常消费者并没有处理消息成功,再观察控制台是否将消息删除:
队列中已经没有消息了,说明消息被删除了!
消费者确认机制为none的时候,只要消费者拿到消息之后MQ就会把消息删除,不关心消费者是否将消息成功处理!
4.3 auto模式的演示
1.修改消费者工程中的配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 关闭ack
2.监听一个队列,在监听的方法中模拟一个异常情况,观察消息是否会被删除
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
//这里模拟一个异常
System.out.println(1 / 0);
log.info("消费者处理消息成功!");
}
3.在rabbitmq控制台模拟发送一条消息,观察抛出异常之后消息是否会重发
消费者确认机制为auto的时候,消费者拿到消息之后MQ并不会立刻删除队列中的消息,只有消费者成功处理完消息之后给队列返回一个ack的时候队列才会删除消息!
5, 消费者失败重试机制
我们发现当消费者确认机制为auto时,如果代码中出现了异常,消息会进行重复入队列(requeue)的操作,重复入队的操作对于MQ来说开销会非常大,消息处理飙升,所以引入了失败重试机制:当代码中出现了异常的时候,消费者内部会进行重发的操作(可以控制重发的时间和次数),如果超过设置的重发次数消费者还未成功处理消息默认将消息丢弃!
5.1 本地重试
Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列,可以在消费者工程的yml文件中添加如下配置:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 3 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 4 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
4次重发之后消息还未成功处理spring抛出了AmqpRejectAndDontRequeueException异常,这是失败之后的默认处理方式,默认消费者给队列返回了ack,此时队列会将消息从队列中删除!
5.2 失败策略
失败达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队;
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
如果消息这个消息比较重要,达到最大重试次数之后这个消息不能被丢弃该怎么办,此时就可以使用RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
@Configuration
public class ErrorMessageConfig {
//定义失败之后处理的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
//将交换机和队列进行绑定
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//定义一个RepublishMessageRecoverer,替换spring默认的处理机制
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
流程图如下:
6, 如何保证RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列;
- 开启持久化功能,确保消息未消费前在队列中不会丢失;
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack;
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。