消息队列在使用过程中,面临着很多实际问题需要思考:
1.消息可靠性问题(面试很会问)
针对这些问题,RabbitMQ分别给出了解决方案:
-
生产者确认机制
-
mq持久化
-
消费者确认机制
-
失败重试机制
下面我们就通过案例来演示每一个步骤。
1.1.生产者消息确认
执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management
1.1.1.修改配置
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
说明:
-
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:-
simple
:同步等待confirm结果,直到超时 -
correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
-
-
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback -
template.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
1.1.2.定义Return回调和confirmCallback
@Configuration @Slf4j //实现这个接口会在spring加载完成后直接加载该类 public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //从ioc容器中 拿到template对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //设置发送者确认回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 自定义的数据 消息UUID * @param b 是否确认 true 消息发送到exchange中 false就是未发送到 * @param s 原因: */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { log.info("发送确认回调触发 消息ID:{}",correlationData.getId()); if (b){ log.info("消息成功发送到交换机中"); }else { log.error("消息没能发送到交换机 原因:{}",s); //消息没到交换机可以触发重发逻辑 } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要这个方法触发,代币消息没能正确的路由到队列,被mq返还回来了 * @param message 返回的消息 * @param replyCode 回复状态码 * @param replyText 回复内容 * @param exchange 交换机 * @param routingKey 路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode,replyText,exchange,routingKey,message.toString()); //没到队列 可以触发重发消息逻辑 } }); } }
1.2.消息持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean public DirectExchange simpleExchange(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false); }
事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D
的标示:
1.2.2.队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean public Queue simpleQueue(){ // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D
的标示:
1.2.3.消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
-
1:非持久化
-
2:持久化
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
1.3.消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
-
1)RabbitMQ投递消息给消费者
-
2)消费者获取消息后,返回ACK给RabbitMQ
-
3)RabbitMQ删除消息
-
4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
-
none模式下,消息投递是不可靠的,可能丢失
-
auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
-
manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
1.3.1.演示none模式
修改consumer服务的application.yml文件,添加下面内容:
spring: rabbitmq: listener: simple: acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { log.info("消费者接收到simple.queue的消息:【{}】", msg); // 模拟异常 System.out.println(1 / 0); log.debug("消息处理完成!"); }
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
1.3.2.演示auto模式
再次把确认机制修改为auto:
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 关闭ack
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
我们用auto模式就行
1.4.消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
1.4.1.本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000ms # 初始的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
-
在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
-
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
-
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring会返回ack,消息会被丢弃
1.4.2.失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
面试点:你有重试机制,失败了,消息会 被丢弃吗?
默认会丢弃,但是我们有三种策略,默认丢弃,第二种可以回到队列,第三种我们转到另一个自定义存放异常的交换机
2.死信交换机(了解概念就行)
2.1.1.什么是死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
-
消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
-
消息是一个过期消息,超时无人消费
-
要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
如图,一个消息被消费者拒绝了,变成了死信:
另外,队列将死信投递给死信交换机时,必须知道两个信息:
-
死信交换机名称
-
死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
2.1.2.利用死信交换机接收死信(拓展)
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
我们在consumer服务中,定义一组死信交换机、死信队列:
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct @Bean public Queue simpleQueue2(){ return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化 .deadLetterExchange("dl.direct") // 指定死信交换机 .build(); } // 声明死信交换机 dl.direct @Bean public DirectExchange dlExchange(){ return new DirectExchange("dl.direct", true, false); } // 声明存储死信的队列 dl.queue @Bean public Queue dlQueue(){ return new Queue("dl.queue", true); } // 将死信队列 与 死信交换机绑定 @Bean public Binding dlBinding(){ return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple"); }
消费者确认模式: auto acknowledge-mode: auto 特征: 当消息不能被消费时,会重新入队,再次投递给消费者进行被消费 default-requeue-rejected: false # 拒绝消息重新入队,如果队列绑定了死信交换机则消息会投递到死信交换机并路由到死信队列 本地重试: 当本地重试次数耗尽时,如果当前队列没有绑定死信交换机或错误队列,则消息丢弃 如果提供了错误队列,则消息投递到错误对象 如果队列绑定了死信交换机,则消息以死信的形式存放到死信队列
2.1.3.总结
什么样的消息会成为死信?
-
消息被消费者reject或者返回nack
-
消息超时未消费
-
队列满了
死信交换机的使用场景是什么?
-
如果队列绑定了死信交换机,死信会投递到死信交换机;
-
可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
2.2.TTL(实现延迟消费,但是新版本有延迟消息插件或者另外技术)
TTL,也就是Time-To-Live。一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
-
消息所在的队列设置了超时时间
-
消息本身设置了超时时间
2.2.1.接收超时死信的死信交换机
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.ttl.queue", durable = "true"), exchange = @Exchange(name = "dl.ttl.direct"), key = "ttl" )) public void listenDlQueue(String msg){ log.info("接收到 dl.ttl.queue的延迟消息:{}", msg); }
2.2.2.声明一个队列,并且指定TTL
要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
@Bean public Queue ttlQueue(){ return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化 .ttl(10000) // 设置队列的超时时间,10秒 .deadLetterExchange("dl.ttl.direct") // 指定死信交换机 .build(); }
注意,这个队列设定了死信交换机为dl.ttl.direct
声明交换机,将ttl与交换机绑定:
@Bean public DirectExchange ttlExchange(){ return new DirectExchange("ttl.direct"); } @Bean public Binding ttlBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl"); }
发送消息,但是不要指定TTL:
@Test public void testTTLQueue() { // 创建消息 String message = "hello, ttl queue"; // 消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息 rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData); // 记录日志 log.debug("发送消息成功"); }
发送消息的日志:
2.2.3.发送消息时,设定TTL
在发送消息时,也可以指定TTL:
@Test public void testSendTTLMessage() throws InterruptedException { // 1.消息体 // String msg = "超时消息..."; Message msg = MessageBuilder .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)) .setExpiration("5000") .build(); // 2.发送消息 rabbitTemplate.convertAndSend("ttl.direct","ttl", msg); log.info("发送消息成功..."); } }
这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。
2.2.4.总结
消息超时的两种方式是?
-
给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
-
给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
-
给消息的目标队列指定死信交换机
-
将消费者监听的队列绑定到死信交换机
-
发送消息时给消息设置超时时间为20秒
2.3.延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
-
延迟发送短信
-
用户下单,如果用户在15 分钟内未支付,则自动取消
-
预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:Community Plugins — RabbitMQ
2.3.1.安装DelayExchange插件
大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。
2.2.上传插件
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的同学,请参考第一章部分,重新创建Docker容器。
我们之前设定的RabbitMQ的数据卷名称为mq-plugins
,所以我们使用下面命令查看数据卷:
docker volume inspect mq-plugins
可以得到下面结果:
2.3.安装插件
最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
执行时,请将其中的 -it
后面的mq
替换为你自己的容器名.
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
结果如下:
这样就安装好了
2.3.3.使用DelayExchange
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。
1)声明DelayExchange交换机
基于注解方式(推荐):
2)发送消息
发送消息时,一定要携带x-delay属性,指定延迟的时间:
2.3.4.总结
延迟队列插件的使用步骤包括哪些?
•声明一个交换机,添加delayed属性为true
•发送消息时,添加x-delay头,值为超时时间
3.惰性队列
3.1.消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积思路:
-
增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
-
在消费者内开启线程池加快消息处理速度
-
扩大队列容积,提高堆积上限
要提升队列容积,把消息保存在内存中显然是不行的。
3.2.惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
-
接收到消息后直接存入磁盘而非内存
-
消费者要消费消息时才会从磁盘中读取并加载到内存
-
支持数百万条的消息存储
3.2.1.基于命令行设置lazy-queue
而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
rabbitmqctl set_policy Lazy "^simple.queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
-
rabbitmqctl
:RabbitMQ的命令行工具 -
set_policy
:添加一个策略 -
Lazy
:策略名称,可以自定义 -
"^lazy-queue$"
:用正则表达式匹配队列的名字 -
'{"queue-mode":"lazy"}'
:设置队列模式为lazy模式 -
--apply-to queues
:策略的作用对象,是所有的队列
3.2.2.基于@Bean声明lazy-queue
3.2.3.基于@RabbitListener声明LazyQueue
3.3.总结
消息堆积问题的解决方案?
-
队列上绑定多个消费者,提高消费速度
-
使用惰性队列,可以再mq中保存更多消息
惰性队列的优点有哪些?
-
基于磁盘存储,消息上限高
-
没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
-
基于磁盘存储,消息时效性会降低
-
性能受限于磁盘的IO