什么是死信以及死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信:
1. 消费者使用basic.reject
或 basic.nack
声明消费失败,并且消息的requeue
参数设置为false
2. 消息是一个过期消息,超时无人消费
3. 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
-
收集那些因处理失败而被拒绝的消息
-
收集那些因队列满了而被拒绝的消息
-
收集因TTL(有效期)到期的消息
为什么这里会介绍死信交换机呢,举个例子,我们在购买车票的时候会有一个支付时间,8分钟没有支付就会销毁订单,返回车票。mq不可能时刻监控客户有没有支付,可以使用延迟消息,延迟8分钟,八分钟后再去发送消息到mq,在查看支付情况。
DelayExchange插件
官网下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins
[ { "CreatedAt": "2024-06-19T09:22:59+08:00", "Driver": "local", "Labels": null, "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data", "Name": "mq-plugins", "Options": null, "Scope": "local" } ]
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
具体使用
声明交换机,基于@Bean:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("sdgstu.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
基于注解:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "stusdg.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
发送消息:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}