前言
消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢?
- 发送时丢失:
- 生产者发送的消息未送达 exchange
- 消息到达 exchange 后未到达 queue
- MQ 宕机,queue将消息丢失
- consumer 接收到消息后未消费就宕机
消息可靠性问题及其对应的解决方案:
场景 | publisher发送时丢失 | MQ消息丢失 | consumer消费问题 |
---|---|---|---|
解决方案 | 生产者确认机制 | 消息持久化 | 消费者消息确认&&失败重试机制 |
下面我们先说一下publisher 发送时丢失的问题应该如何处理
生产者确认机制的理论说明
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
- publish-confirm, 发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publish-return, 发送回执
- 消息投递到交换机,但是没有路由到队列,返回ACK, 及路由失败原因
注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息,避免ack 冲突
代码实现
下面基于SpringAMQP 实现的生产者确认机制
- 在 publisher 服务的 application,yml 中添加以下配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启异步回调
publisher-returns: true
template:
mandatory: true
配置说明:
- publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型:
- simple: 同步等待 confirm 结果, 直到超时
- correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback
- publish-returns: 开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallbcak
- template.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息
ConfirmCallBack是基于每条消息设置的,所以需要一个全局唯一id 进行区分。
ReturenCallbcak 则是基于每个RabbitTemplate操作实例,是一种全局性的回调。
- 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置:
(这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候,进行一些全局性回调的操作)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置 ReturnCallback
rabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) -> {
// 记录日志
log.error("消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},",
replayCode, replayText, exchange, routingKey, message.toString());
// 如果有需要的话,重发消息
});
}}
}
- 为每条发送的消息,指定消息 ID, 并编写对应的 ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1. 准备消息
String message = "hello, spring amqp!";
// 2. 准备CorrelationData
// 2.1 消息id
CorrelationData correlationData = new
CorrelationData(UUID.randomUUID().toString());
// 2.2 准备 ConfirmCallback
correlationData.getFuture().addCallback(confirm -> {
// 判断结果
if(confirm.isAck()){
// ACK
log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
}else {
// NACK
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, throwable -> {
// 记录日志
log.error("消息发送失败", throwable);
// 重发消息
});
// 3.发送消息
rabbitTemplate.convertAndSend("amq.topic", "asimple.test", message, correlationData);
}
总结
SpringAMQP 中处理消息确认的几种情况:
- publisher-confirm:
- 消息发送到 exchange, 返回 ack
- 消息发送失败,没有到达交换机,返回 nack
- 消息发送过程中出现异常,没有收到回执
- 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback