目录
一、引言
二、死信队列
三、核心代码实现
四、运行效果
五、总结
一、引言
什么是延迟消息?
发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后收到消息。
什么是延迟任务?
设置在一定时间之后才执行的任务。
延迟消息使用场景
我们在实际项目中经常会有一些场景,需要延迟指定时间后发送消息,比如在电商或者外卖平台中订单10分钟后自动取消功能等。
对于上述延迟消息的场景,我们该怎么实现呢?
RabbitMQ 官方并没有直接内置延迟消息的功能,但是可以通过 TTL(Time-To-Live)和死信队列(Dead Letter Exchanges)的组合来实现延迟消息的效果,另外RabbitMQ 也可以通过安装延迟消息插件的方式来实现。
二、死信队列
电商购物中,针对用户下单扣减库存的服务逻辑,我们希望删除10分钟后状态为未支付的订单。在过去的项目中,我们可能第一时间会想到通过定时任务定期查询未支付的订单并做删除来实现:
定时任务会有两个问题:
1. 当针对订单量特别大的电商项目而言,定时任务间断性地查询整个订单数据会极大增加订单服务的压力。
2. 定时任务存在时间上的滞后性。
通过使用RabbitMQ延迟消息,我们可以在完成需求的同时,有效的避免上述问题。如下图所示,用户通过交易服务下单(状态为未支付),随后交易服务调用商品服务扣减库存。 用户在调用交易服务的同时发送一个延迟消息到RabbitMQ,10分钟后交易服务收到消息,此时如果订单还是未支付状态,则取消订单。
RabbitMQ中的死信队列,就是一种可以实现延迟消息的方式。当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
1. 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
2. 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
3. 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
三、核心代码实现
package com.example.consumer;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 常规的RabbitMQ 交换机/队列绑定配置类
*/
@Configuration
public class RabbitMQConfig {
@Bean
Queue normalQueue() {
// 使用 QueueBuilder 创建一个持久化队列
return QueueBuilder.durable("normal.queue")
.deadLetterExchange("dead.direct")
.deadLetterRoutingKey("dead")
.build();
}
@Bean
DirectExchange normalDirect() {
return ExchangeBuilder.directExchange("normal.direct").build();
}
@Bean
Binding bindingNormal(Queue normalQueue, DirectExchange normalDirect) {
return BindingBuilder.bind(normalQueue).to(normalDirect).with("normal");
}
@Bean
Queue deadQueue() {
// 使用 QueueBuilder 创建一个持久化队列
return QueueBuilder.durable("dead.queue").build();
}
@Bean
DirectExchange deadDirect() {
return ExchangeBuilder.directExchange("dead.direct").build();
}
@Bean
Binding bindingDead(Queue deadQueue, DirectExchange deadDirect) {
return BindingBuilder.bind(deadQueue).to(deadDirect).with("dead");
}
}
package com.example.publisher;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import java.nio.charset.StandardCharsets;
/**
* 生产者
*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void test() {
String content = "生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!";
Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8))
.setExpiration("10000").build();
rabbitTemplate.convertAndSend("normal.direct",
"normal", message);
}
}
package com.example.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* 消费者
*/
@Slf4j
@Component
public class SimpleListener {
@RabbitListener(queues = "dead.queue")
public void listener1(Message message) throws Exception {
String msg = new String(message.getBody(), StandardCharsets.UTF_8); ;
System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");
}
}
四、运行效果
五、总结
虽然我们通过RabbitMQ的死信队列能够实现延迟消息的功能,但是通过代码我们可以看到,这种实现方式相对来说比较繁琐。而且关键是RabbitMQ提供死信队列的初衷并不是让我们用来发送延迟消息的,而是为了作为兜底方案,来接收没有消费的死信的,便于定位问题。因此,后续章节会给大家讲解更优的解决方案,即延迟插件。