1、延时队列的概念
队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了以后被取出处理
延时队列就是用来存放需要在指定时间被处理的元素的队列
2、延时队列使用的场景
订单在十分钟之内未支付则自动取消
3、RabbitMQ 中的 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用
3.1 消息设置TTL
发送消息时指定消息的过期属性:
3.2 队列设置TTL属性
在创建队列的时候设置队列的“x-message-ttl”属性
3.3 两者之间的区别
- 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)
- 如果设置了消息的TTL属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;
前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息
4、整合SpringBoot
1)添加依赖
<dependencies>
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
2) 添加最基本的 RabbitMQ 的配置
spring:
rabbitmq:
host: 192.168.126.10
port: 5672
username: admin
password: 123
5、队列TTL实现延时队列
5.1 代码架构图
队列QA : 队列TTL属性设置为10s
队列QB:队列TTL属性设置为40s
正常交换机 X
死信交换机 Y
死信队列 QD
5.2 配置文件
创建交换机、队列、绑定关系等
/**
* @author houChen
* @date 2022/11/13 22:58
* @Description: 配置交换机
*/
@Configuration
public class TtlQueueConfig {
private static final String X_EXCHANGE = "X";
private static final String QA_QUEUE = "QA";
private static final String QB_QUEUE = "QB";
//死信交换机
private static final String DEAD_EXCHANGE = "Y";
private static final String DEAD_QUEUE = "QD";
//创建普通交换机
@Bean
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
//创建死信交换机
@Bean
public DirectExchange yExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//创建队列QA, ttl为10秒,绑定到对应的死信交互机
@Bean
public Queue queueA() {
Map<String, Object> args = new HashMap<>();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//声明当前队列的死信路由键
args.put("x-dead-letter-routing-key", "YD");
//声明队列的TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QA_QUEUE).withArguments(args).build();
}
//队列QA绑定 X交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//创建队列QB, ttl为40秒,绑定到对应的死信交互机
@Bean
public Queue queueB() {
Map<String, Object> args = new HashMap<>();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//声明当前队列的死信路由键
args.put("x-dead-letter-routing-key", "YD");
//声明队列的TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QB_QUEUE).withArguments(args).build();
}
//队列QB绑定 X交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean
public Queue queueD() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
//死信队列和死信交换机进行绑定
@Bean
public Binding queuedBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
5.3 消息生产者
/**
* @author houChen
* @date 2022/11/14 22:39
* @Description: 消息生产者代码
*/
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
}
5.4 消费者代码
/**
* @author houChen
* @date 2022/11/14 23:34
* @Description: 消费者代码
*/
@Slf4j
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException {
String msg = new String(message.getBody(), "utf-8");
log.info("当前时间:{},收到死信队列消息{}", new Date(), msg);
}
}
5.5 测试
发送请求 :http://localhost:8080/ttl/sendMessage/aaaa
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了
缺点
使用队列的 x-message-ttl 属性的话,每增加一个新的时间需求,就需要新增一个队列,不太好扩展