一、什么是DelayQueue
DelayQueue是一个支持并发的无界延迟队列,队列中的每个元素都有个预定时间,当线程从队列获取元素时,只有到期元素才会出队列,没有到期元素则阻塞等待。队列头元素是最快要到期的元素。因此DelayQueue可用于实现定时任务队列。
DelayQueue中的主要成员变量和方法如下:
q:使用优先队列PriorityQueue存储数据,队列中的元素需实现Delayed接口,实现getDelay()和compareTo()方法,以实现优先队列内部的优先级比较,剩余到期时间越短的元素优先级越高
public interface Delayed extends Comparable<Delayed> {
//获取元素剩余到期时间
long getDelay(TimeUnit unit);
}
lock:使用ReentrantLock对插入和读取队列元素的方法进行加锁,以实现多线程并发读写队列操作的同步。
available:用一个条件等待队列存放等待获取到期元素的线程。
leader:用于表示当前正在等待获取队头元素的线程,这里使用了一个Leader-Follower模式的变体,线程获取完元素后从等待队列中选择一个线程成为leader继续等待获取队头元素,以避免不必要的竞争消耗。
Leader-Follower模式
在并发IO中,当一个线程收到IO事件后,会考虑启动一个新的线程去处理,而自己继续等待下一个请求。但这里可能会有性能问题,就是把工作交给别一个线程的时候需上下文切换,包括数据拷贝。
而在Leader-Follower模式中所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己去处理这个事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
参考:Leader/Follower多线程网络模型介绍
二、主要方法源码解析
1. offer()
插入元素到队列。首先获取锁,拿到锁后向优先队列中插入元素,若插入完毕后发现队头元素就是自己,即最近到期时间的元素就是自己,刷新了记录,那就赶紧从等待队列中通知一个线程准备来获取这个元素,然后释放锁。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
这里为什么要将leader先置为null?:
因为如果此时leader线程在超时等待获取前任队头元素,而signal通知了另一个线程,看完take()的源码可以知道如果有leader线程,那么此线程会直接阻塞等待,让leader线程超时完后获取队头,那显然时间就不正确了,只有将leader设为null,后续线程才能成为leader并设置正确的超时时间来等待获取最新队头元素
因此,leader变量的真正含义是:超时等待获取队列最新队头元素的线程,等待的时间即为最新队头元素剩余到期时间
因此,当队头元素发生变动(插入/删除更新)时,就需要唤醒一个线程更新leader
2. take()
获取优先队列队头元素。首先获取锁,拿到锁后进入一个循环,首先检测队头元素,若为空则进入等待队列阻塞等待,若不为空且队头元素已到期则直接将其出队返回,如果还没到期就看有没有线程已经在准备获取队头元素了,如果有就不用抢了,进入等待队列阻塞等待,如果没有就超时等待准备获取队头元素,被唤醒后进入下一次循环获取队头元素。获取完毕后就从等待队列中通知一个线程准备获取队头元素然后释放锁。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//从优先堆中获取堆顶元素,即优先级最高,即预定时间最近的元素
E first = q.peek();
if (first == null)
//若队列中无元素则直接进入条件队列等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
//若堆顶元素已经到期,则直接将其出队返回
if (delay <= 0)
return q.poll();
//等待期间不持有元素引用,防止该元素被其他线程出队消费后,仍不能被垃圾回收
first = null; // don't retain ref while waiting
if (leader != null)
//若已经有leader了,则进入条件队列无限期等待
available.await();
else {
//否则成为leader进入条件队列超时等待,到预期时间或者有更近时间元素插入就到同步队列竞争锁,再重复循环去取堆顶元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//取完元素后若leader为null且队列中还有元素则从条件等待队列通知一个线程到同步队列
//为什么存在leader不为null的情况:leader线程从awaitNanos()中结束后没有竞争过新进take()的线程,因此继续在同步队列中被阻塞,因此无需再从条件等待队列中通知线程,直接让leader线程再去竞争锁,
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();//释放锁资源让同步队列中的线程竞争锁
}
}
Leader-Follower模式在这里的作用在于,在队头元素还没到期的情况下,只需要有一个线程(leader)超时等待,其余线程进来后发现已经有leader了,就直接无限等待就行了,避免了无意义的超时等待和竞争消耗。
参考:What exactly is the leader used for in DelayQueue?
3. poll()
加锁获取并移除队头过期元素,如果没有过期元素则不等待直接返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
4. size()
加锁获取队列当前剩余元素个数
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
三、使用案例
如下使用案例,首先向DelayQueue插入5个定时任务,然后用3个线程并发读取
public class DelayQueueTest {
//队列元素类
static class DelayTask implements Delayed {
long exeTime;//预定执行时间
public DelayTask(long exeTime) {
this.exeTime = exeTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.exeTime - System.currentTimeMillis(), unit);
}
@Override
public int compareTo(Delayed o) {
long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (int) delta;
}
}
public static void main(String[] args) {
DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
for (int i = 1;i <= 5;i++) {
delayQueue.offer(new DelayTask(System.currentTimeMillis() + new Random().nextInt(10)*1000));
}
for (int i = 1;i <= 3;i++) {
new Thread(() -> {
try {
while (true) {
DelayTask task = delayQueue.take();
System.out.printf("取出任务!取出时间:%s 任务预定执行时间:%s%n", hms(System.currentTimeMillis()), hms(task.exeTime));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
public static String hms(long milliseconds) {
return new SimpleDateFormat("HH:mm:ss").format(milliseconds);
}
}
运行结果:
取出任务!取出时间:10:27:39 任务预定执行时间:10:27:39
取出任务!取出时间:10:27:39 任务预定执行时间:10:27:39
取出任务!取出时间:10:27:40 任务预定执行时间:10:27:40
取出任务!取出时间:10:27:42 任务预定执行时间:10:27:42
取出任务!取出时间:10:27:46 任务预定执行时间:10:27:46