一,死信交换机
1,什么是死信交换机
了解死信交换机之前我们先来了解一下什么是死信,当一个队列满足下面的三种情况的时候我们一般称为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(当消费者未成功处理完消息的时候会给与其绑定的队列返回nack,此时就需要将消息重新入队requeue,如果将requeue参数设置成false消息就会变成死信了);
- 消息已经过期了,过期的消息没有消费者进行消费就会变成死信;
- 所需要投递的队列已经满了,最早的消息就会变成死信。
队列中的死信需要将消息转发给另一个交换机,此时整个交换机就被称为死信交换机,死信就可以往整个交换机里进行投递了;我们知道交换机不具备存储消息的能力,所以也需要一个死信队列与其进行绑定。
2,死信交换机投递的流程图
二,TTL
1,什么是TTL
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
- 消息所在的队列设置了存活时间;
- 消息本身设置了存活时间。
注意:这里是假定没有消费者绑定ttl队列,这样投递到ttl队列中的消息没有消费者进行消费,消息一定会超时,这样就可以将死信成功投递到死信交换机中了!
2,利用TTL模拟消息超时被投递到死信交换机中
1. 演示给队列设置超时时间的情况
1.1 在config包下定义TTLMessageConfig配置ttl交换机和ttl队列的信息,并将队列的超时时间设置为10s
@Configuration
public class TTLMessageConfig {
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue")
.ttl(10000)//设置队列的超时时间是10s
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
}
}
1.2 利用注解的方式将ttl队列和死信交换机进行绑定并配置相关路由参数
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),//默认交换机的类型为direct且为持久化的
key = "dl"
))
public void listenDlQueue(String msg) {
log.info("消费者接收到了dl.queue的延迟消息" + msg);
}
1.3 定义一个测试类进行消息发送观察消息发送成功和被处理的时间间隔是否是10s
@Test
public void testTTLMessage() {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
// 3.记录日志
log.info("消息已经成功发送!");
}
2. 演示给消息设置超时时间的情况
给消息设置超时的场景只需要在消息发送的时候给消息设置上setExpiration参数即可:
@Test
public void testTTLMessage() {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000")//给消息设置超时时间
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
// 3.记录日志
log.info("消息已经成功发送!");
}
总结:
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信;
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信;
- 两者共存时,以时间短的ttl为准。
三,延迟队列
1,什么是延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式;在idea中可以使用延迟插件来对延迟队列进行操作(自行下载)。
2,延迟队列使用场景
延迟队列的使用场景包括:
- 延迟发送短信 用户下单,如果用户在15 分钟内未支付,则自动取消;
- 预约工作会议,20分钟后自动通知所有参会人员。
3,延迟队列的设计及使用
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
3.1 基于注解的方式声明一个延迟交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),//添加delayed属性
表明是一个延迟交换机
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消费者接收到了delay.queue的延迟消息");
}
3.2 向delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)//加上header属性
.build();
// 2.准备CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("发送消息成功");
}
延迟队列插件使用步骤:
声明一个交换机,添加delayed属性为true;
发送消息时,添加x-delay头,值为超时时间。