解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列
- 问题发现
- 问题解决
- 方法一:只监听死信队列,在死信队列里面处理业务逻辑
- 方法二:修改预取值
问题发现
最近再学习RabbitMQ过程中,看到关于死信队列内容:
来自队列的消息可以是 “死信”,这意味着当以下四个事件中的任何一个发生时,这些消息将被重新发布到 Exchange。
- 使用
basic.reject
或basic.nack
且 requeue 参数设置为false
的使用者否定该消息- 消息由于每条消息的 TTL 而过期
- 队列超出了长度限制
之前的文章里面有讲解过TTL过期后不进入死信队列的疑惑和解决办法,然后上手去实践另一个死信队列的方法,结果又是一道坑等着我,示例代码如下:
默认自动应答模式
@Configuration
public class MQConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_queue");
}
/**
* 死信队列交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead.exchange");
}
/**
* 死信队列和死信交换机绑定
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
// normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.maxLength(5) // 设置队列最大长度为5
.build();
}
/**
* 普通交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
监听普通队列消费方,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg) {
log.info("收到消息:"+msg);
}
}
监听死信队列消费方,示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg) {
log.info("死信队列收到消息:{}",msg);
}
}
发送方发送10条消息,示例代码如下:
@Component
public class MQSender {
@Autowired
private RabbitTemplate template;
public void send() {
for (int i = 0; i < 10; i++) {
String msg = "hello world_"+i;
template.convertAndSend("normal.exchange", "normal", msg);
}
}
}
然后调用send()
方法,执行结果如图:
按道理会将队列前面的5条消息进入死信队列,然后剩下的五条消息正常消费才对,我们检查一下队列是否设置成功,如图所示
设置没有问题,那就很懵逼了。。。
问题解决
我们先在页面上向普通队交换机发送10条消息,然后查看它的状态,如图所示:
超过5条消息就会放入死信队列中,如图所示:
然后再看一下,默认行为是从队列前面删除或死信消息,如图所示:
我们可以看到普通队列存放的是最后5条消息,前面的5条消息进入死信队列。也就是说,再没有进入普通消费者之前会将队列前面删除或死信消息(进入消费者之前将消息进行分配)。
方法一:只监听死信队列,在死信队列里面处理业务逻辑
这种做法也是网上大多数文章的一种处理方法(另外一种情况就是进入普通消费者,还没被消费完的情况下,消费者挂了,然后队列就会重新分配,将从队列前面删除或者进入死信队列),如图所示:
但是这些做法都是基于没有普通消费者监听的情况下进行的,感觉和我理解的略有偏差(应该是再有普通消费者监听和死信队列监听的情况下,发送消息时会对消息进行分配处理)。
发送方代码和配置的代码就不重复展示了(参考之前示例),监听死信队列(自动确认模式和手动确认模式都一样),示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg,Message message,Channel channel) throws IOException {
log.info("死信队列收到消息:{}",msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
调用send()
方法,执行结果如图:
然后再死信队列里面处理业务逻辑即可。
方法二:修改预取值
再监听普通队列时抛异常或者手动拒绝重新进入队列,这两种方式都不能达到我想要的效果,我发现只要有普通消费方监听就会进入消费,然后我就在想是不是因为预期值的问题导致,可能我消息数量太少了,然后默认预期值太高导致消息直接进入消费,然后将预取值改为1,自动确认模式,示例代码如下:
spring.rabbitmq.listener.simple.prefetch=1
发送方代码和配置的代码就不重复展示了(参考之前示例),监听普通队列,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg) throws InterruptedException {
log.info("收到消息:"+msg);
// 模拟业务处理队列等待场景
Thread.sleep(10000);
}
}
监听死信队列,示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg) {
log.info("死信队列收到消息:{}",msg);
}
}
调用send()
方法,执行结果如图:
当然有的时候也不一定完全按照你设置的最大长度进入死信队列,有的时候消费速度太快(队列的第一个已经被消费了的情况),得看实际情况,至少可以确保再设置了大于队列最大长度时是可以正常进入死信队列的。归根结底:消息数量太少了。
另外我们再来介绍一下溢出方式,一般默认情况下溢出方式为:drop-head
(从队列前面删除或者进入死信队列),除此之外还有两种:
reject-publish
和reject-publish-dlx
:最近发布的消息将被丢弃。reject-publish
和reject-publish-dlx
之间的区别在于reject-publish-dlx
也是死信拒绝消息。
将配置的溢出模式改为reject-publish
,示例代码如下:
@Configuration
public class MQConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_queue");
}
/**
* 死信队列交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead.exchange");
}
/**
* 死信队列和死信交换机绑定
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
/**
* 普通队列
* @return
*/
@Bean
public Queue queue() {
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
// normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.maxLength(5) // 设置队列最大长度为5
.overflow(QueueBuilder.Overflow.dropHead)
.build();
}
/**
* 普通交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
重新创建队列,调用send()
方法,如图所示:
由图可以,死信队列并不会受到消息。
然后我们再来看看将溢出模式设置为reject-publish-dlx
(QueueBuilder.Overflow没有该参数,手动定义),示例代码如下:
@Configuration
public class MQConfig {
//忽略死信队列创建和绑定过程,参考前面示例...
/**
* 普通队列
* @return
*/
@Bean
public Queue queue() {
Queue normalQueue = new Queue("normal_queue");
normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
normalQueue.addArgument("x-overflow","reject-publish-dlx"); //最近发布的消息进入死信队列
return normalQueue;
}
//忽略普通队列创建过程,参考前面示例...
}
重新创建队列,调用send()
方法,如图所示:
如果你有更好的方法或者不同的理解,欢迎评论区交流。