RabbitMQ 消息可靠保障
- 消息的可靠性保证
- 生产端到交换机和队列的可靠性保障
- 解决思路A-确认机制
- 解决思路B-备份交换机
- MQ 服务器宕机导致消息丢失
- 消费端消息的可靠性保障
- 消费端限流
消息的可靠性保证
实际项目中 MQ 的流程一般是:生产端把消息路由到交换机,然后由交换机把消息发送到队列,接着就是消费端拿到消息进行消费,这三个过程都有可能造成消息的不稳定,导致不可靠
生产端到交换机和队列的可靠性保障
解决思路A-确认机制
在生产端进行确认,具体操作中会分别针对交换机和队列来确认,如果没有成功发送的队列服务器上,那就可以尝试重新发送
首先需要增加下列配置
spring.rabbitmq.host=192.168.133.128
spring.rabbitmq.port=5672
spring.rabbitmq.password=admin
spring.rabbitmq.username=admin
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.publisher-confirm-type=CORRELATED #交换机的确认
spring.rabbitmq.publisher-returns=true #队列的确认
这里 publisher-confirm-type 有三种模式可选:
- none:关闭 confirm 机制
- simple:同步阻塞等待MQ的回执消息
- CORRELATED:MQ异步回调方式返回回执消息
接下来增加一个Rabbit 配置
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 消息发送到交换机成功或失败时调用此方法
log.info("confirm函数打印correlationData:" + correlationData);
log.info("confirm函数打印ack:" + ack);
log.info("confirm函数打印cause:" + cause);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 发送队列失败时才调用此方法
log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));
log.info("应答码:" + returnedMessage.getReplyCode());
log.info("描述:" + returnedMessage.getReplyText());
log.info("交换机:" + returnedMessage.getExchange());
log.info("路由key:" + returnedMessage.getRoutingKey());
}
}
为了方便测试,定义一个交换机和队列吧并声明它们的绑定关系
public class RabbitMQConfig {
//定义一个交换机已及 routingkey,用来测试消息的可靠传递测试
public static final String NORMAL_EXCHANGE = "normal.demo.exchange";
public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";
public static final String NORMAL_QUEUE = "normal.demo.queue";
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
@Bean
public Queue normalQueue() {
return new Queue(NORMAL_QUEUE);
}
@Bean
public Binding normalBinding(@Qualifier("normalQueue") Queue queue,
@Qualifier("normalExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY);
}
}
在 controller 层测试
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/test")
public String test(){
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, "Message Test Confirm~~");
return "success.";
}
}
消息成功路由到交换机,然后交换机发送到队列,正常情况下是下面的输出
2024-08-17T17:08:35.105+08:00 INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672]
2024-08-17T17:08:35.149+08:00 INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection@3542faa [delegate=amqp://admin@192.168.133.128:5672/, localPort=63927]
2024-08-17T17:08:35.204+08:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationData:null
2024-08-17T17:08:35.204+08:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印ack:true
2024-08-17T17:08:35.205+08:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印cause:null
现在改一下,模拟路由交换机失败的场景,注意没有 RabbitMQConfig.NORMAL_EXCHANGE+"~",
这个交换机
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE+"~", RabbitMQConfig.NORMAL_ROUTING_KEY, "Message Test Confirm~~");
重新测试,此时输出如下,错误提示的还是很明显的
2024-08-17T17:11:39.981+08:00 INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672]
2024-08-17T17:11:40.037+08:00 INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection@568384f7 [delegate=amqp://admin@192.168.133.128:5672/, localPort=64266]
2024-08-17T17:11:40.074+08:00 ERROR 15864 --- [68.133.128:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'normal.demo.exchange~' in vhost '/', class-id=60, method-id=40)
2024-08-17T17:11:40.076+08:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印correlationData:null
2024-08-17T17:11:40.076+08:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印ack:false
2024-08-17T17:11:40.077+08:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'normal.demo.exchange~' in vhost '/', class-id=60, method-id=40)
现在再模拟交换机发送消息到队列失败的场景,改一下 routingKey
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY+"~", "Message Test Confirm~~");
重新测试,此时输出如下,returnedMessage 方法执行了
2024-08-17T17:13:49.835+08:00 INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672]
2024-08-17T17:13:49.889+08:00 INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection@572e41be [delegate=amqp://admin@192.168.133.128:5672/, localPort=64519]
2024-08-17T17:13:49.933+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 消息主体:Message Test Confirm~~
2024-08-17T17:13:49.934+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 应答码:312
2024-08-17T17:13:49.934+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 描述:NO_ROUTE
2024-08-17T17:13:49.934+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 交换机:normal.demo.exchange
2024-08-17T17:13:49.934+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 路由key:normal.demo.routingkey~
2024-08-17T17:13:49.935+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印correlationData:null
2024-08-17T17:13:49.936+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印ack:true
2024-08-17T17:13:49.936+08:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印cause:null
顺便了解一下spring.rabbitmq.template.mandatory
配置
当使用 RabbitMQ 作为消息队列时,spring.rabbitmq.template.mandatory
配置项用于控制 MQ 消息模板(RabbitTemplate)的发送行为,特别是在消息路由时找不到队列(queue)的情况下的处理方式。
2、spring.rabbitmq.template.mandatory
属性可能会返回三种值null、false、true,
3、spring.rabbitmq.template.mandatory
结果为 false 时会忽略掉spring.rabbitmq.publisher-returns
属性的值,也就是 returnedMessage 方法不会执行;结果为 true,returnedMessage 可以得到执行
4、spring.rabbitmq.template.mandatory
结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns
确定
解决思路B-备份交换机
为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机(实际项目中很少用,一般用思路A)
在上面代码的基础上改一下,RabbitMQConfig 就单纯的声明队列和交换机
public class RabbitMQConfig {
//定义一个正常交换机以及 routingkey,用来测试消息的可靠传递测试
public static final String NORMAL_EXCHANGE = "normal.demo.exchange";
public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";
public static final String NORMAL_QUEUE = "normal.demo.queue";
//定义一个备份交换机以及 队列,用来测试消息的可靠传递测试
public static final String NORMAL_EXCHANGE_BACKUP = "normal.demo.exchange.backup";
public static final String NORMAL_QUEUE_BACKUP = "normal.demo.queue.backup";
}
定义一个消费者,监听队列,实际项目中都会有的,有如下注意点:
- 备份交换机必须定义成 FANOUT 类型
- 声明正常交换机的时候指定一个备份交换机,通过
alternate-exchange
参数,代码有体现
@Slf4j
@Component
public class RabbitConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE, type = ExchangeTypes.DIRECT, arguments = {@Argument(name = "alternate-exchange", value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),
key = RabbitMQConfig.NORMAL_ROUTING_KEY))
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("正常队列{}收到消息:{}", RabbitMQConfig.NORMAL_QUEUE, msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_BACKUP),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type = ExchangeTypes.FANOUT)))
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("备份队列{}收到消息:{}", RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg);
}
}
注意:@RabbitListener 必须先创建队列,不然报错, 有三个 属性 queues()、bindings()、queuesToDeclare(),它们之间是互斥的。设定了queues(),就不能再设定 bindings() 和 queuesToDeclare()了,具体用法可以看这篇文章
接下来模拟无法到达队列(如果无法到达交换机直接报错,没有后面什么事了…),测试能不能正常进入备份交换机
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY+"~", "Message Test Confirm~~");
运行结果如下:
2024-08-17T18:25:32.253+08:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationData:null
2024-08-17T18:25:32.254+08:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印ack:true
2024-08-17T18:25:32.254+08:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印cause:null
2024-08-17T18:25:32.257+08:00 INFO 22476 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 备份队列normal.demo.queue.backup收到消息:Message Test Confirm~~
MQ 服务器宕机导致消息丢失
这一点官方早就考虑到了,现在无论是交换机还是队列,消息都是默认持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
消费端消息的可靠性保障
配置文件增加 spring.rabbitmq.listener.simple.acknowledge-mode=manual
配置,也就是下面这样
spring.rabbitmq.host=192.168.133.128
spring.rabbitmq.port=5672
spring.rabbitmq.password=admin
spring.rabbitmq.username=admin
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.publisher-confirm-type=CORRELATED
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual #把消息确认模式改成手动确认
因为 RabbitMQ 客户端默认是 自动返回 ACK 确认的,也就是不管是处理成功还是失败,默认都按成功来处理,这样就不太好,所以这个配置要改成手动确认
需要提前知道个东西,啥是 deliveryTag?
队列的定义,这里把备份交换机队列都删了哈
public class RabbitMQConfig {
//定义一个正常交换机以及 routingkey,用来测试消息的可靠传递测试
public static final String NORMAL_EXCHANGE = "normal.demo.exchange";
public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";
public static final String NORMAL_QUEUE = "normal.demo.queue";
}
消费者逻辑如下
@Slf4j
@Component
public class RabbitConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE, type = ExchangeTypes.DIRECT, arguments = {@Argument(name = "alternate-exchange", value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),
key = RabbitMQConfig.NORMAL_ROUTING_KEY))
public void receiveA(Message message, Channel channel) throws IOException {
// 消息内容
String msg = new String(message.getBody());
// 消息的 deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 操作成功,返回 ack 信息
int i = 1/0; // 模拟消息处理异常
channel.basicAck(deliveryTag, false);
log.info("正常队列{}收到消息:{}", RabbitMQConfig.NORMAL_QUEUE, msg);
} catch (Exception ex) {
// 获取当前消息是否是重复投递的
// true-说明消息已经重复投递过一次了;false-说明当前消息是第一次投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 操作失败,返回 Nack 信息
if (redelivered) {
// requeue 参数:控制消息是否重新放入队列 true-重放队列,broker会重新投递消息; false-不重放,broker会丢弃消息
channel.basicNack(deliveryTag, false, false); // basicNack(long deliveryTag, boolean multiple, boolean requeue)
} else {
channel.basicNack(deliveryTag, false, true);
}
log.info("正常队列{}收到消息,,redelivered: {},但是处理异常:{}", RabbitMQConfig.NORMAL_QUEUE, redelivered, msg);
}
// basicReject 和 basicNack 区别-> 唯一的区别就是 basicNack 有批量操作控制,就是 multiple 参数
// channel.basicReject(deliveryTag, false); //basicReject(long deliveryTag, boolean requeue)
}
}
2024-08-17T19:48:38.131+08:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationData:null
2024-08-17T19:48:38.132+08:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印ack:true
2024-08-17T19:48:38.132+08:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印cause:null
2024-08-17T19:48:55.205+08:00 INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 正常队列normal.demo.queue收到消息,,redelivered: false,但是处理异常:Message Test Confirm~~
2024-08-17T19:49:12.007+08:00 INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 正常队列normal.demo.queue收到消息,,redelivered: true,但是处理异常:Message Test Confirm~~
具体效果可以本地 debug 下哈,消费完并且都 ack 后这里都是0了
消费端限流
我们都知道 MQ 有削峰填谷的效果,假设有下面的场景,消息队列里面有10000条消息,但是消费端的并发能力只有 1000,为了避免一次性把这1万条消息全部取出,导致消费端压力太大,我们可以做个设置,每次最多取1000条消息,对消费端也是一种保护。
从操作层面来说也比较简单,配置spring.rabbitmq.listener.simple.prefetch
属性即可
做个小实验:
当不设置 prefetch 时,生产者一次性向交换机中投递100条消息,消费者确认消息的方式设置为手动确认,在确认消息之前线程休眠 5 秒(因为 rabbitmq 管理控制台数据更新是 5 秒更新一次,这里设置为 5 秒比较方便观察),可以观察到队列是一次性将100条数据全部发送给了消费者,然后消费者再进行处理:
生产者一次发送10条消息到交换机:
@GetMapping("/test")
public String test(){
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, "Message Test Confirm"+i);
}
return "success.";
}
消费者在确认之前休眠5秒:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_BACKUP),
exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type = ExchangeTypes.FANOUT)))
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("备份队列{}收到消息:{}", RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg);
}
现在增加spring.rabbitmq.listener.simple.prefetch=2
配置,就是一次从队列中取两条数据,再观察结果:
会发现,消费端不再是一次性全部把消息取出,而是每次只取 2 个,这就是 prefetch 配置作用