解决RabbitMQ设置TTL过期后不进入死信队列
- 问题发现
- 问题解决
- 方法一:只监听死信队列,在死信队列里面处理业务逻辑
- 方法二:改为自动确认模式
问题发现
最近再学习RabbitMQ过程中,看到关于死信队列内容:
来自队列的消息可以是 “死信”,这意味着当以下四个事件中的任何一个发生时,这些消息将被重新发布到 Exchange。
- 使用
basic.reject
或basic.nack
且 requeue 参数设置为false
的使用者否定该消息- 消息由于每条消息的 TTL 而过期
- 队列超出了长度限制
- 消息返回到 quorum 队列的次数超过了
delivery-limit
的次数。
再模拟TTL过期时遇到的疑惑,特此记录下来,示例代码如下:
先设置为手动应答模式:
#手动应答
spring.rabbitmq.listener.simple.acknowledge-mode = manual
绑定队列,示例代码如下:
@Configuration
public class MQConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_queue");
}
/**
* 死信队列交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead.exchange");
}
/**
* 死信队列和死信交换机绑定
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-message-ttl", 10000); // 死信队列routingKey
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.ttl(10000)
.build();
}
/**
* 普通交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
监听普通队列消费方,示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg, Message message, Channel channel) throws IOException, InterruptedException {
log.info("收到消息:"+msg);
}
}
监听死信队列消费方,示例代码如下:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg, Message message, Channel channel) throws IOException {
log.info("死信队列收到消息:{}",msg);
// 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费,false只确认当前消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
发送方,向普通队列发送消息,示例代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
String msg = "hello world";
log.info("发送消息:"+msg);
template.convertAndSend("normal.exchange", "normal", msg);
}
}
执行结果如图:
时间到了后,死信队列长时间未收到消息,消息一直在普通队列中,如图所示:
然后开始百度,网上很多都说什么配置不对啥的,还有说队列的预取值太大导致的问题(扯犊子呢),反正就是没有找到一个合理的解释。
然后吃了个饭回来,发现RabbitMQ报了一个长时间未收到消息确认的错误(大概意思就是说ACK消息确认超时时间为18000毫秒也就是30分钟),原来RabbitMQ一直在等待消息确认,所以一直被持有,当普通队列挂了(重启后),被释放,进入死信队列。
PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
这下知道为什么不进入死信队列的原因了。新的问题又来了,如果我手动确认或者拒绝了,那不就达不到TTL过期的效果了吗?
问题解决
方法一:只监听死信队列,在死信队列里面处理业务逻辑
这个方法是参考众多文章比较常见的一个做法,但是个人感觉与我理解的TTL有偏差(应该是在普通队列中处理超时的一种补偿机制,如果只监听死信队列,那就完全不需要在配置时普通队列里面定义死信队列,虽然这种做法可以解决业务问题),另一方面官方也有提到:
消息可以在写入套接字之后过期,但在到达消费者之前过期。
示例代码如下:
@Configuration
public class MQConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_queue");
}
/**
* 死信队列交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead.exchange");
}
/**
* 死信队列和死信交换机绑定
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
/**
* 普通队列
* @return
*/
@Bean
public Queue queue(){
// 方法一
// Queue normalQueue = new Queue("normal_queue");
// normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
// normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
// normalQueue.addArgument("x-message-ttl", 10000); // 死信队列routingKey
// 方法二
return QueueBuilder.durable("normal_queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead")
.ttl(10000)
.build();
}
/**
* 普通交换机
* @return
*/
@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.exchange");
}
/**
* 普通队列和普通交换机绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
}
}
消费方只监听死信队列:
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg, Message message, Channel channel) throws IOException {
log.info("死信队列收到消息:{}",msg);
// 伪代码:判断订单状态,1支付成功,2支付超时
// if(order.state == 1){
// // 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费,false只确认当前消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// }else{
// // todo 修改订单状态
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// }
}
}
发送方代码如下:
@Component
public class MQSender {
private static final Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
private RabbitTemplate template;
public void send() throws UnsupportedEncodingException {
String msg = "hello world";
log.info("发送消息:"+msg);
template.convertAndSend("normal.exchange", "normal", msg);
}
}
调用send()
方法,执行结果如图:
可以看到从发送时间到进入死信队列时间正好间隔10s。
方法二:改为自动确认模式
经过思考后,既然手动确认走不通,那不如试一试自动模式,我们在普通队列里面,模拟业务出现异常情况(如果只是单纯模拟业务超时,不会进入死信队列,直接就确认消费了)。
我们先把手动确认的配置删除或者修改为自动确认,示例代码如下:
#spring.rabbitmq.listener.simple.acknowledge-mode = auto
发送方代码和配置的代码就不重复展示了(参考之前示例),消费方示例代码如下:
@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitHandler
public void receive(String msg) throws IOException, InterruptedException {
log.info("收到消息:"+msg);
throw new RuntimeException();
}
}
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);
@RabbitHandler
public void receive(String msg) throws IOException {
log.info("死信队列收到消息:{}",msg);
// 伪代码:判断订单状态,1支付成功,2支付超时
// if(order.state == 1){
// // 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费,false只确认当前消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// }else{
// // todo 修改订单状态
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// }
}
}
调用send()
方法,执行结果如图:
我们可以看到第一次进入普通队列时间和最后一次报错进入死信队列的时间,正好间隔10s。但是这中间会重复发起N次,不合理是时长,可能会导致资源消耗过高,但这又属于另外一个问题了。