目录
- 什么是延时队列
- 延时队列的使用场景
- 前提准备
- 利用RabbitMQ实现延时队列
- 延时队列优化
- 利用RabbitMQ插件实现延迟队列
什么是延时队列
延时队列
,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。
其次,延时队列
,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理
,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延时队列的使用场景
那么什么时候需要用延时队列呢?考虑一下以下场景:
- 订单在十分钟之内未支付则自动取消。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 账单在一周内未支付,则自动结算。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;发生店铺创建事件,十天后检查该店铺上新商品数,然后通知上新数为0的商户;发生账单生成事件,检查账单支付状态,然后自动结算未支付的账单;发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生退款事件,在三天之后检查该订单是否已被处理,如仍未被处理,则发送消息给相关运营人员;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员等等情况。
前提准备
在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)
。
TTL
是什么呢?TTL
是RabbitMQ
中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL
属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL
设置的时间内没有被消费,则会成为“死信”(至于什么是死信,可以点击查看)。如果同时配置了队列的TTL
和消息的TTL
,那么较小的那个值将会被使用。
设置TTL
的方式有两种:
第一种,在创建队列的时候设置队列的“x-message-ttl”属性:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
第二种,针对每条消息设置TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
通过以上两种方式,可以将消息延迟6秒在被消费。
这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
利用RabbitMQ实现延时队列
想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。
从下图可以大致看出消息的流向:
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
接下来直接上代码
首先申明队列以及交换机的绑定关系,即添加一个RabbitmqConfig文件:
package com.miaosha.study.tet;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:50
* @Version: 1.0
*/
@Configuration
public class RabbitMQConfig {
/**
* 延时交换机
*/
public static final String DELAY_EXCHANGE_NAME = "delay.queue.business.exchange";
/**
* 延时队列a
*/
public static final String DELAY_QUEUEA_NAME = "delay.queue.business.queuea";
/**
*延时队列b
*/
public static final String DELAY_QUEUEB_NAME = "delay.queue.business.queueb";
/**
*延时队列a路由键
*/
public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.business.queuea.routingkey";
/**
*延时队列b路由键
*/
public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.business.queueb.routingkey";
/**
*死信交换机
*/
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadletter.exchange";
/**
*死信队列a路由键
*/
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.deadletter.queuea.routingkey";
/**
*死信队列b路由键
*/
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.deadletter.queueb.routingkey";
/**
*死信队列a
*/
public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.deadletter.queuea";
/**
*死信队列b
*/
public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.deadletter.queueb";
/**
* 声明延时Exchange
* @return
*/
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
/**
* 声明死信Exchange
* @return
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
/**
* 声明延时队列A 延时2s, 并绑定到对应的死信交换机
* @return
*/
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
//声明队列的TTL
args.put("x-message-ttl", 2000);
return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
}
/**
* 声明延时队列B 延时20s, 并绑定到对应的死信交换机
* @return
*/
@Bean("delayQueueB")
public Queue delayQueueB(){
Map<String, Object> args = new HashMap<>(2);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
//声明队列的TTL
args.put("x-message-ttl", 20000);
return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
}
/**
* 声明死信队列A
* @return
*/
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
/**
* 声明死信队列B
* @return
*/
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
/**
* 声明延时队列A绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
}
/**
* 声明业务队列B绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
}
/**
* 声明死信队列A绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
/**
* 声明死信队列B绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
添加消费者:
package com.miaosha.study.tet;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import static com.miaosha.study.tet.RabbitMQConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:58
* @Version: 1.0
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
添加消息发送者:
package com.miaosha.study.tet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import static com.miaosha.study.tet.RabbitMQConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:52
* @Version: 1.0
*/
@Slf4j
@Component
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg, Integer type){
switch (type){
case 1:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
break;
case 2:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
break;
}
}
}
添加一个测试接口:
package com.miaosha.study.tet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.Objects;
/**
* @Author: laz
* @CreateTime: 2023-02-27 10:58
* @Version: 1.0
*/
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {
@Autowired
private DelayMessageSender sender;
@RequestMapping("sendmsg")
public void sendMsg(String msg, Integer delayType){
log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType);
sender.sendMsg(msg, delayType);
}
}
然后启动项目,访问本接口
浏览器分别请求:
http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1
http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2
观察控制台:
第一条消息在2s后变成了死信消息,然后被消费者消费掉,第二条消息在20s之后变成了死信消息,然后被消费掉,这样,一个延时队列就打造完成了。
不过,等等,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有2s和20s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求??
显然,需要一种更通用的方案才能满足需求,那么就只能将TTL设置在消息属性里了
延时队列优化
接下来,在添加一个延时队列:
package com.miaosha.study.tet;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:50
* @Version: 1.0
*/
@Configuration
public class RabbitMQConfig {
/**
* 延时交换机
*/
public static final String DELAY_EXCHANGE_NAME = "delay.queue.business.exchange";
/**
*延时队列c
*/
public static final String DELAY_QUEUEC_NAME = "delay.queue.business.queuec";
/**
*延时队列c路由键
*/
public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.business.queuec.routingkey";
/**
*死信交换机
*/
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.deadletter.exchange";
/**
*死信队列c路由键
*/
public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.deadletter.queuec.routingkey";
/**
*死信队列c
*/
public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.deadletter.queuec";
/**
* 声明延时Exchange
* @return
*/
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
/**
* 声明死信Exchange
* @return
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
/**
* 声明延时队列C 不设置TTL(发送信息的时候设置),并绑定到对应的死信交换机
*
* @return
*/
@Bean("delayQueueC")
public Queue delayQueueC(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
}
/**
* 声明死信队列C 用于接收延时任意时长处理的消息
* @return
*/
@Bean("deadLetterQueueC")
public Queue deadLetterQueueC(){
return new Queue(DEAD_LETTER_QUEUEC_NAME);
}
/**
* 声明延时列C绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
}
/**
* 声明死信队列C绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
}
}
添加对应消费者:
package com.miaosha.study.tet;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import static com.miaosha.study.tet.RabbitMQConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:58
* @Version: 1.0
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列C收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
添加消息生产者:
package com.miaosha.study.tet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import static com.miaosha.study.tet.RabbitMQConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:52
* @Version: 1.0
*/
@Slf4j
@Component
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void delayMsg(String msg,Integer delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEC_ROUTING_KEY, msg, a ->{
a.getMessageProperties().setExpiration(String.valueOf(delayTime));
return a;
});
}
}
编写测试接口:
package com.miaosha.study.tet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.Objects;
/**
* @Author: laz
* @CreateTime: 2023-02-27 10:58
* @Version: 1.0
*/
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {
@Autowired
private DelayMessageSender sender;
@RequestMapping("delayMsg")
public void delayMsg(String msg, Integer delayTime){
log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayTime);
sender.delayMsg(msg, delayTime);
}
}
重启项目,访问接口,分别请求:
http://localhost:8081/rabbitmq/delayMsg?msg=msg1&delayTime=20000
http://localhost:8081/rabbitmq/delayMsg?msg=msg2&delayTime=2000
观察控制台:
通过观察时间可以看到,延时队列确实生效了,不过有个问题,第一个20s后被消费,第二个本来应该在2s后被消费,但结果确是跟着第一个后面消费的。也就是说,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
这个结果显然不是我们想要的结果,那么,我们还得再次优化!
利用RabbitMQ插件实现延迟队列
这里需要用到RabbitMQ的rabbitmq_delayed_message_exchange
插件,如果没有安装插件,可以点击查看安装教程
安装完成之后,在添加一个延时队列配置:
package com.miaosha.study.tet;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:50
* @Version: 1.0
*/
@Configuration
public class RabbitMQConfig {
public static final String DELAYED_QUEUE_NAME = "delay.queue.delay.queue";
public static final String DELAYED_EXCHANGE_NAME = "delay.queue.delay.exchange";
public static final String DELAYED_ROUTING_KEY = "delay.queue.delay.routingkey";
@Bean
public Queue immediateQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
添加消费者:
package com.miaosha.study.tet;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import static com.miaosha.study.tet.RabbitMQConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:58
* @Version: 1.0
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},延时队列收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
添加消息生产者:
package com.miaosha.study.tet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import static com.miaosha.study.tet.RabbitMQConfig.*;
/**
* @Author: laz
* @CreateTime: 2023-02-27 09:52
* @Version: 1.0
*/
@Slf4j
@Component
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMsg(String msg, Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
}
在写一个测试接口:
package com.miaosha.study.tet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.Objects;
/**
* @Author: laz
* @CreateTime: 2023-02-27 10:58
* @Version: 1.0
*/
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {
@Autowired
private DelayMessageSender sender;
@RequestMapping("delayMsg2")
public void delayMsg2(String msg, Integer delayTime) {
log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);
sender.sendDelayMsg(msg, delayTime);
}
}
然后重启项目,再次访问:
http://localhost:8081/rabbitmq/delayMsg2?msg=msg1&delayTime=20000
http://localhost:8081/rabbitmq/delayMsg2?msg=msg2&delayTime=2000
再次观察控制台:
可以看到,已然达到我们的要求,符合预期结果。