前言
RabbitMq 消息可靠性问题(一) — publisher发送时丢失
前面我们从publisher的方向出发解决了发送时丢失的问题,那么我们在发送消息到exchange, 再由exchange转存到queue的过程中。如果MQ宕机了,那么我们的消息是如何确保可靠性的呢?当消息由队列发到对应的消费者处理时,consumer 接受到消息未消费就宕机,这时消息又如何确保可靠性呢?
消息可靠性问题及其对应的解决方案:
场景 | publisher发送时丢失 | MQ消息丢失 | consumer消费问题 |
---|---|---|---|
解决方案 | 生产者确认机制 | 消息持久化 | 消费者消息确认&&失败重试机制 |
消息持久化
MQ 默认是内存存储信息, 开启持久化功能可以确保缓存在 MQ 中的消息不丢失
- 交换机持久化
@Bean
public DirectExchange simpleDirect(){
// 三个参数:交换机名称, 是否持久化, 当没有queue 与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
- 队列持久化
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
- 消息持久化,SpringAMQP 中的消息默认是持久化, 可以通过MessagePropertie中的DeliveryMode 来指定的
@Test
public void testDurableMessage(){
// 1. 准备消息
Message message = MessageBuilder.withBody("hello,spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2. 发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
其实,默认情况下 SpringAMQP 中的交换机,队列,消息都是持久化的
(所以上诉的持久化参数和代码只需要了解即可)
消费者消息确认
RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向MQ 发送ack回执。MQ收到 ack 回执后才删除该消息。而SpringAMQP 则允许配置三种确认模式:
-
manual:手动 ack, 需要在业务代码结束后,调用 api 发送ack
-
auto: 自动 ack, 由Spring检测 listener 的代码是否出现异常,没有异常则返回 ack; 抛出异常则返回 nack
-
none: 关闭 ack, MQ 假定消费者获取消息后成功处理,因此消息投递后立即删除
在 consumer 服务的 application,yml 中进行如下配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
失败重试机制
消费者失败重试
当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue, 无限循环,导致 mq 的消息处理飙升,带来不必要的压力
我们可以利用 Spring 的 retry 机制,在消费者出现异常先利用本地重试,而不是无限的 requeue 到 mq 的队列。
可在 yml 的文件中进行如下配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始失败等待时长为 1s
multiplier: 3 # 下次失败等待时长的倍数, 下次等待时长= multiplier * last-interval
max-attempts: 4 # 最大重试次数
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer 接口来处理,它包含三种不同实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject, 丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer: 重试耗尽后,返回 nack, 消息重新入队
- RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定交换机
下面简单说明一下 RepublishMessageRecoverer 处理模式
- 首先,定义接收失败消息的交换机,队列及其绑定关系:
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
- 定义 RepublishMessageRecoverer
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
{
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
}
进行完上面的配置后,我们在消费者处理消息异常后,会进行一个本地的重试。而不是直接 requeue 到队列中。
当我们重试次数耗尽后,我们会把错误消息投递到 error.direct 的交换机上,然后在 error.queue上进行转存。这样既能减轻 mq 的压力,也能在队列上找到处理异常的消息,进行人工介入处理。
总结
如何确保 RabbitMQ 消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费在队列中不会丢失
- 开启消费者确认机制为 auto, 由 spring 确认消息处理成功后完成 ack
- 开启消费者失败重试机制,并设置 MessageRecoverer, 多次重试失败后将消息投递到异常交换机,交由人工处理