1、DelayQueue介绍
DelayQueue 是一个延迟队列,生产者写入一个数据,这个数据具有被直接消费的延迟时间,
让数据具有延迟的特性。
DelayQueue底层也是基于二叉堆来实现的,DelayQueue本就是基于PriorityBQueue 实现的。
二叉堆结构每次获取的是堆顶数据,在比较时,根据延迟时间进行比较,延迟时间剩余端的放
在堆顶。
由于 DelayQueue 基于 PriorityQueue 实现的,因此 DelayQueue 理论上也是一个无边界队
列,DelayQueue 容量可以进行无限扩容。
2、DelayQueue核心属性
由DelayQueue 结构可以发现,DelayQueue 存储的数据必须实现 Delayed 接口
DelayQueue 结构如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//锁,阻塞队列需要使用锁来保证线程安全
//只有一把锁,表示生产者和消费者使用的是同一把锁
private final transient ReentrantLock lock = new ReentrantLock();
//基于优先级队列 PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* leader 一般用来保存等待堆顶数据的消费者线程
*/
private Thread leader = null;
/**
* 基于 PriorityQueue(基于二叉堆)实现数据存储,生产者在插入数据时是不会阻塞的,
* 当前的Condition就是给消费者用的,当消费者获取数据时,当堆顶数据的延迟时间还不为
* 0(即还没到执行时间点),此时消费者线程会阻塞挂起等待一会(等待的是堆顶数据),直到堆顶数据延迟时间为0(到达任务执行时间点)
* 或者 生产者新插入的数据到了堆顶,此时生产者会调用Condition.signal() 方法唤醒消费者线程
*/
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
}
/**
Delayed 继承 Comparable接口,所以 Delayed 的实现都可以进行比较操作
*/
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);//获取延迟时间,比较延迟时间
}
3、DelayQueue使用示例
DelayQueue 常用方法也是 BlockingQueue接口中定义的那几个存储数据和获取数据的方法,
只有一点需要注意,即 DelayQueue 保存的数据必须实现接口Delayed
DelayQueue 使用示例如下:
public class TaskDelayed implements Delayed {
private String name;
/** 执行时间点*/
private Long time;
public TaskDelayed(String name,Long time){
this.name = name;
this.time = System.currentTimeMillis()+time;
}
/**
* 设置 任务TaskDelayed 什么时候可以出延迟队列DelayedQueue
* 该方法返回值小于等于0时任务才会从 延迟队列DelayedQueue 中取出执行
*
*/
@Override
public long getDelay(TimeUnit unit) {
//TimeUnit.MILLISECONDS :将时间转换为毫秒
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 比较器
* 2个 TaskDelayed 任务在存储到延迟队列时的比较方式,通过time属性进行比较
* 返回值:
* < 0: 按从小到大排列
* == 0 : 相等
* >0 : 从大到小排列
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
TaskDelayed task = (TaskDelayed) o;
return (int)(this.time - task.getTime());
}
}
public class DelayedQueueDemo01 {
public static void main(String[] args) throws InterruptedException {
TaskDelayed task1 = new TaskDelayed("Tome",2000L);
TaskDelayed task2 = new TaskDelayed("JieRui",4000L);
TaskDelayed task3 = new TaskDelayed("zhuDy",3000L);
TaskDelayed task4 = new TaskDelayed("zhanmusi",1000L);
//DelayQueue 存放的数据必须实现接口Delayed
DelayQueue<TaskDelayed> queue = new DelayQueue<>();
//添加数据
queue.add(task1);
queue.offer(task2);
queue.offer(task3,4, TimeUnit.SECONDS);
queue.put(task4);
//取数据
System.out.println(queue.remove());//若堆顶数据的延迟时间还没到达,则poll()返回null,remove()会直接抛出异常
System.out.println(queue.poll());
System.out.println(queue.poll(5,TimeUnit.SECONDS));
System.out.println(queue.take());
}
}
4、DelayQueue写入流程分析
因为 DelayQueue 底层是基于 PriorityQueue 实现的,也就是基于二叉堆实现的,所以
DelayQueue 是一个无界的队列,存储数据的数组可以动态扩容,所以生产者不需要关注
队列满了而阻塞的问题,因此这里只需要关注offer(E e) 方法就可以了
offer(E e) 代码如下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直接调用PriorityQueue的插入方法
q.offer(e);
//判断数据插入后的二叉堆的堆顶元素是不是刚插入的数据,
//若是,则说明当前堆顶数据可能已到达延迟时间可以进行消费,唤醒等待的消费者线程,并将当前等待的消费者线程设置为null
if (q.peek() == e) {
/**
* leader 赋值为null
* todo 在消费者消费数据时会判断leader 是否为null
*/
leader = null;
/**
* 唤醒挂起阻塞的消费者线程,避免刚插入的数据的延迟时间出现问题
* 这里可以发现消费者等待的是堆顶数据
*/
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
5、DelayQueue取数据流程分析
消费者取数据过程需要考虑阻塞问题:
1)队列为空,无数据,消费者线程需要挂起等待一会
2)堆顶数据的延迟时间还没到,此时消费者线程需要挂起等待一会
3)当消费者A已经在等待堆顶数据,此时消费B也过来取数据,此时消费者B需要
挂起等待一会
5.1、remove()
该方法功能是取数据,取堆顶数据,若取不到数据,则直接抛出异常。
注意:若堆顶数据的延迟时间还没到达,则取不到数据,也会抛出异常
remove 方法如下:
5.2、poll()
该方法功能是读取数据,poll()方法不会阻塞消费者,能获取数据就直接返回,否则返回null
poll 方法代码如下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//查看堆顶数据
E first = q.peek();
/**
*first == null 表示堆为空,没有数据
* getDelay 方法返回值大于0,表示堆顶数据还没到延迟时间,不能执行,堆顶数据无法取出
*/
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
5.3、poll(long timeout, TimeUnit unit)
带超时时间的读取数据的方法
poll方法代码如下:
/**
*带超时时间的取数据方法
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//将超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加锁,可被中断,当被中断时会抛出异常,直接退出
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//查看堆顶数据
E first = q.peek();
//若堆顶数据为空,即堆无数据,则判断超时时间是否已过,若超时时间也已经过了,则返回null
if (first == null) {
if (nanos <= 0)
return null;
else
//阻塞等待,并返回剩余超时时间
//等待生产者线程添加数据之后唤醒
nanos = available.awaitNanos(nanos);
} else {//堆有数据
//获取堆顶数据的延迟时间,单位纳秒
long delay = first.getDelay(NANOSECONDS);
//若堆顶数据的延迟时间小于等于0,表示当前堆顶数据可以执行,立即取出
if (delay <= 0)
return q.poll();
//若堆顶数据的延迟时间大于0(表示堆顶数据还不能执行)且超时时间已经过了,则返回null
if (nanos <= 0)
return null;
/**
* 指定到这里,说明堆顶数据的延迟时间大于0(表示延迟时间没到,堆顶数据还不能执行)且方法超时时间还没过
* 消费者需要挂起等待
*/
first = null; // don't retain ref while waiting
//方法剩余超时时间小于堆顶数据的延迟时间,则消费者线程继续阻塞,并返回剩余超时时间
/**
* todo 疑问:这里为什么不直接结束,反正最终是无法获取数据的?
* 因为 你不确定在剩余的超时时间nanos内,是否有新的数据插入(新插入的数据可能延迟时间很短),
* 前边offer(E e)新增数据后也会唤醒等待的消费者线程
* 第二个条件 leader != null,leader != null表示前边已经有
* 消费者线程在挂起阻塞堆顶数据的延迟时间到期,后边的消费者线程执行到这里
* 需要直接阻塞挂起,这样避免 leader 的重复赋值
*/
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {//方法剩余超时时间大于堆顶数据的延迟时间,表示当前消费者可以在超时时间nanos内拿到堆顶数据,
// 且当前没有消费者在等待堆顶数据
//将leader 设置为 当前消费者线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//阻塞等待,并返回剩余的阻塞时间
//当前消费者阻塞堆顶数据的延迟时间
long timeLeft = available.awaitNanos(delay);
//更新剩余的可阻塞时间,已消耗的超时时间是 delay - timeLeft
nanos -= delay - timeLeft;
} finally {
//堆顶数据的延迟时间到了,将 leader 设置为null
//这一步只有生产者和消费者自己可以做
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
5.4、take()
读取数据,允许中断,若队列为空则一直阻塞,直到队列有数据 或 被中断时异常退出
take() 方法代码如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁,允许中断,中断异常退出
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//查看堆顶数据
E first = q.peek();
//若队列没有数据,则阻塞,由生产者唤醒 或者被中断异常退出
if (first == null)
available.await();
else {//队列不为空
//获取堆顶数据的延迟时间
long delay = first.getDelay(NANOSECONDS);
//表示堆顶数据可以执行,则从队列中取出数据
if (delay <= 0)
return q.poll();
//执行到这里,表示堆顶数据的延迟时间还没到,消费者线程需要阻塞,等待堆顶数据到达其延迟时间
first = null; // don't retain ref while waiting
//前边有消费者线程在阻塞等待堆顶数据的延迟时间到达,则当前线程直接阻塞
if (leader != null)
available.await();
else {//当前没有消费者线程在等待堆顶数据的延迟时间到达,则把当前消费者设置为等待堆顶数据延迟时间到达的线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//阻塞,阻塞时间是堆顶数据的延迟时间
available.awaitNanos(delay);
} finally {
//阻塞结束,获取到堆顶数据后将 leader 设置为Null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}