阻塞队列DelayQueue是一种无界阻塞队列,用于放置实现了Delayed
接口的对象。这些对象只能在其到期时才能从队列中取走,这种队列是有序的,即队头对象的延迟到期时间最长。如果没有任何延迟到期的对象,那么就不会有任何头元素,并且poll
将返回null
(当然,这取决于具体的实现,有可能抛出异常)。DelayQueue通常用于解决类似缓存系统的设计、定时任务调度、订单超时处理等问题。
- 缓存系统的设计:可以使用DelayQueue保存缓存元素的有效期。使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了,可以删除或者更新缓存。
- 定时任务调度:系统需要定时执行的任务可以交给DelayQueue保存,然后使用一个线程循环查询DelayQueue,一旦获取到任务就开始执行,比如每个一段时间发送一封邮件、扫描一次日志文件等。
- 订单超时处理:在电商系统中,用户提交了一个订单,就可以把这个订单放进DelayQueue,设置超时时间,超时后仍然没有支付就取消订单。
使用DelayQueue时,可以通过offer()
方法向队列中添加元素,如果队列已满则插入操作会失败并返回false
(尽管DelayQueue实际上是无界的,但理论上存在这种可能性)。通过take()
方法可以从队列中获取并移除头部元素,如果队列为空,则此方法会阻塞等待直到有元素可用。此外,DelayQueue还支持超时获取元素的操作,即可以设置一个超时时间,如果在该时间内队列中没有元素可用,则方法会返回。
在实现上,DelayQueue使用了内部锁和条件变量来处理元素的添加和移除操作。当添加元素时,如果队列为空或者新元素的延迟时间比当前头部元素的延迟时间要短,那么就会将新元素添加到队列中并可能唤醒等待的线程。而当移除元素时,如果队列为空或者请求的超时时间已经到达,那么线程就会被阻塞直到有元素可用或者超时。这种机制使得DelayQueue能够支持延迟处理和阻塞等待的功能
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法。
static class Task implements Delayed{
@Override
//比较延时,队列里元素的排序依据
public int compareTo(Delayed o) {
return 0;
}
@Override
//获取剩余时间
public long getDelay(TimeUnit unit) {
return 0;
}
}
DelayQueue是一个支持延时操作的无界阻塞队列。列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期满时才能够从队列中去元素。
它主要运用于如下场景:
1.缓存系统的设计:缓存是有一定的时效性的,可以用DelayQueue保存缓存的有效期,然后利用一个线程查询DelayQueue,如果取到元素就证明该缓存已经失效了。
2.定时任务的调度:DelayQueue保存当天将要执行的任务和执行时间,一旦取到元素(任务),就执行该任务。
DelayQueue采用支持优先级的PriorityQueue来实现,但是队列中的元素必须要实现Delayed接口,Delayed接口用来标记那些应该在给定延迟时间之后执行的对象,该接口提供了getDelay()方法返回元素节点的剩余时间。同时,元素也必须要实现compareTo()方法,compareTo()方法需要提供与getDelay()方法一致的排序。
元素进入队列后,先进行排序,然后,只有getDelay也就是剩余时间为0的时候,该元素才有资格被消费者从队列中取出来,所以构造函数一般都有一个时间传入。
具体实例:
package yxxy.c_025;
import java.sql.Time;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Delayquue {
public static void main(String[] args) throws Exception {
BlockingQueue<Task> delayqueue = new DelayQueue<>();
long now = System.currentTimeMillis();
delayqueue.put(new Task(now+3000));
delayqueue.put(new Task(now+4000));
delayqueue.put(new Task(now+6000));
delayqueue.put(new Task(now+1000));
System.out.println(delayqueue);
for(int i=0; i<4; i++) {
System.out.println(delayqueue.take());
}
}
static class Task implements Delayed{
long time = System.currentTimeMillis();
public Task(long time) {
this.time = time;
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "" + time;
}
}
}
输出结果:
可以看出来,每隔一段时间就会输出一个元素,这个间隔时间就是由构造函数定义的秒数来决定的。