数据结构学习网站:
Data Structure Visualization
思维导图
DelayQueue (延时队列)
DelayQueue 是一个支持延时获取元素的阻塞队列
, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
延迟队列的特点是:
不是先进先出,而是会按照延迟时间的 长短来排序,下一个即将执行的任务会排到队列的最前面。
它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接
口,所以自然就拥有了比较和排序的能力,代码如下:
public interface Delayed extends Comparable<Delayed> {
//getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
//如果返回 0 或者负数则代表任务已过期。
//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
long getDelay(TimeUnit unit);
}
DelayQueue使用
DelayQueue 实现延迟订单
在实现一个延迟订单的场景中,我们可以定义一个 Order 类,其中包含订单的基本信息,例如订单编 号、订单金额、订单创建时间等。同时,我们可以让 Order 类实现 Delayed 接口,重写 getDelay 和 compareTo 方法。在 getDelay 方法中,我们可以计算订单的剩余延迟时间,而在 compareTo 方法 中,我们可以根据订单的延迟时间进行比较。
下面是一个简单的示例代码,演示了如何使用 DelayQueue 来实现一个延迟订单的场景:
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Order> delayQueue = new DelayQueue<>();
// 添加三个订单,分别延迟 5 秒、2 秒和 3 秒
delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));
delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));
delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));
// 循环取出订单,直到所有订单都被处理完毕
while (!delayQueue.isEmpty()) {
Order order = delayQueue.take();
System.out.println("处理订单:" + order.getOrderId());
}
}
static class Order implements Delayed{
private String orderId;
private long createTime;
private long delayTime;
public Order(String orderId, long createTime, long delayTime) {
this.orderId = orderId;
this.createTime = createTime;
this.delayTime = delayTime;
}
public String getOrderId() {
return orderId;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = createTime + delayTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return Long.compare(diff, 0);
}
}
}
由于每个订单都有不同的延迟时间,因此它们将会按照延迟时间的顺序被取出。当延迟时间到达时, 对应的订单对象将会被从队列中取出,并被处理。
DelayQueue原理
数据结构
//用于保证队列操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存储元素,用于保证延迟低的优先执行
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
入队put方法
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队
q.offer(e);
if (q.peek() == e) {
// 若入队的元素位于队列头部,说明当前元素延迟最小
// 将 leader 置空
leader = null;
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
}
return true;
} finally {
lock.unlock(); // 解锁,真正唤醒阻塞的线程
}
}
出队take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)
if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
else {
long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间
if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
return q.poll();
// 如果delay大于0 ,则下面要阻塞了
// 将first置为空方便gc
first = null;
// 如果有线程争抢的Leader线程,则进行无限期等待。
if (leader != null)
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待剩余等待时间
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
// 解锁,真正唤醒阻塞的线程
lock.unlock();
}
}
1.
当获取元素时,先获取到锁对象。
2.
获取最早过期的元素,但是并不从队列中弹出元素。
3.
最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
4.
如果最早过期的元素不为空
5.
获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素
6.
如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进 行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
7.
最后将Leader线程设置为空
8.
如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。
如何选择适合的阻塞队列
选择策略
通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:
功能
第 1 个需要考虑的就是
功能层面
,比如是否需要阻塞队列帮我们排序,如
优先级排序、延迟执行
等。如果有这个需要,我们就必须选择类似于
PriorityBlockingQueue
之类的有排序能力的阻塞队 列。
容量
第 2 个需要考虑的是
容量
,或者说
是否有存储的要求
,还是只需要“直接传递”。在考虑这一点
的时候,我们知道前面介绍的那几种阻塞队列,有的是
容量固定的,如 ArrayBlockingQueue
;有的 默认是
容量无限的,如 LinkedBlockingQueue
;而有的里面
没有任何容量,如
SynchronousQueue
;而对于
DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE
。所以不同阻塞队列的容量是千差万别的,
我们需要根据任务数量来推算出合适的容量
,从而去选取合适的 BlockingQueue。
能否扩容
第 3 个需要考虑的是
能否扩容
。因为有时我们并不能在初始的时候很好的准确估计队列的大小, 因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue , 因为它的容量在创建时就确定了,无法扩容。相反
PriorityBlockingQueue 即使在指定了初始容量 之后,后续如果有需要,也可以自动扩容
。所以
我们可以根据是否需要扩容来选取合适的队列。
内存结构
第 4 个需要
考虑的点就是内存结构
。我们分析过
ArrayBlockingQueue
的源码,看到了它的内部
结构是“数组”
的形式。和它不同的是,
LinkedBlockingQueue 的内部是用链表
实现的,所以这里就需要我们考虑到,
ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高
。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
性能
第 5 点就是
从性能的角度去考虑
。比如
LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细
,
在并发程度高的时候,相对于
只有一把锁的 ArrayBlockingQueue
性能会更好
。另外,
SynchronousQueue
性能往往优于其他实现
,因为
它只需要“直接传递”
,而不需要存储的过程。
如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue
。
线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
Executors
类下的线程池类型:
FixedThreadPool
(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
CachedThreadPool
选取的是 SynchronousQueue
ScheduledThreadPool(
SingleThreadScheduledExecutor同理)选取的是延迟队列