RabbitMQ 是如何做延迟消息的 ?
1、什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为 false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因 TTL(有效期)到期的消息
2、死信队列
架构:
由于第一个队列没有消费者,所以可以在第一个队列中设置 TTL,当消息过期的时候,这个消息就变成了死信,被丢掉私信交换机中,以此实现延迟任务功能。
3、延迟消息
前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer
作用类似。
而最后一种场景,大家设想一下这样的场景: 如图,有一组绑定的交换机(ttl.fanout
)和队列(ttl.queue
)。但是ttl.queue
没有消费者监听,而是设定了死信交换机hmall.direct
,而队列direct.queue1
则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout
,RoutingKey为blue,并设置消息的有效期为5000毫秒: 注意:尽管这里的ttl.fanout
不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct
才能正确路由消息。
消息肯定会被投递到 ttl.queue
之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信: 死信被再次投递到死信交换机 hmall.direct
,并沿用之前的 RoutingKey,也就是 blue
: 由于 direct.queue1
与 hmall.direct
绑定的 key 是 blue,因此最终消息被成功路由到 direct.queue1
,如果此时有消费者与 direct.queue1
绑定,也就能成功消费消息了。但此时已经是5秒钟以后了: 也就是说,publisher 发送了一条消息,但最终 consumer 在5秒后才收到消息。我们成功实现了延迟消息。
[!info]
而且,RabbitMQ 中的这个 TTL 是可以设置任意时长的,这相比于 RocketMQ 只支持一些固定的时长而显得更加灵活一些。
死信队列消息堆积问题
[!danger] 死信队列消息堆积问题
但是,死信队列的实现方式存在一个问题,那就是可能造成队头阻塞。RabbitMQ 会定期扫描队列的头部检查队首的消息是否过期。如果队首消息过期了,它会被放到死信队列中。然而,RabbitMQ 不会逐个检查队列中的所有消息是否过期,而是仅检查队首消息。这样,如果队列的队头消息未过期,而它后面的消息已过期,这些后续消息将无法被单独移除,直到队头的消息被消费或过期。
因为队列是先进先出的,在普通队列中的消息,每次只会判断邢队头的消息是否过期,那么,如果队头的消息时间很长,一直都不过期,那么就会阻塞整个队列,这时候即使排在他后面的消息过期了,那么也会被一直阻塞。
基于 RabbitMQ 的死信队列,可以实现延迟消息,非常灵活的实现定时关单,并且借助 RabbitMQ 的集群扩展性,可以实现高可用,以及处理大并发量。他的缺点第一是可能存在消息阻塞的问题,还有就是方案比较复杂,不仅要依赖 RabbitMQ, 而目还需要声明很多队列出来,增加系统的复杂度
3、DelayExchange 插件
前面我们提到的基于死信队列的方式,是消息先会投递到一个正常队列,在 TTL 过期后进入死信队列。但是基于插件的这种方式,消息并不会立即进入队列,而是先把他们保存在一个基于 Erlang 开发的 Mnesia 数据库中,然后通过一个定时器去查询需要被投递的消息,再把他们投递到 x-delayed-message 交换机中。
基于 RabbitMQ 插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1 毫秒,大约 49 天,超过这个时间就会被立即消费。
插件下载地址: GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的 MQ 是 3.8
版本,因此这里下载 3.8.17
版本:
附件:![[rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez]]
4.2.2. 安装
因为我们是基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。
注意上传插件
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果如下:
4.2.3. 声明延迟交换机
根据
1、创建交换机:
2、创建队列
3、根据 bandingKey 绑定队列:
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
基于 @Bean
的方式:
package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
4.2.4. 发送延迟消息
发送消息时,必须通过 x-delay 属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
warning 注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息。