一、MQ的问题
基于上篇存在的问题
1. 问题说明
MQ在分布式项目中是非常重要的,
它可以实现异步、削峰、解耦,但是在项目中引入MQ也会带来一系列的问题。
今天我们要解决以下几个常见的问题:
消息可靠性问题:如何确保消息被成功送达消费者,并且被消费者成功消费掉
延迟消息问题:如果一个消息,需要延迟15分钟再消费,像12306超时取消订单,如何实现消息的延迟投递
消息堆积问题:如果消息无法被及时消费而堆积,如何解决百万级消息堆积的问题
MQ的高可用问题:如何避免MQ因为单点故障而不可用的问题
2. 准备代码环境
注意:为了后续的演示效果,暂不声明交换机、队列、绑定关系
创建project
删除project里的src文件夹
添加依赖坐标
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.9.RELEASE</version> <relativePath/> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--单元测试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> </dependencies>
创建生产者模块依赖
不需要添加,直接继承父工程的依赖
配置
修改application.yaml,添加配置:
spring: rabbitmq: host: 192.168.200.137 port: 5672 virtual-host: / username: itcast password: 123321
引导类
package com.mqrebbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class,args); } }
发消息测试类
package com.mqrebbit; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class Demo01SimpleTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test(){ rabbitTemplate.convertAndSend("demo.exchange","demo","hello"); } }
创建消费者模块
依赖
不需要添加,直接继承父工程的依赖
配置
修改application.yaml,添加配置:
spring: rabbitmq: host: 192.168.200.137 port: 5672 virtual-host: / username: itcast password: 123321 listener: simple: prefetch: 1
引导类
package com.mqrebbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class,args); } }
创建Listener
package com.mqrebbit.listener; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class DemoListener { @RabbitListener(queues = "demo.queue") public void handleDemoQueueMsg(String msg){ log.info("从{}队列接收到消息:{}", "demo.queue", msg); System.out.println("模拟:处理消息中……"); log.info("消息处理完毕"); } }
二、消息可靠性
1. 介绍
当我们的生产者发送一条消息后,这条消息最终会到达消费者。
那么在这整个过程中任何一个环境出错,都可能会导致消息的丢失,而导致不够可靠。
可能出问题的环节有:
生产者发送消息到Broker时 丢失:
消息未送达Exchange
消息到达了Exchange,但未到达Queue
Broker收到消息后丢失:
MQ宕机,导致未持久化保存消息
消费者从Broker接收消息丢失:
消费者接收消息后,尚未消费就宕机
针对这些问题,RabbitMQ给出了对应的解决方案
生产者发送消息丢失:使用生产者确认机制
Broker接收消息丢失:MQ消息持久化
消费者接收消息丢失:消费者确认机制与失败重试机制
2. 生产者确认机制
介绍
在了解生产者确认机制之前,
我们需要先明确一件事:生产者发送的消息,怎么样才算是发送成功了?
消息发送成功,有两个标准
消息被成功送达Exchange
消息被成功送达匹配的Queue
以上两个过程任何一步失败,都认为消息发送失败了。
生产者确认机制,可以确保生产者明确知道消息是否成功发出,如果未成功的话,是哪一步出现问题。然后开发人员就可以根据投递结果做进一步处理。
Confirm Callback机制:确定回收
说明
使用发送者的ConfirmCallback机制,
用于让生产者确认 消息是否送达交换机:
如果消息成功送达交换机,MQ会给生产者返回一个ack(确认)。
当生产者得到ack之后,就可以确定消息成功送达交换机了
使用步骤,在生产者一方做如下操作:
修改配置文件,指定 confirm确认的处理方式,使用异步方式
发送消息时,配置CorrelationData,用于处理确认结果
示例
1. 修改配置文件
修改生产者一方的配置文件application.yaml,增加如下配置
如果配置为
simple
,表示使用同步方式处理确认的结果如果配置为
correlated
,表示使用异步方式处理确认的结果,但是发送消息时需要我们准备一个CorrelationData对象,用于接收确认结果spring: rabbitmq: #生产者确认机制类型。simple同步方式确认;correlated异步方式确认,将使用CorrelationData接收确认结果 publisher-confirm-type: correlated
2.发送消息
package com.mqrebbit; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @Slf4j @SpringBootTest public class Demo01SimpleTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test(){ //创建一个CorrelationData Correlation:关联 CorrelationData data = new CorrelationData(); //设置消息的id data.setId("msg-001"); //准备好 回调函数:Callback,用于接收 将来:Future 的确认结果 data.getFuture().addCallback( //Success:成功 /** * 当消息发送没有出现异常时,这个方法会被调用 */ result -> { if (result.isAck()){ log.info("发送消息完成,且已经到达交换机。消息id={}",data.getId()); }else { log.info("发送消息完成,但是没有到达交换机。消息id={},原因={}",data.getId(),result.getReason()); } }, //Failure:失败 /** * 当消息发送出现异常时,这个方法会被调用 */ ex -> { log.warn("发送消息出现异常,消息id={}",data.getId(),ex); } ); rabbitTemplate.convertAndSend("demo.exchange", "demo", "hello",data); } }
3.测试结果-未送达交换机的结果
首先,我们先要保证
demo.exchange
交换机不存在,再运行单元测试方法,发送消息。可看到如下结果4.测试结果-成功送达交换机
然后,我们再创建配置类,声明一个名称为
demo.exchange
的交换机package com.mqrebbit.config; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitBindingConfig { //创建一个交换机 @Bean public TopicExchange demoExchange(){ return ExchangeBuilder.topicExchange("demo.exchange").build(); } }
然后重新发送消息,可看到如下结果:
Return Callback机制
说明
使用生产者的Confirm Callback机制,可以确保消息成功送达交换机。
但是消息是否被送达队列呢?我们同样需要进行确认。
为了解决这个问题,RabbitMQ提供了Return Callback机制:
如果消息被交换机成功路由到队列,一切正常
如果消息路由到队列时失败了,Return回调会把消息回退给生产者。生产者可以自行决定后续要如何处理
使用步骤,在生产者一方操作:
修改配置文件,开启return callback机制,并设置强制return back
创建配置类,预先设置Return回调函数
示例
1. 修改配置文件
修改生产者的配置文件application.yaml,开启return回调机制
spring: rabbitmq: publisher-returns: true #开启生产者return回调机制 template: mandatory: true #开启强制回调。如果为true,消息路由失败时会调用ReturnCallback回退消息;如果为false,消息路由失败时会丢弃消息
设置Return回调
当消息未被路由到Queue时,Return回调会执行
注意:
只要给RabbitTemplate对象设置一次回调函数即可,并不需要每次发送消息都设置Return回调。所以我们在配置类里给RabbitTemplate设置一次即可
给单例的RabbitTemplate对象设置Return回调的方式有多种,使用哪种都行,只要能够设置成功即可
创建一个配置类,在配置类里设置Return回调函数:
package com.mqrebbit.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; /* * ApplicationContext是: spring容器,所有bean对象都在这里面 * ApplicationContextAware接口:当ApplicationContext初始化完成之后, * 接口的setApplicationContext方法将会被自动调用 */ @Slf4j @Configuration public class RabbitConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { //当交换机把消息路由到队列出现问题时,这个方法会自动执行 log.warn("把消息路由到队列失败,replyCode{},replyText{},RoutingKey={},msg={}", replyCode,replyText,exchange,routingKey, message); }); } }
测试结果-未路由到队列
首先,我们要先保证交换机没有绑定队列
demo.queue
,再运行单元测试方法,发送消息,可看到如下结果:测试结果-成功路由到队列
然后,我们再找到
RabbitBindingConfig
配置类,增加一个队列
demo.queue
,并绑定给交换机demo.exchange
,最终代码如下:@Configuration public class RabbitBindingConfig { //创建一个交换机 @Bean public TopicExchange demoExchange(){ return ExchangeBuilder.topicExchange("demo.exchange").build(); } @Bean public Queue demoQueue(){ return QueueBuilder.durable("demo.queue").build(); } @Bean public Binding demoBinding(Queue demoQueue,TopicExchange demoExchange){ return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo"); } }
然后再发送消息,不报错,就说明路由成功了。可以去RabbitMQ控制台上查看消息
3. MQ消息持久化
介绍
通过生产者确认机制,我们可以把消息投递到队列中。但是如果这时候MQ宕机了,队列里的消息同样有可能会丢失。这是因为:
交换机可能是非持久化的。MQ一重启,交换机就消失了
队列可能是非持久化的。MQ一重启,队列就消失了
消息可能是非持久化的(在RabbitMQ内存中)。MQ一重启,消息就丢失了
所以我们必须要保证:交换机、队列、消息都是持久化的。
但实际上,我们创建交换机、队列、消息的方式都是持久化创建的,所以以下内容我们仅仅了解即可
交换机持久化
队列持久化
消息持久化
4. 消费者确认机制介绍
RabbitMQ采用的是阅后即焚模式,即只要消息被消费成功获取,MQ就会立刻删除掉这条消息。所以,我们必须保证,消息确实成功的被消费掉了。为此,RabbitMQ也提供了ack确认机制:
RabbitMQ将消息投递给消费者
消费者成功处理消息
消费者向RabbitMQ返回ack确认
RabbitMQ收到ack确认,删除消息
从上述过程中我们可以得到,消费者返回ack的时机是非常关键的:如果消费者仅仅是得到消息还未处理,就给RabbitMQ返回ack,然后消费者宕机了,就会导致消息丢失。
SpringAMQP允许消费者使用以下三种ack模式:
manual:手动ack。由开发人员在处理完业务后,手动调用API,向RabbitMQ返回ack确认
auto:自动ack【默认】。当消费者方法正常执行完毕后,由Spring自动给RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回
nack
(未消费成功)none:关闭ack。MQ假定所有消息都会被成功消费,因为RabbitMQ投递消息后会立即删除
我们一般使用默认的auto模式
none模式
auto模式
5. 消费者失败重试介绍
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
我们可以利用Spring本身的retry机制,在消费者出现异常后,在消费者内部进行本地重试;而不是让消息重新入队列,然后让RabbitMQ重新投递。
消费者本地重试
只要修改消费者一方的配置文件,设置消费者本地重试,并配置重试参数
修改消费者一方的配置文件
application.yaml
,增加如下配置:
失败后的恢复策略在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。
但是,其实在重试多次消费仍然失败后,SpringAMQP提供了
MessageRecoverer
接口,定义了不同的恢复策略可以用来进一步处理消息:
RejectAndDontRequeueRecoverer
:重试次数耗尽后,直接reject
,丢弃消息。是默认的处理策略
ImmediateRequeueMessageRecoverer
:重试次数耗尽后,立即重新入队requeue
RepublishMessageRecoverer
:重试次数耗尽后,将失败消息投递到指定的交换机
6. 小结
三、死信交换机
1. 介绍
什么是死信
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
-
消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
-
消息是一个过期消息,超时无人消费
-
要投递的队列消息满了,无法投递
死信交换机
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
如图,一个消息被消费者拒绝了,变成了死信;因为demo.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机;如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
注意:
-
死信交换机,其实是普通交换机。只是用于处理死信,所以称为死信交换机
-
死信队列,其实也是普通队列。只是用于处理死信,所以称为死信队列
2. 消费失败成为死信【拓展】
在失败重试策略中,默认的RejectAndDontRequeueRecoverer
会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
我们可以给demo.queue
添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
取消恢复策略
把消费者服务中RabbitMsgRecovererConfig
配置类里的恢复策略注释掉,则SpringAMQP将会使用默认的RejectAndDontRequeueRecoverer
策略,在本地重试次数耗尽后,发送reject给RabbitMQ。
配置死信交换机
修改生产者的配置类,在声明队列时绑定死信交换机
测试效果
-
先去RabbitMQ控制台页面(http://192.168.200.137:15672)中,把
demo.queue
队列删除掉。因为之前声明的队列并没有绑定死信交换机,必须要删除掉,重新声明才行 -
运行生产者的单元测试方法,发送消息
-
启动消费者服务,开始从
demo.queue
中接收消息但出现异常;在耗尽重试次数后,因为恢复策略是默认的RejectAndDontRequeueRecoverer
成为死信。消息被投递到死信交换机,然后路由到死信队列 -
在RabbitMQ控制台中查看死信队列
dl.queue
,可看到死信队列中有一条消息
3. 消息超时成为死信
说明
如果一条消息超时未被消费,也会成为死信。而超时有两种方式:
-
消息所在的队列设置了超时
-
消息本身设置了超时
我们将按照如下设计,演示超时成为死信的效果:
队列TTL示例
注意:为了方便演示死信队列的效果,我们将创建一个新的project,准备新的代码环境。参考第一章节中准备的代码环境。
生产者
声明队列和交换机
-
声明死信交换机与死信队列,并绑定
-
声明普通交换机与普通队列,并绑定。注意,声明普通队列时要:
设置队列的TTL
给队列设置死信交换机与死信的RoutingKey
消息TTL示例
生产者
声明队列和交换机
可以直接使用刚刚的“队列TTL示例”中的配置,与之相比,仅仅是声明队列时不再设置队列的TTL。代码如下: