1. 消息确认机制
1.1 介绍
我们可以看到RabbitMQ的消息流转图:
当消息从Broker投递给消费者的时候会存在以下两种情况:
- consumer消费消息成功
- consumer消费消息异常
如果说RabbitMQ在每次将消息投递给消费者的时候就将消息从Broker中删除,此时如果消息处理异常,就会造成消息丢失的情况!因此RabbitMQ提供了消息确认机制(Message Acknowledge),消费者可以设置autoAck参数来进行确认:
- 自动确认:当设置autoAck参数为true时,RabbitMQ就会将自己发送出去的消息置为确认,并从内存和硬盘上移除,不管消费者是否消费消息成功,适用于消息可靠性要求不高的场景
- 手动确认:当设置autoAck参数为false时,RabbitMQ会等待消费者显示调用Basic.Ack命令,如果确认消费成功则进行删除消息操作,适用于消息可靠性较高的场景
当autoAck参数设置为false的时候,消息会被分为两部分:一部分是等待进行投递的消息,另一部分是已经投递但是还没有等到消费者回复的消息,其结构如下:
从RabbitMQ的Web管理平台也可以看到这两种状态:
1.2 手动确认方法
消费者在收到消息之后,可以进行确认应答,也可以进行拒绝确认,RabbitMQ也提供的不同的确认方法API,在消费者端可以使用channel
的以下三种不同API进行应答:
- 肯定应答:
channel.basicAck(long deliveryTag, boolean multiple)
表示消息已经被消费者正确处理,通知RabbitMQ可以将消息进行移除了
参数说明:
deliveryTag
:是消息的唯一标识,是一个64位递增的长整数,该参数由每个channel进行单独维护,即在每个channel内部deliveryTag是不重复的multiple
:是否进行批量确认,在某些情况下为了减少网络传输带宽,可以对连续的多个deliveryTag进行批量确认,当值设置为true的时候则会将ack<=deliveryTag的消息全部确认;如果值设置为false则只会将对应deliveryTag的消息进行确认
- 否定确认:
channel.basicReject(long deliveryTag, boolean requeue)
表示消费者拒绝该消息
参数说明:
deliveryTag
:参考basicAckrequeue
:表示拒绝该消息之后该消息如何处理,如果设置为true,则RabbitMQ会重新将该消息放入队列,以便投递给下一个订阅的消费者;如果设置为false,则RabbitMQ会将该消息从队列中移除
- 否定确认:
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
表示消费者拒绝该消息,并且可以批量拒绝消息
参数说明(参考上方)
1.3 代码演示
下面我们基于Spring-AMQP演示消息的确认机制,该确认机制有三种模式可以配置(需要注意与上述client模式有些不同):
1.3.1 NONE模式
该模式类似于上述讲的自动确认模式:即只要Broker将消息投递给消费者就会删除队列中的消息,而不管消费者有没有消费成功,可能会造成消息丢失场景!
- 配置确认机制为NONE模式:
spring:
application:
name: mq-advanced
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: springboot-mq
listener:
simple:
acknowledge-mode: NONE # NONE模式
- 发送消息
@RequestMapping("/none")
public String testNone() {
rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
return "消息发送成功!";
}
- 监听消息
@Component
public class AckListener {
@RabbitListener(queues = QueueConstant.ACK_QUEUE)
public void ackListener(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String body = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
System.out.println("开始处理消息...");
int ret = 3 / 0;
System.out.println("消息处理完毕...");
}
}
此时就会出现以下情况:在消费者中抛出异常,但是RabbitMQ中消息已经丢失!
1.3.2 AUTO模式(默认)
该模式作用如下:
- 当消费者业务代码处理正常时就会对消息进行确认
- 但是如果消费者业务代码中抛出了异常,就会对消息进行否定确认并重新投递
- 配置确认机制为AUTO模式:
spring:
application:
name: mq-advanced
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: springboot-mq
listener:
simple:
acknowledge-mode: AUTO # AUTO模式
- 发送消息
@RequestMapping("/none")
public String testNone() {
rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
return "消息发送成功!";
}
- 监听消息
@Component
public class AckListener {
@RabbitListener(queues = QueueConstant.ACK_QUEUE)
public void ackListener(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String body = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
System.out.println("开始处理消息...");
int ret = 3 / 0;
System.out.println("消息处理完毕...");
}
}
此时就会出现以下情况:在消费者中抛出异常,但是消息不会丢失,而是源源不断投递给可用的消费者!
1.3.3 MANUAL模式
该模式就可以进行手动确认:
- 配置确认机制为MANUAL模式:
spring:
application:
name: mq-advanced
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: springboot-mq
listener:
simple:
acknowledge-mode: MANUAL # MANUAL模式
- 发送消息
@RequestMapping("/none")
public String testNone() {
rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
return "消息发送成功!";
}
- 监听消息
@Component
public class AckListener {
@RabbitListener(queues = QueueConstant.ACK_QUEUE)
public void ackListener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
System.out.println("开始处理消息...");
int ret = 3 / 0;
System.out.println("消息处理完毕...");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicReject(deliveryTag, true);
}
}
}
此时就会出现以下情况:如果处理成功,就会进行basicAck肯定确认,但是如果捕获到了异常就进行拒绝确认,并将消息重新入队投递给下一个消费者使用!
2. 持久化机制
2.1 介绍
我们再次回看RabbitMQ的消息流转图:
前面我们通过消息确认机制保证了Broker能够将消息可靠地投递给Consumer消费者端,但是现在还存在一个问题:当消息存储在Broker中,但是RabbitMQ服务器遇到断电重启的情况如何保证将消息恢复呢?RabbitMQ就提供了 持久化机制 ,在RabbitMQ中有以下三种持久化:
- 队列持久化
- 交换机持久化
- 消息持久化
2.2 队列持久化
队列的持久化是通过在声明队列的时候设置参数durable
为true实现的
- 如果队列不进行持久化,那么在重启的时候关于队列的元数据信息就会丢失(此时哪怕消息进行了持久化也无法恢复消息了,因为消息保存在队列中)
- 如果将队列设置为持久化,此时队列相关的元数据就可以从硬盘上进行恢复,但是并不能保证内部的消息不丢失,如果想要让消息不丢失,还需要设置消息的持久化
我们之前所创建队列的代码默认设置为持久化:
/**
* 声明持久化队列
*/
@Bean("persistQueue")
public Queue persistQueue() {
return QueueBuilder
.durable(QueueConstant.PERSIST_QUEUE)
.build();
}
追踪durable
方法源码:
继续追踪setDurable
方法源码可以发现默认是进行持久化的!
如果我们想要设置队列为非持久化,可以使用如下代码:
/**
* 声明非持久化队列
*/
@Bean("nonPersistQueue")
public Queue nonPersistQueue() {
return QueueBuilder
.nonDurable(QueueConstant.NON_PERSIST_QUEUE)
.build();
}
2.3 交换机持久化
交换机的持久化是通过在声明交换机的时候设置参数durable
为true实现的
同队列一样,只有设置为持久化,才会将有关交换机的元数据信息保存在硬盘上,在重启RabbitMQ服务器的时候才会读取然后恢复交换机数据信息,我们可以通过在声明交换机的时候设置durable(true | false)
显示声明是否持久化:
/**
* 声明持久化交换机
*/
@Bean("persistDirectExchange")
public DirectExchange persistDirectExchange() {
return ExchangeBuilder
.directExchange(ExchangeConstant.PERSIST_EXCHANGE)
.durable(true)
.build();
}
/**
* 声明非持久化交换机
*/
@Bean("nonPersistDirectExchange")
public DirectExchange nonPersistDirectExchange() {
return ExchangeBuilder
.directExchange(ExchangeConstant.NON_PERSIST_EXCHANGE)
.durable(false)
.build();
}
2.4 消息持久化
如果想要让消息进行持久化,我们就需要设置消息的投递模式MessageProperties.deliveryMode
为PERSISITENT
,使用RabbitTemplate发送持久化消息代码如下:
@RestController
public class PersistController {
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping("/persist")
public String sendPersist() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("persist info".getBytes(), messageProperties);
rabbitTemplate.convertAndSend(ExchangeConstant.PERSIST_EXCHANGE, "persist", message);
return "发送成功!";
}
}
如果想要设置消息的不持久化,则对应代码如下:
@RequestMapping("/nonPersist")
public String sendNonPersist() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message message = new Message("non-persist info".getBytes(), messageProperties);
rabbitTemplate.convertAndSend(ExchangeConstant.NON_PERSIST_EXCHANGE, "non-persist", message);
return "发送成功!";
}
3. 发送方确认机制
3.1 介绍
我们再次回看RabbitMQ的消息流转图:
现在我们通过消息确认机制保证从Broker到Consumer链路上消息可靠性,通过持久化机制保证Broker内部消息可靠性,但是此时还存在着问题:如果说消息在生产者投递给Broker过程中由于网络等问题导致消息丢失、或者Broker处于重启等服务不可用状态该怎么办呢?即生产者如何保证消息能够可靠到达RabbitMQ服务器?
RabbitMQ为了解决这个问题,提供了以下两种机制:
- 事务机制(性能较低,此处不介绍)
- 发送方确认机制(Publisher Confirm)
在发送方确认机制中,可以配置以下两种模式:
- confirm确认模式:
确认模式指的是在发送者发送消息时设置一个ConfirmCallback的监听器,无论消息是否到达对应的Exchange,这个监听都会执行。如果消息到达对应的Exchange,则对应ACK参数为true,反之没有到达Exchange则ACK参数为false
- return回退模式
我们期待Exchange能够依据特定的路由规则将消息投递给对应的队列,但是如果设置的路由键错误或者队列不存在时导致消息迟迟没有投递给队列,此时我们希望可以将消息退回给生产者,退回模式指的是在发送者发送消息时设置一个ReturnsCallback的监听器对退回的消息进行处理
🔑 总结:确认模式和退回模式并不是互斥的,两者可以同时设置!确认模式主要解决的是保证消息可靠到达Exchange的问题,而退回模式保证的是消息可靠到达Queue的问题
3.2 代码演示
3.2.1 Confirm确认模式
配置步骤如下:
- 进行confirm模式配置
- 在发送方设置ConfirmCallback并发送消息
- 测试
接下来看实现步骤:
- 配置confirm模式开启
spring:
application:
name: mq-advanced
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: springboot-mq
publisher-confirm-type: correlated # 开启发送者确认模式
- 声明队列与交换机
public interface ExchangeConstant {
String CONFIRM_EXCHANGE = "confirm.exchange";
}
public interface QueueConstant {
String CONFIRM_QUEUE = "confirm.queue";
}
@Configuration
public class RabbitMQConfig {
/**
* 声明发送者确认模式队列
*/
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder
.durable(QueueConstant.CONFIRM_QUEUE)
.build();
}
/**
* 声明发送者确认模式交换机
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return ExchangeBuilder
.directExchange(ExchangeConstant.CONFIRM_EXCHANGE)
.durable(true)
.build();
}
/**
* 声明发送者确认模式交换机
*/
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmExchange") DirectExchange exchange, @Qualifier("confirmQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("confirm");
}
}
- 编写发送者代码
@Configuration
public class RabbitTemplateConfig {
@Bean("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置confirm回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("执行了confirm方法...");
if (b) {
// 到达交换机
System.out.println("消息id: " + (correlationData == null ? null : correlationData.getId()) + "到达交换机");
} else {
// 没有到达交换机
System.out.println("消息id: " + (correlationData == null ? null : correlationData.getId()) + "没有到达交换机, 原因是: " + s);
}
}
});
return rabbitTemplate;
}
}
@RestController
public class ConfirmController {
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/confirm")
public String confirm() {
// 发送消息
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ExchangeConstant.CONFIRM_EXCHANGE, "confirm", "test confirm...", correlationData);
return "发送消息成功!";
}
}
- 测试接口
发现如果交换机名称设置正确则当消息到达交换机时回调被执行,我们尝试设置一个不存在的交换机名称查看现象:
此时就会走没有到达交换机的逻辑,此处就可以进行重新投递消息等业务逻辑!
💡 答疑解惑:为什么此处我们明确注入一个自己创建出来的RabbitTemplate,而不使用Spring提供的呢?有以下两点原因:
- 这是因为Spring默认配置Bean为单例的,因此如果使用Spring提供的RabbitTemplate设置回调函数则会影响其余接口同样使用回调
- 我们不能重复在controller层代码中重复多次调用setConfirmCallback回调,因为明确规定每个RabbitTemplate只能设置一次ConfirmCallback
3.2.2 Return退回模式
配置步骤如下:
- 进行return模式配置
- 在发送方设置setMandatory(true)表示进行退回
- 设置ReturnsCallback回调逻辑并发送消息
- 测试
接下来看实现步骤:
- 配置return模式开启(同confirm模式一致)
spring:
application:
name: mq-advanced
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
virtual-host: springboot-mq
publisher-confirm-type: correlated # 开启发送者确认模式
- 设置ReturnsCallback回调逻辑并发送消息
@Configuration
public class RabbitTemplateConfig {
@Bean("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置confirm回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("执行了confirm方法...");
if (b) {
// 到达交换机
System.out.println("消息id: " + (correlationData == null ? null : correlationData.getId()) + "到达交换机");
} else {
// 没有到达交换机
System.out.println("消息id: " + (correlationData == null ? null : correlationData.getId()) + "没有到达交换机, 原因是: " + s);
}
}
});
// 设置return回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("收到回退消息: " + returnedMessage);
}
});
return rabbitTemplate;
}
}
@RestController
public class ConfirmController {
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RequestMapping("/confirm")
public String confirm() {
// 发送消息
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ExchangeConstant.CONFIRM_EXCHANGE, "confirm", "test confirm...", correlationData);
return "发送消息成功!";
}
@RequestMapping("/returns")
public String returns() {
// 发送消息
CorrelationData correlationData = new CorrelationData("2");
rabbitTemplate.convertAndSend(ExchangeConstant.CONFIRM_EXCHANGE, "confirm", "test return...", correlationData);
return "发送消息成功!";
}
}
- 下面进行测试(设置不存在的routingkey)
此时证明当消息长期存放在exchange中没有投递到queue的时候就会触发消息退回回调