文章目录
- 1. 整体设计
- 1.1 类注释
- 1.2、类图
- 1.3 延迟队列的属性
- 1.4 DelayQueue 的主要方法
- 1.4.1 offer 添加元素
- 1.4.2 take 取出元素
- 1.4.3 poll 取出元素
1. 整体设计
DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟 5 秒执行,那么当前线程就会沉睡 5 秒,等 5 秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如当运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现的。
1.1 类注释
类注释上比较简单,只说了三个概念:
- 无界延时队列中元素将在过期时被执行,越靠近队头,越早过期;
- 未过期的元素不能够被 take;
- 不允许空元素。
1.2、类图
DelayQueue 的类图,关键是 DelayQueue 类上是有泛型的,如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
从泛型中可以看出,DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
也就是说 DelayQueue 队列中的元素必须是实现 Delayed 接口和 Comparable 接口的,并覆写了 getDelay 方法和 compareTo 的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现 Delayed 接口。
compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
1.3 延迟队列的属性
DelayQueue 中的重要属性如下所示。
// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();
DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式.
1.4 DelayQueue 的主要方法
1.4.1 offer 添加元素
public boolean offer(E e) {
// 获取全局独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 向优先队列中插入元素
q.offer(e);
// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
// 释放全局独占锁
lock.unlock();
}
}
leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。
DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。
1.4.2 take 取出元素
take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
一开始看到全局独占锁,理所当然详情属于队列消费模式。 无法理解 “领导者-追随者模式”。take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。 如果多个线程调用take() 方法, 当available.awaitNanos(delay);的时候, 其它线程可以抢锁进入。 下面有测试例子。源码中:java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject.await()和await(long time, TimeUnit unit); 方法 Node node = addConditionWaiter(); long savedState = fullyRelease(node); 队列状态释放
take方法主要实现逻辑为(for循环体):
1. 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
2. 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
3. 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
4. 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
5. finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。
public E take() throws InterruptedException {
// 获取全局独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队头元素,peek 方法不会删除元素
E first = q.peek();
if (first == null)
// 若队头为空,则阻塞当前线程
available.await();
else {
// 否则获取队头元素的超时时间
long delay = first.getDelay(NANOSECONDS);
// 已超时,直接出队
if (delay <= 0)
return q.poll();
// 释放 first 的引用,避免内存泄漏
first = null; // don't retain ref while waiting
// leader != null 表明有其他线程在操作,阻塞当前线程
if (leader != null)
available.await();
else {
// leader 指向当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超时阻塞
available.awaitNanos(delay);
} finally {
// 释放 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列
if (leader == null && q.peek() != null)
available.signal();
// 释放全局独占锁
lock.unlock();
}
}
Condition.await() 和Condition.await(100, TimeUnit.SECONDS); 方法进入等待时候,其它线程可以抢抢到锁
package com.lvyuanj.test.timer;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class TestConditionAwait {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
final Condition condition = lock.newCondition();
final Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
Thread.currentThread().setName("ConditionAwait");
log.error(Thread.currentThread().getName() + " beforeAwaitTime:" + System.currentTimeMillis());
condition.await(100, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(Thread.currentThread().getName() + " finishAwaitTime:" + System.currentTimeMillis());
} finally {
lock.unlock();
log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("ConditionSignal");
try {
lock.lock();
log.error(Thread.currentThread().getName() + " getLockTime:" + System.currentTimeMillis());
//thread1.interrupt();
long currentTime = System.currentTimeMillis();
while (System.currentTimeMillis() - currentTime < 8000) {
}
condition.signal();
log.error(Thread.currentThread().getName() + " signalTime:" + System.currentTimeMillis());
} catch (Exception e) {
} finally {
lock.unlock();
log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());
}
}
});
thread1.start();
Thread.sleep(50);
thread2.start();
}
}
1.4.3 poll 取出元素
取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}