1、 前言
1.1、什么是延迟队列?
延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。
1.2、应用场景
我们在一些业务场景中,经常会遇到一些需要经历一段时间后,或者到达某个时间节点才会执行的功能。就比如以下这些场景:
新建一个订单,在规定时间内未支付需要自动取消
外卖或者打车在预计时间到达的前十分钟提醒骑手或者司机即将超时
快递收货后在规定时间内用户没有确认收货会自动确认收货
预定的会议在会议开始前十分钟会去提醒你尽快加入会议
每日周报在截止半小时前会提醒你尽快提交
1.3、为什么要使用延迟队列
对于一些数据量小并且对数据的时效性不怎么要求的项目来说,最简单有效的方法就是写一个定时任务去扫描数据库以达到业务的实现。当然,如果在数据达到数百万或者千万级别的时候,如果去定时扫描数据库,容易挨揍哈。想信大家也有所了解,当数据达到这种地步的时候,还去定时扫表会非常低效,甚至对于那些定时间隔比较小的情景来说,这一遍还没扫完下一遍就要开始了。这时候如果用延迟队列的话或许会很有效。
实现延迟队列的几种途径
Quartz 定时任务
DelayQueue 延迟队列
Redis sorted set
Redis 过期键监听回调
RabbitMQ死信队列
RabbitMQ基于插件实现延迟队列
wheel时间轮算法
2、Redis sorted set
在Redis中,zet作为有序集合,可以利用其有序的特性,将任务添加到zset中,将任务的到期时间作为score,利用zset的默认有序特性,获取score值最小的元素(也就是最近到期的任务),判断系统时间与该任务的到期时间大小,如果达到到期时间,就执行业务,并删除该到期任务,继续判断下一个元素,如果没有到期,就sleep一段时间(比如1秒),如果集合为空,也sleep一段时间。
通过zadd命令向队列delayqueue中添加元素,并设置score值表示元素过期的时间;向delayqueue添加三个order1、order2、order3,分别是10秒、20秒、30秒后过期。
zadd delayqueue 3 order3
消费端轮询队列delayqueue,将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key。
/**
* 消费消息
*/
public void pollOrderQueue() {
while (true) {
Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}
我们看到执行结果符合预期:
2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty
3、Redis 过期键监听回调
Redis的key过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。
修改redis.conf文件开启notify-keyspace-events Ex。
notify-keyspace-events Ex
Redis监听配置,注入Bean RedisMessageListenerContainer。
其次,配置redis监听器 最后,编写redis key过期监听回调方法
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener ,有点类似于MQ的消息监听。
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("监听到key:" + expiredKey + "已过期");
}
}
到这代码就编写完成,非常的简单,接下来测试一下效果,在redis-cli客户端添加一个key并给定3s的过期时间。
set xiaofu 123 ex 3
在控制台成功监听到了这个过期的key。
监听到过期的key为:xiaofu
4、Quartz定时任务
Quartz一款非常经典任务调度框架,在Redis、RabbitMQ还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。
导入Quartz依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
>在启动类中使用@EnableScheduling注解开启定时任务功能。
```java
@SpringBootApplication
@EnableScheduling
public class DelayQueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayQueueApplication.class, args);
}
}
编写定时任务
@Slf4j
@Component
public class QuartzDemo {
/**
* 每隔五秒开启一次任务
*/
@Scheduled(cron = "0/5 * * * * ? ")
public void process(){
log.info("--------------定时任务测试--------------");
}
}
5、DelayQueue 延迟队列
JDK中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue。
DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。
先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒、10秒、15秒后取消。
要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这哥接口里只有一个getDelay方法,用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。
public class Order implements Delayed {
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
DelayQueue的put方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
/**
* 取队列头部元素是否过期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name,
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}
上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。
执行后看到结果如下,Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行,至此就用DelayQueue实现了延时队列。
订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
6、 RabbitMQ 延时队列
利用RabbitMQ做延时队列是比较常见的一种方式,而实际上RabbitMQ自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
先来认识一下 TTL和 DXL两个概念:
Time To Live(TTL):
TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。
RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身:
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。
Dead Letter Exchanges(DLX)
总结
为了让大家更容易理解,上边的代码写的都比较简单粗糙,几种实现方式的demo已经都提交到GitHub 地址:https://github.com/chengxy-nds/delayqueue,感兴趣的小伙伴可以下载跑一跑。
可能写的有不够完善的地方,如哪里有错误或者不明了的,欢迎大家踊跃指正!