目录:
1.前言
2.生产者
3.数据持久化
4.消费者
5.死信队列
1.前言
RabbitMQ 是一款高性能、高可靠性的消息中间件,广泛应用于分布式系统中。它允许系统中的各个模块进行异步通信,提供了高度的灵活性和可伸缩性。然而,这种通信模式也带来了一些挑战,其中最重要的之一是确保消息的可靠性。
影响消息可靠性的因素主要有以下几点:
- 发送消息时连接RabbitMQ失败
- 发送时丢失:
- 生产者发送的消息未送达交换机;
- 消息到达交换机后未到达队列;
- MQ 宕机,队列中的消息会丢失;
- 消费者接收到消息后未消费就宕机了。
2.生产者
2.1.生产者重连机制
生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,RabbitMQ提供的消息发送时的重连机制。即:当RabbitTemplate
与MQ连接超时后,多次重试。
在生产者yml文件添加配置开启重连机制
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。但是RabbitMQ提供的重试机制是阻塞式的重试。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,就需要合理配置等待时长和重试次数,或者使用异步线程来执行发送消息的代码
2.2.生产者确认机制
RabbitMQ的生产者确认机制(Publisher Confirm)是一种确保消息从生产者发送到MQ过程中不丢失的机制。当消息发送到 RabbitMQ 后,系统会返回一个结果给消息的发送者,表明消息的处理状态。这个结果有两种可能的值:
返回结果有两种方式:
- publisher-confirm(发送者确认)
- 消息成功投递到交换机,返回ACK。
- 消息未投递到交换机,返回NACK。(可能是由于网络波动未能连接到RabbitMQ,可利用生产者重连机制解决)
- publisher-return(发送者回执)
- 消息投递到交换机了,但是没有路由到队列。返回ACK和路由失败原因。(这种问题一般是因为路由键设置错误,可以人为规避)
通过这种机制,生产者在发送消息后获取返回的回执结果,从而采取对应的策略,如消息重发或记录失败信息。
3.数据持久化
3.1.配置持久化
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题
- RabbitMQ宕机,存在内存中的消息会丢失。
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。RabbitMQ可以通过配置数据持久化,从而将消息保存在磁盘,包括:
- 交换机持久化(确保RabbitMQ重启后交换机仍然存在)
- 队列持久化(确保RabbitMQ重启后队列仍然存在)
- 消息持久化(确保RabbitMQ重启后队列中的消息仍然存在)
由于Spring会在创建队列时默认将交换机和队列设置为持久化,发送消息时也默认指定消息为持久化消息,因此不需要额外配置。
// 将消息指定为持久化消息
Message message = MessageBuilder
.withBody("hello".getBytes(standardcharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 给队列发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
3.2.惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue
的概念,也就是惰性队列。
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
惰性队列的特点如下:
-
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
-
消费者要消费消息时才会从磁盘中读取并加载到内存
-
支持数百万条的消息存储
对于低于3.12版本的情况,可以使用注解的arguments来指定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "grade.queue", durable = "true"),
exchange = @Exchange(name = "intel.topic", type = ExchangeTypes.TOPIC),
key = "intel.grade",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
3.3.为什么需要数据持久化?
数据持久化在 RabbitMQ 中有以下重要作用:
队列和交换机的持久化:
- 防止重启后丢失:将队列和交换机设置为持久化,可以防止 RabbitMQ 服务器重启后丢失这些队列和交换机,确保它们的存在和绑定关系保持不变。
消息的持久化:
- 安全性:
- 防止数据丢失:消息持久化后,可以防止 RabbitMQ 服务器重启或宕机时数据丢失,方便数据恢复,保证消息的可靠性和耐久性。
- 性能:
- 内存管理:未持久化的临时消息默认存储在内存中。内存空间有限,大量消息涌入时会导致内存占满,系统需要进行
page out
操作将消息写入磁盘。频繁的page out
操作会严重影响性能。 - 预防内存溢出:通过持久化消息,可以缓解内存压力,防止因内存溢出导致的系统性能问题和崩溃。
- 内存管理:未持久化的临时消息默认存储在内存中。内存空间有限,大量消息涌入时会导致内存占满,系统需要进行
4.消费者
4.1.消费者确认机制
为了确认消费者是否正确处理了消息,RabbitMQ提供了消费者确认机制。当消费者处理消息后,会返回回执信息给RabbitMQ。回执有三种值:
- ack:消息处理成功,RabbitMQ从队列中删除消息。
- nack:消息处理失败,RabbitMQ需要再次投递消息。
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除消息。
在SpringBoot项目中,我们可以通过配置文件选择回执信息的处理方式,一共有三种处理方式:
-
none:不处理。RabbitMQ 假定消费者获取消息后会一定会成功处理,因此消息投递后立即返回
ack
,将消息从队列中删除。 -
manual:手动模式。需要在业务代码结束后,调用SpringAMQP提供的API发送
ack
或reject
,存在代码侵入问题,但比较灵活。 -
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑进行了环绕增强,返回结果如下:
-
如果消费者正常处理消息,自动返回
ack
并删除队列的消息。 -
如果消费者消息处理失败,自动返回
nack
并重新向消费者投递消息。 -
如果消息校验异常,自动返回
reject
并删除队列中的消息。
-
注意: 手动模式返回回执消息时通常需要显式指定
requeue
参数,当requeue=true
时,表明消息需要重新入队;当requeue=false
时,RabbitMQ将从队列删除消息。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto,自动 ack
4.2.消息失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。
可以通过设置yml文件开启失败重试机制,在消息异常时利用本地重试,而不是无限制的进行requeue操作。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false 有状态。如果业务中包含事务,这里改为 false
4.3.消息失败处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试次数耗尽后,直接
reject
,丢弃消息,这是默认采取的方式; - ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回
nack
,消息重新入队; - RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
5.死信队列
尽管通过以上设置可以确保消息在生产者、消息队列和消费者之间的传递过程中不会丢失,但在某些情况下,消费者仍可能无法成功处理消息(如消息重试次数耗尽后仍无法被消费)。这时候,我们需要一个机制来妥善处理这些无法被正常消费的消息。死信队列便是用于解决这一问题的兜底机制。
5.1.死信
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消息被拒绝: 当消费者明确拒绝一个消息并且设置不再重新入队(requeue=false)时,这个消息会被标记为死信。
- 消息过期: 每个消息或队列可以设置一个TTL(Time-To-Live),即消息的存活时间。如果消息在队列中停留的时间超过了这个TTL,消息会被认为过期,并被转移到死信队列。
- 队列达到最大长度: 如果队列设置了最大长度并且达到了这个限制,那么新进入的消息会被转移到死信队列中。
5.2.创建死信队列
5.2.1.创建死信交换机和死信队列
正常使用注解,创建交换机和队列即可
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dead.queue", durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")),
exchange = @Exchange(name = "dead.exchange", type = ExchangeTypes.TOPIC),
key = "dead.key"
))
public void deadLetterQueue(String msg) {
System.out.println("您的消息已经死亡:" + msg);
}
5.2.2.绑定死信交换机
如果队列通过dead-letter-exchange
属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
可以通过@Argument
注解指定死信交互机和路由键,如下。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "simple.queue", durable = "true",
arguments = {
@Argument(name = "x-queue-mode", value = "lazy"),
@Argument(name = "x-dead-letter-exchange", value = "dead.exchange"),
@Argument(name = "x-dead-letter-routing-key", value = "dead.key")
}),
exchange = @Exchange(name = "simple.topic",
type = ExchangeTypes.TOPIC),
key = "simple.key"
))