前言
当我们在使用消息队列时,难免会遇到一些消息被拒绝,重复投递或者超时等异常情况。这些异常消息如果不被正确处理,将会阻碍整个消息系统的正常运行。而此时,死信交换机(Dead Letter Exchange,简称DLX)就是一种非常有用的处理方法。它能够有效地“救活”那些无法被处理的消息,并将它们放入一个特定的队列中进行处理。
1. 何为死信
当一个队列中的消息满足下列情况之一时,可以称为死信 (dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 消息时一个过期消息,超时无人消费
- 要投递的队列消息堆满了,最早的消息可能成为死信
如果该队列配置了 dead-letter-exchange 属性,指定一个交换机,那么队列中的死信会投递到交换机种,而这个交换机称为死信交换机 (Dead Letter Exchange, 简称 DLX)
如何给队列绑定死信交换机?
- 给队列设置 dead-letter-exchange 属性,指定一个交换机
- 给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey
下面我们将通过一个简单的例子说明死信交换机的使用
2. 如何使用死信?
编写实例之前,我们先了解 TTL的概念
2.1 TTL
TTL, 也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消费,则变为死信,ttl 超时分为两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
- 两者共存时,以时间短的 ttl 为准
2.2 案例
接着我们将依据下图编写案例
2.2.1 先声明一组死信交换机和队列 (dl.direct 和 dl.queue)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("消费者接收到了 dl.queue的延迟消息");
}
2.2.2 声明投递消息的队列,并设置超时时间 (配置x-message-ttl 属性)
package cn.itcast.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLMessageConfig {
// 延时交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("ttl.direct");
}
// 延时队列
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列超时时间,10秒
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") // 指定死信RoutingKey
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl");
}
}
-
deadLetterExchange(“dl.direct”): 主要是声明队列超时后消息所投放的死信交换机
-
deadLetterRoutingKey(“dl”): 指定死信交换机的 routingKey
-
.ttl(10000) : 设置队列超时时间,10秒
2.2.3 发送消息,给消息本身设置超时时间
@Test
public void testTTLMessage(){
// 1. 准备消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000") // 设置消息超时时间
.build();
// 2. 发送消息
rabbitTemplate.convertAndSend("ttl.direct","ttl", message);
// 3.记录日志
log.info("消息已经成功发送!");
}
总结
消息超时的两种方式是?
- 给队列设置 ttl 属性,进入队列后超过 ttl 时间消息变为死信
- 给消息设置 ttl 属性,队列接收到消息超过 ttl 时间后变为死信
- 两者共存时,以时间短的 ttl 为准
如何实现发送一个消息 20秒后消费者才能收到消息
- 给消息的目标队列指定死信交换机
- 消费者监听与死信交换机绑定队列
- 发送消息时给消息设置 ttl 为 20秒