消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
(1)发送时丢失:
1️⃣生产者发送的消息未送达exchange
2️⃣消息到达exchange后未到达queue
(2)MQ宕机,queue将消息丢失
(3)consumer接收到消息后未消费就宕机
一、生产者消息确认(P158)
1. 生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
(1)publisher-confirm,发送者确认
1️⃣消息成功投递到交换机,返回ack
2️⃣消息未投递到交换机,返回nack
(2)publisher-return,发送者回执
1️⃣消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:
确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
2. SpringAMQP实现生产者确认
2.1 在publisher这个微服务的application.yml中添加配置:
spring: rabbitmq: host: 192.168.150.101 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: / publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
配置说明:
(1)publish-confirm-type:开启publisher-confirm,这里支持两种类型:
1️⃣simple:同步等待confirm结果,直到超时
2️⃣correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
(2)publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
(2)template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
2.2 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 记录日志 log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的话,重发消息 }); } }
2.3 发送消息,指定消息ID、消息ConfirmCallback
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() throws InterruptedException { // 1.准备消息 String message = "hello, spring amqp!"; // 2.准备CorrelationData // 2.1.消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 2.2.准备ConfirmCallback correlationData.getFuture().addCallback( // 成功回调 result -> { // 判断结果 if (result.isAck()) { // ACK log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId()); } else { // NACK log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId()); // 重发消息 } }, // 异常回调 ex -> { // 记录日志 log.error("消息发送失败!", ex); // 重发消息 }); // 3.发送消息 rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData); } }
二、消息持久化
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
1. 交换机持久化:
@Bean public DirectExchange simpleDirect(){ //三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct",true,false); }
2. 队列持久化:
@Bean public Queue simpleQueue(){ // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
3. 消息持久化,
SpringAMQP 中的的消息默认是持久的,可以通过 MessageProperties 中的 DeliveryMode 来指定的:
Message msg = MessageBuilder .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 .build();
三、消费者消息确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
(1)manual:手动ack,需要在业务代码结束后,调用api发送ack。
(2)auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
(3)none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
配置方式是修改application.yml文件,添加下面配置:
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: auto
四、消费失败重试机制
1. 消费者失败重试
当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力
我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的requeue 到 mq 队列。
spring: rabbitmq: addresses: 192.168.150.101:8071, 192.168.150.101:8072, 192.168.150.101:8073 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: auto retry: enabled: true #开启消费者失败重试 initial-interval: 1000 # 初始的失败等待时长为1秒 multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 4 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
2. 消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:
(1)RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
(2)ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队 (3)RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
测试下RepublishMessageRecoverer处理模式:
首先,定义接收失败消息的交换机、队列及其绑定关系。
然后,定义RepublishMessageRecoverer:
@Configuration public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue"); } @Bean public Binding errorMessageBinding(){ return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }