1. 前言
前面我们了解了基于数组,链表实现的阻塞队列,以及优先级队列。今天我们来了解下基于优先级队列的延迟队列,而且今天的内容很核心哦。 大家快搬好小板凳做好,听我慢慢分析
2. 简单实例
- Task 类
public class Task implements Delayed {
private String name;
private Long time;
public Task(String name, Long delay) {
this.name = name;
this.time = System.currentTimeMillis() + delay;
}
@Override
public String toString() {
return "Task{" +
"name='" + name + '\'' +
", time=" + time +
'}';
}
public Task() {}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.time - ((Task)o).getTime());
}
}
- 延迟队列使用实例
public class T04_Queue_Test04 {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Delayed> delayeds = new DelayQueue<>();
Task a = new Task("A", 1000l);
Task b = new Task("B", 500l);
Task c = new Task("C", 1200l);
Task d = new Task("D", 300l);
delayeds.put(a);
delayeds.put(b);
delayeds.put(c);
delayeds.put(d);
System.out.println(delayeds.take()); // D
System.out.println(delayeds.take()); // B
System.out.println(delayeds.take()); // A
System.out.println(delayeds.take()); // C
}
}
3. 核心实现
3.1 构造方法
3.1.1 类
通过上述类本身我们可以知道,类
DelayQueue
的元素必须实现Delayed
才能被添加成为元素,详细的内容可以看上述实例中Task
类
3.1.1 构造基本属性
// 定义的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 是优先级队列 因为延迟队列是基于优先级队列实现的
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 表示等待堆顶的元素
private Thread leader = null;
// 表示挂起线程的Condition
private final Condition available = lock.newCondition();
3.2 生产者方法
3.2.1 add
public boolean add(E e) {
// 函数内部 本质调用了offer 方法
return offer(e);
}
3.2.2 offer
// 表示添加元素的方法
public boolean offer(E e) {
// 表示获取锁实例
final ReentrantLock lock = this.lock;
// 开始上锁
lock.lock();
try {
// 优先级队列 添加元素
q.offer(e);
// 如果堆顶的元素 === 刚才添加的元素
if (q.peek() == e) {
leader = null;
// 唤醒消费者 有可能之前队列中没有元素,添加元素后,消费者就可以消费了
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
3.2.3 有参offer
public boolean offer(E e, long timeout, TimeUnit unit) {
// 因为本身延迟队列 基于 优先级队列实现的。 所以也可以理解为无界队列。 所以不会出现队列满的时候 会将线程挂起
return offer(e);
}
3.2.4 put
public void put(E e) {
// 一句话 本质也是基于offer实现的
offer(e);
}
3.3 消费者方法
3.3.1 remove
public E remove() {
// 是基于poll来删除元素
E x = poll();
// 如果元素不为null的话 直接返回元素
if (x != null)
return x;
else
// 反之都会抛出异常
throw new NoSuchElementException();
}
3.3.2 poll
public E poll() {
// 表示获取锁实例
final ReentrantLock lock = this.lock;
// 开始上锁
lock.lock();
try {
// 获取堆顶元素 但是不会剔除元素,只是获取
E first = q.peek();
// 如果堆顶元素 为null || 如果> 0 的话 因为不需要等待 所以直接返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 延迟时间到期了
return q.poll();
} finally {
lock.unlock();
}
}
3.3.3 有参poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 统一转换时间 转换纳秒
long nanos = unit.toNanos(timeout);
// 获取锁实例
final ReentrantLock lock = this.lock;
// 加锁 可被打断锁
lock.lockInterruptibly();
try {
// 一个死循环
for (;;) {
// 获取堆顶 元素
E first = q.peek();
// 如果堆顶元素是null 说明本身队列中没有元素
if (first == null) {
// 等待时间到了 直接返回null
if (nanos <= 0)
return null;
else
// 线程挂起 等待执行
nanos = available.awaitNanos(nanos);
} else {
// 如果执行到此处的话 说明堆顶是有元素的
// 获取元素的延迟到期时间
long delay = first.getDelay(NANOSECONDS);
// 如果时间 <= 0 的话,直接返回堆顶元素
if (delay <= 0)
return q.poll();
// 如果到这的话 延迟时间未到 && 等待时间到了 直接发返回null
if (nanos <= 0)
return null;
first = null;
// 等待时间 < 延迟时间
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
// 获取当前线程
Thread thisThread = Thread.currentThread();
// 设置当前线程设置为leader 马上要执行的线程
leader = thisThread;
try {
// 挂起
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
3.3.4 take
此时
take
方法跟有参poll
方法类似,只不过不需要判断等待时间,只有一个延迟时间而已。
4. 总结
延迟队列核心:通过优先级队列来判断延迟时间大小,将延迟小的元素会放到堆顶。所以添加顺序 不一定等于 输出顺序。跟延迟的时间的大小有很大关系。好了,就分析到这里了,不敢说
一一明白
但是大体的源码意思是透彻了。如果大家有什么新的看法,可以通过评论区告诉我哦。