JDK自身支持延迟队列的数据结构,其实类:java.util.concurrent.DelayQueue。
我们通过阅读源码的方式理解该延迟队列类的实现过程。
1.定义
DelayQueue:是一种支持延时获取元素的无界阻塞队列。
特性:
-
线程安全;
-
内部元素有“延迟”特性:只有延迟到期的元素才允许被获取;
-
具有优先级特性的无界队列,优先级以元素延迟时间为标准,最先过期的元素优先级最高(队首);
-
入队操作不会被阻塞,获取元素在特定情况会阻塞(队列为空,队首元素延迟未到期等);
根据其源码分析为何如此定义以及其特性的由来。
DelayQueue继承关系:
类图分析:
其核心继承/实现:
1.BlockingQueue:说明其具有阻塞队列的特性;
2.元素必实现接口Delayed,而Delayed继承了接口Comparable。因此所有元素必须实现两个方法:
compareTo方法用于元素比较;
getDelay方法用于获取元素剩余延时时间。
public interface Delayed extends Comparable<Delayed> {
/**
* 返回关联对象的剩余延迟时间(可指定时间单位)
*/
long getDelay(TimeUnit unit);
}
2.源码:
public class DelayQueue<E extends Delayed>
extends AbstractQueue<E>
implements BlockingQueue<E> {
/**
* 可重入锁,用于保证线程安全
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 优先队列(容器),实际存储元素的地方
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* 等待取元素线程的领导(leader)线程,有且仅有一个leader。
* 具有最高优先级,第一个尝试获取元素的线程。
* leader取完元素后,会唤醒新的等待线程成为新的leader。
*/
private Thread leader = null;
/**
* 触发条件,表示是否可以从队列中读取元素.
* 用于等待(await())/通知(signal())其他线程
*/
private final Condition available = lock.newCondition();
/**
* 构造函数
*/
public DelayQueue() {
}
/**
* 构造函数: 调用addAll()方法:将集合c 存入队列中
*
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
/*--------------------------添加元素(非阻塞)-------------------------------*/
/**
* 插入新元素.
* 核心内容见:public boolean offer(E e)
*/
public boolean add(E e) {
return offer(e);
}
/**
* 插入新元素.
* 核心内容见:public boolean offer(E e)
*/
public void put(E e) {
offer(e);
}
/**
* 插入新元素.
* 核心内容见:public boolean offer(E e)
* @param e 元素
* @param timeout 此参数将被忽略,因为该方法从不阻塞(废弃)
* @param unit 此参数将被忽略,因为该方法从不阻塞(废弃)
* @return {@code true}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* 插入新元素.(线程安全 lock)
* 逻辑:
* 1.入队;
* 2.如果入队元素为队首元素(原队列为空),唤醒一个等待的线程,通知获取数据。
*
* @param e 元素
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队
q.offer(e);
// 若该元素为队列头部元素(说明原队列为空),可以唤醒等待的线程取元素数据
if (q.peek() == e) {
// 如果队首元素是刚插入的元素,则设置leader为null(腾位置)
leader = null;
// 唤醒一个等待的线程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
/*--------------------------取出(返回并删除)元素-------------------------------*/
/**
* 取出延迟到期元素(非阻塞的).(线程安全 lock)
* poll() 方法是非阻塞的,即调用之后无论元素是否存在/延迟到期都会立即返回。
* 逻辑:
* 1.查询队首元素;
* 2.元素延迟到期返回,否则返回null
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查询队首元素
E first = q.peek();
// 队首元素为空或者延时未到期 返回null
if (first == null || first.getDelay(NANOSECONDS) > 0) {
return null;
} else {
// 如果到期,取出并删除队首元素
return q.poll();
}
} finally {
lock.unlock();
}
}
/**
* 取出延迟到期元素(带有超时时间,阻塞).(线程安全 lock)
* 如果队首元素未到期或者为null,等待:直到队首元素延迟到期或者超出指定等待时间(timeout)
* 逻辑(无限循环等待获取):
* 宗旨:在不超出timeout的时间内,循环去取出延迟到期的队首元素(前提无其他线程正在取数--互斥).
* 1.查询队首元素;
* 2.1.队列空:等待timeout一段时间,直到等待超时(即timeout被重置小于等于0);
* 2.2.队列不为空:
* 2.2.1. 队首元素延迟到期,取出队首元素(poll());
* 2.2.2. 队首元素延迟未到期:
* 2.2.3 等待超时 ,返回null;
* 2.2.4 等待未超时,等待时间<延迟时间或者有其他线程正在取数据,继续等待到超时到期
* 2.2.5 等待为超时,等待时间>=延迟时间并且无其他线程正在取数据,该线程设置为leader等待到延迟到期(最后清空leader)
* 3. 循环后,如果leader=null(无正在取数线程)并且队列还有数据,唤醒一个等待线程最终成为leader.
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 以可中断方式获取锁
lock.lockInterruptibly();
try {
for (; ; ) {
// 获取队首元素
E first = q.peek();
if (first == null) {
// 若队首元素为空(即队列为空,这时就需要关注,当前取值请求是否需要阻塞等待
// 等待时间小于等于0 ,不阻塞等待,直接返回null)
if (nanos <= 0) {
return null;
} else {
// 等待相应的时间
nanos = available.awaitNanos(nanos);
}
} else {
// 若队列元素非空,获取队首元素剩余延迟时间
long delay = first.getDelay(NANOSECONDS);
// 延时过期 返回元素
if (delay <= 0) {
return q.poll();
}
// 延时未过期 等待时间超时 ,不等待,直接返回null
if (nanos <= 0) {
return null;
}
first = null;
// 延时和等待都未到期且等待时间<延迟时间 或者 有其他线程在取数据,当前请求继续等待
if (nanos < delay || leader != null) {
nanos = available.awaitNanos(nanos);
} else {
// 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待直到延迟到期
long timeLeft = available.awaitNanos(delay);
// 计算超时时间
nanos -= delay - timeLeft;
} finally {
// 该线程操作完毕,把 leader 置空
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
// 如果leader线程为空 并且 queue非空,则唤醒其他等待线程
if (leader == null && q.peek() != null) {
available.signal();
}
lock.unlock();
}
}
/**
* 取出延迟到期元素(无超时时间限制,阻塞).(线程安全 lock)
* 逻辑(无限循环等待获取):
* 其逻辑参考poll(long timeout, TimeUnit unit).
* 其区别在于:不受超时时间限制(timeout)
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 以可中断方式获取锁
lock.lockInterruptibly();
try {
// 无限循环
for (; ; ) {
// 获取队首元素
E first = q.peek();
if (first == null) {
// 若队首元素为空(队列为空),则等待
available.await();
} else {
// 若队列元素非空,获取队首元素剩余延迟时间
long delay = first.getDelay(NANOSECONDS);
// 延迟到期,获取队首元素
if (delay <= 0) {
return q.poll();
}
// 延时未过期
first = null;
// leader 不为空表示有其他线程在读取数据,当前线程等待
if (leader != null) {
available.await();
} else {
// 没有其他线程等待,将当前线程设置为 leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待延迟时间过期
available.awaitNanos(delay);
} finally {
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
// 如果leader线程为空 并且 queue非空,则唤醒其他等待线程
if (leader == null && q.peek() != null) {
available.signal();
}
lock.unlock();
}
}
/*--------------------------读取队首元素-------------------------------*/
/**
* 读取队首元素.(线程安全 lock)
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
/*--------------------------读取队列长度-------------------------------*/
/**
* 获取队列数据的长度.(线程安全 lock)
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
/*--------------------------获取延迟到期元素集合-------------------------------*/
/**
* 将队列中延迟到期数据 收集到集合C中.(线程安全 lock)
*
* @return 返回延迟到期元素数量
*/
public int drainTo(Collection<? super E> c) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
// peekExpired() 判断队首元素是否延迟到期
for (E e; (e = peekExpired()) != null; ) {
c.add(e);
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
/**
* 将队列中延迟到期数据 收集到集合C中(C集合总数有限制小于maxElements).(线程安全 lock)
* @return 返回延迟到期元素数量
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
if (maxElements <= 0) {
return 0;
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
// peekExpired() 判断队首元素是否延迟到期。并且到期元素总数不允许超过maxElements
for (E e; n < maxElements && (e = peekExpired()) != null; ) {
c.add(e);
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
/**
* 读取队首元素(已延迟到期).(私有方法)
*/
private E peekExpired() {
// 获取队首元素
E first = q.peek();
// 队首元素存在并且延迟到期,否则返回null
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
/*--------------------------删除元素-------------------------------*/
/**
* 清除队列中所有元素(线程安全 lock)--暴力清除
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
/**
* 删除指定元素O.(线程安全 lock)
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
/**
* 删除指定元素O.(这里指的是相同的对象引用/内存地址)(线程安全 lock)
*/
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
// 使用了对象引用/内存地址相等比较
if (o == it.next()) {
it.remove();
break;
}
}
} finally {
lock.unlock();
}
}
/*--------------------------队列转数组-------------------------------*/
/**
* 将队列元素都复制到数组中(无序).(线程安全 lock)
*/
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
}
/**
* 将队列元素都复制到数组a中(无序).
*/
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray(a);
} finally {
lock.unlock();
}
}
/*--------------------------私有内部类--迭代器-------------------------------*/
/**
* 返回此队列中所有元素(已过期和未过期)的迭代器。迭代器不按任何特定顺序返回元素。
*/
public Iterator<E> iterator() {
return new Itr(toArray());
}
/**
* 快照迭代器,用于处理底层 队列/数组的副本。
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E) array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
}
3.使用demo:
使用DelayQueue实现延迟队列:
优点:实现简单。
缺点:可扩展性较差,内存限制、无持久化机制等。
@SneakyThrows
public static void main(String[] args) {
DelayQueue<TestTask> testTaskDelayQueue = new DelayQueue<>();
long time = System.currentTimeMillis();
testTaskDelayQueue.offer(TestTask.builder().name("test_1").endTime(time + 10 * 1000).build());
testTaskDelayQueue.offer(TestTask.builder().name("test_2").endTime(time + 4 * 1000).build());
testTaskDelayQueue.offer(TestTask.builder().name("test_3").endTime(time + 16 * 1000).build());
for(;;){
System.out.println(testTaskDelayQueue.take());
TimeUnit.SECONDS.sleep(2);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
private static class TestTask implements Delayed {
private String name;
private Long endTime;
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}