RabbitMQ ⾼级特性
- 1. 消息确认
- 1.1 消息确认机制
- 1.2 代码示例
- 2. 持久化
- 2.1 交换机持久化
- 2.2 队列持久化
- 2.3 消息持久化
- 3. 发送⽅确认
- 3.1 confirm确认模式
- 3.2 return退回模式
- 3.3 问题: 如何保证RabbitMQ消息的可靠传输?
- 4. 重试机制
- 5. TTL
- 5.1 设置消息的TTL
- 5.2 设置队列的TTL
- 5.3 两者区别
- 6. 死信队列
- 6.1 死信的概念
- 6.2 代码示例
- 6.3 常⻅问题
- 7. 延迟队列
- 7.1 概念
- 7.2 应⽤场景
- 7.3 TTL+死信队列实现
- 存在问题
- 7.4 延迟队列插件
- 7.5 常⻅问题
- 8. 事务
- 9. 消息分发
- 9.1 概念
- 9.2 应⽤场景
- 9.2.1 限流
- 9.2.2 负载均衡
- 代码演示获取
1. 消息确认
1.1 消息确认机制
⽣产者发送消息之后, 到达消费端之后, 可能会有以下情况:
a. 消息处理成功
b. 消息处理异常
RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第两种情况, 就会造成消息丢失
那么如何确保消费端已经成功接收了, 并正确处理了呢?
为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制(message acknowledgement)。
消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:
- ⾃动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, ⽽不管消费者是否真正地消费到了这些消息. ⾃动确认模式适合对于消息可靠性要求不⾼的场景.
- ⼿动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求⽐较⾼的场景.
当autoAck参数置为false, 对于RabbitMQ服务端⽽⾔, 队列中的消息分成了两个部分:
⼀是等待投递给消费者的消息。
⼆是已经投递给消费者, 但是还没有收到消费者确认信号的消息。
如果RabbitMQ⼀直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者.
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数
Ready: 等待投递给消费者的消息数
Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息数
1.2 代码示例
链接
2. 持久化
我们在前⾯讲了消费端处理消息时, 消息如何不丢失, 但是如何保证当RabbitMQ服务停掉以后, ⽣产者发送的消息不丢失呢. 默认情况下, RabbitMQ 退出或者由于某种原因崩溃时, 会忽视队列和消息, 除⾮告知他不要这么做.
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化.
2.1 交换机持久化
交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在.
如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
2.2 队列持久化
队列的持久化是通过在声明队列时将 durable 参数置为 true实现的.
如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没有了, 消息也⽆处可存了)
队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化.
咱们前⾯⽤的创建队列的⽅式都是持久化的
QueueBuilder.durable(Constant.ACK_QUEUE).build();
点进去看源码会发现,该⽅法默认durable 是true
通过下⾯代码,可以创建⾮持久化的队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3 消息持久化
消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT
设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继⽽消息也丢失. 所以单单设置消息持久化⽽不设置队列的持久化显得毫⽆意义.
MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性
如果使⽤RabbitTemplate 发送持久化消息, 代码如下:
RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为⾮持久化,或者消息在发送时被标记为⾮持久化
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的.
- 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之后, 还没来得及处理就宕机了, 这样也算数据居丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进⾏⼿动确认,详细可以参考[消息确认]章节.
- 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理, 可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失.
这个问题怎么解决呢?
- 引⼊RabbitMQ的仲裁队列, 如果主节点(master)在此特殊时间内挂掉, 可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性, 除⾮整个集群都挂掉(此⽅法也不能保证100%可靠, 但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多, 实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
- 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中
3. 发送⽅确认
在使⽤ RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决⽅案:
a. 通过事务机制实现
b. 通过发送⽅确认(publisher confirm) 机制实现
事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多, 咱们主要介绍confirm机制来实现发送⽅的确认.
RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递
- confirm确认模式
- return退回模式
3.1 confirm确认模式
Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达Exchange, 这个监听都会被执⾏, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false.
具体代码
关键代码
3.2 return退回模式
消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理
在配置 RabbitTemplate 的时候要设置这两个属性
// 设置 return 模式
rabbitTemplate.setMandatory(true); // 只有设置为true, 才会回调ReturnCallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息被退回: " + returned.getMessage());
}
});
回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:
3.3 问题: 如何保证RabbitMQ消息的可靠传输?
先放⼀张RabbitMQ消息传递图
从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
- ⽣产者将消息发送到 RabbitMQ失败
a. 可能原因: ⽹络问题等
b. 解决办法: 参考本章节[发送⽅确认-confirm确认模式] - 消息在交换机中⽆法路由到指定队列
a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
b. 解决办法: 参考本章节[发送⽅确认-return模式] - 消息队列⾃⾝数据丢失
a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失
b. 解决办法: 参考本章节[持久性]. 开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会⾃动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的⽅式提⾼可靠性) - 消费者异常, 导致消息丢失
a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
b. 解决办法: 参考本章节[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认, 当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制(参考下⼀章节), 当消息消费异常时, 通过消息重试确保消息的可靠性
4. 重试机制
在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数
5. TTL
TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.
当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除
5.1 设置消息的TTL
⽬前有两种⽅法可以设置消息的TTL.
⼀是设置队列的TTL, 队列中所有消息都有相同的过期时间. ⼆是对消息本⾝进⾏单独设置, 每条消息的TTL可以不同. 如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.
5.2 设置队列的TTL
5.3 两者区别
设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的
为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可
⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.
6. 死信队列
6.1 死信的概念
死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.
有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(DeadLetter Queue,简称DLQ).
消息变成死信⼀般是由于以下⼏种情况:
- 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
- 消息过期.
- 队列达到最⼤⻓度
6.2 代码示例
队列和交换机的配置
@Configuration
public class DLXConfig {
// 死信交换机
@Bean("dlxExchange")
public Exchange dlxExchange() {
return ExchangeBuilder.directExchange(Constants.DLX_EXCHANGE_NAME).durable(true).build();
}
// 死信队列
@Bean("dlxQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(Constants.DLX_QUEUE_NAME).build();
}
// 绑定
@Bean("dlxBind")
public Binding dlxBind(@Qualifier("dlxQueue") Queue queue, @Qualifier("dlxExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Constants.DLX_ROUTING_KEY).noargs();
}
// 正常交换机
@Bean("normalExchange")
public Exchange normalExchange() {
return ExchangeBuilder
.directExchange(Constants.NORMAL_EXCHANGE_NAME)
.durable(true)
.build();
}
// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder
.durable(Constants.NORMAL_QUEUE_NAME)
.deadLetterExchange(Constants.DLX_EXCHANGE_NAME).deadLetterRoutingKey(Constants.DLX_ROUTING_KEY) // 正常队列绑定死信交换机
.ttl(10 * 1000).maxLength(10L) // 制造死信产⽣的条件: 10s后消息变成死信, 队列最多存10条消息
.build();
}
// 绑定
@Bean("normalBind")
public Binding normalBind(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Constants.NORMAL_ROUTING_KEY).noargs();
}
}
6.3 常⻅问题
- 死信队列的概念
死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息
- 死信的来源
-
消息过期: 消息在队列中存活的时间超过了设定的TTL
-
消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
-
队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.
- 死信队列的应⽤场景
对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.
⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态
为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).
场景的应⽤场景还有:
- 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
- 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
- ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.
7. 延迟队列
7.1 概念
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.
7.2 应⽤场景
延迟队列的使⽤场景有很多, ⽐如:
- 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
- ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
- ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等
RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.
假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息.
7.3 TTL+死信队列实现
生产者
@RequestMapping("/delay")
public String delay() {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE_NAME,
Constants.NORMAL_ROUTING_KEY, "delay test 5s... " + new Date(), message -> {
message.getMessageProperties().setExpiration("5000");
return message;
});
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE_NAME,
Constants.NORMAL_ROUTING_KEY, "delay test 10s... " + new Date(), message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
return "delay";
}
消费者
@RabbitListener(queues = Constants.DLX_QUEUE_NAME)
public void listenerDLXQueue(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n",
new Date(), new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getDeliveryTag());
// 手动确认
channel.basicAck(deliveryTag, true);
}
运行结果
存在问题
接下来把⽣产消息的顺序修改⼀下
先发送20s过期数据, 再发送10s过期数据
运行结果
这时会发现: 10s过期的消息, 也是在20s后才进⼊到死信队列.
消息过期之后, 不⼀定会被⻢上丢弃. 因为RabbitMQ只会检查队⾸消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很⻓, 第⼆个消息的延时时间很短, 那第⼆个消息并不会优先得到执⾏.
所以在考虑使⽤TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列.
7.4 延迟队列插件
安装
docker compose 安装队列插件的 rabbitmq
交换机和队列声明并绑定
生产者
消费者
运行结果
从结果可以看出, 使⽤延迟队列, 可以保证消息按照延迟时间到达消费者.
7.5 常⻅问题
介绍下RabbitMQ的延迟队列
延迟队列是⼀个特殊的队列, 消息发送之后, 并不⽴即给消费者, ⽽是等待特定的时间, 才发送给消费者.
延迟队列的应⽤场景有很多, ⽐如:
- 订单在⼗分钟内未⽀付⾃动取消
- ⽤⼾注册成功后, 3天后发调查问卷
- ⽤⼾发起退款, 24⼩时后商家未处理, 则默认同意, ⾃动退款
但RabbitMQ本⾝并没直接实现延迟队列, 通常有两种⽅法:
- TTL+死信队列组合的⽅式
- 使⽤官⽅提供的延迟插件实现延迟功能
⼆者对⽐:
- 基于死信实现的延迟队列
a. 优点: 1) 灵活不需要额外的插件⽀持
b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性 - 基于插件实现的延迟队列
a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本
8. 事务
RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败.
配置类
生产者
9. 消息分发
9.1 概念
RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者. 这种⽅式⾮常适合扩展, 如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可.
默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降.
如何处理呢? 我们可以使⽤前⾯章节讲到的channel.basicQos(int prefetchCount) ⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量
⽐如: 消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗⼝".
prefetchCount 设置为0时表⽰没有上限.
basicQos 对拉模式的消费⽆效
9.2 应⽤场景
消息分发的常⻅应⽤场景有如下:
- 限流
- ⾮公平分发
9.2.1 限流
如下使⽤场景:
订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求
但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮.
RabbitMQ提供了限流机制, 可以控制消费端⼀次只拉取N个请求
通过设置prefetchCount参数, 同时也必须要设置消息应答⽅式为⼿动应答
prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡.
9.2.2 负载均衡
我们也可以⽤此配置,来实现"负载均衡"
如下图所⽰, 在有两个消费者的情况下,⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量.
我们可以使⽤设置prefetch=1 的⽅式, 告诉 RabbitMQ ⼀次只给⼀个消费者⼀条消息, 也就是说, 在处理并确认前⼀条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下⼀个不忙的消费者.
消费者
运行结果
deliveryTag 有重复是因为两个消费者使⽤的是不同的Channel, 每个 Channel 上的deliveryTag 是独⽴计数的.
代码演示获取
代码演示