延迟队列--DelayQueue(JDK)

news2025/1/12 19:07:57

JDK自身支持延迟队列的数据结构,其实类:java.util.concurrent.DelayQueue。

我们通过阅读源码的方式理解该延迟队列类的实现过程。

1.定义

DelayQueue:是一种支持延时获取元素的无界阻塞队列。

特性:

  1. 线程安全;

  2. 内部元素有“延迟”特性:只有延迟到期的元素才允许被获取;

  3. 具有优先级特性的无界队列,优先级以元素延迟时间为标准,最先过期的元素优先级最高(队首);

  4. 入队操作不会被阻塞,获取元素在特定情况会阻塞(队列为空,队首元素延迟未到期等);

根据其源码分析为何如此定义以及其特性的由来。

DelayQueue继承关系:

DelayQueue

类图分析:

其核心继承/实现:

1.BlockingQueue:说明其具有阻塞队列的特性;

2.元素必实现接口Delayed,而Delayed继承了接口Comparable。因此所有元素必须实现两个方法:

compareTo方法用于元素比较;
getDelay方法用于获取元素剩余延时时间。

public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回关联对象的剩余延迟时间(可指定时间单位)
     */
    long getDelay(TimeUnit unit);
}

2.源码:

public class DelayQueue<E extends Delayed>
        extends AbstractQueue<E>
        implements BlockingQueue<E> {
    /**
     * 可重入锁,用于保证线程安全
     */
    private final transient ReentrantLock lock = new ReentrantLock();
    /**
     * 优先队列(容器),实际存储元素的地方
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
     * 等待取元素线程的领导(leader)线程,有且仅有一个leader。
     * 具有最高优先级,第一个尝试获取元素的线程。
     * leader取完元素后,会唤醒新的等待线程成为新的leader。
     */
    private Thread leader = null;
    /**
     * 触发条件,表示是否可以从队列中读取元素.
     * 用于等待(await())/通知(signal())其他线程
     */
    private final Condition available = lock.newCondition();
    /**
     * 构造函数
     */
    public DelayQueue() {
    }
    /**
     * 构造函数: 调用addAll()方法:将集合c 存入队列中
     *
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
    /*--------------------------添加元素(非阻塞)-------------------------------*/
    /**
     * 插入新元素.
     * 核心内容见:public boolean offer(E e)
     */
    public boolean add(E e) {
        return offer(e);
    }
    /**
     * 插入新元素.
     * 核心内容见:public boolean offer(E e)
     */
    public void put(E e) {
        offer(e);
    }
    /**
     * 插入新元素.
     * 核心内容见:public boolean offer(E e)
     * @param e       元素
     * @param timeout 此参数将被忽略,因为该方法从不阻塞(废弃)
     * @param unit    此参数将被忽略,因为该方法从不阻塞(废弃)
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }
    /**
     * 插入新元素.(线程安全 lock)
     * 逻辑:
     *  1.入队;
     *  2.如果入队元素为队首元素(原队列为空),唤醒一个等待的线程,通知获取数据。
     *
     * @param e 元素
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 入队
            q.offer(e);
            // 若该元素为队列头部元素(说明原队列为空),可以唤醒等待的线程取元素数据
            if (q.peek() == e) {
                // 如果队首元素是刚插入的元素,则设置leader为null(腾位置)
                leader = null;
                // 唤醒一个等待的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------取出(返回并删除)元素-------------------------------*/
    /**
     * 取出延迟到期元素(非阻塞的).(线程安全 lock)
     * poll() 方法是非阻塞的,即调用之后无论元素是否存在/延迟到期都会立即返回。
     * 逻辑:
     * 1.查询队首元素;
     * 2.元素延迟到期返回,否则返回null
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 查询队首元素
            E first = q.peek();
            // 队首元素为空或者延时未到期 返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0) {
                return null;
            } else {
                // 如果到期,取出并删除队首元素
                return q.poll();
            }
        } finally {
            lock.unlock();
        }
    }
  /**
     * 取出延迟到期元素(带有超时时间,阻塞).(线程安全 lock)
     * 如果队首元素未到期或者为null,等待:直到队首元素延迟到期或者超出指定等待时间(timeout)
     * 逻辑(无限循环等待获取):
     * 宗旨:在不超出timeout的时间内,循环去取出延迟到期的队首元素(前提无其他线程正在取数--互斥).
     * 1.查询队首元素;
     *  2.1.队列空:等待timeout一段时间,直到等待超时(即timeout被重置小于等于0);
     *  2.2.队列不为空:
     *      2.2.1. 队首元素延迟到期,取出队首元素(poll());
     *      2.2.2. 队首元素延迟未到期:
     *          2.2.3 等待超时 ,返回null;
     *          2.2.4 等待未超时,等待时间<延迟时间或者有其他线程正在取数据,继续等待到超时到期
     *          2.2.5 等待为超时,等待时间>=延迟时间并且无其他线程正在取数据,该线程设置为leader等待到延迟到期(最后清空leader)
     * 3. 循环后,如果leader=null(无正在取数线程)并且队列还有数据,唤醒一个等待线程最终成为leader.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 以可中断方式获取锁
        lock.lockInterruptibly();
        try {
            for (; ; ) {
                // 获取队首元素
                E first = q.peek();
                if (first == null) {
                    // 若队首元素为空(即队列为空,这时就需要关注,当前取值请求是否需要阻塞等待
                    // 等待时间小于等于0 ,不阻塞等待,直接返回null)
                    if (nanos <= 0) {
                        return null;
                    } else {
                        // 等待相应的时间
                        nanos = available.awaitNanos(nanos);
                    }
                } else {
                    // 若队列元素非空,获取队首元素剩余延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 延时过期 返回元素
                    if (delay <= 0) {
                        return q.poll();
                    }
                    // 延时未过期  等待时间超时 ,不等待,直接返回null
                    if (nanos <= 0) {
                        return null;
                    }
                    first = null;
                    // 延时和等待都未到期且等待时间<延迟时间 或者 有其他线程在取数据,当前请求继续等待
                    if (nanos < delay || leader != null) {
                        nanos = available.awaitNanos(nanos);
                    } else {
                        // 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待直到延迟到期
                            long timeLeft = available.awaitNanos(delay);
                            // 计算超时时间
                            nanos -= delay - timeLeft;
                        } finally {
                            // 该线程操作完毕,把 leader 置空
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // 如果leader线程为空 并且  queue非空,则唤醒其他等待线程
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }
   /**
     * 取出延迟到期元素(无超时时间限制,阻塞).(线程安全 lock)
     * 逻辑(无限循环等待获取):
     * 其逻辑参考poll(long timeout, TimeUnit unit).
     * 其区别在于:不受超时时间限制(timeout)
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 以可中断方式获取锁
        lock.lockInterruptibly();
        try {
            // 无限循环
            for (; ; ) {
                // 获取队首元素
                E first = q.peek();
                if (first == null) {
                    // 若队首元素为空(队列为空),则等待
                    available.await();
                } else {
                    // 若队列元素非空,获取队首元素剩余延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 延迟到期,获取队首元素
                    if (delay <= 0) {
                        return q.poll();
                    }
                    // 延时未过期
                    first = null;
                    // leader 不为空表示有其他线程在读取数据,当前线程等待
                    if (leader != null) {
                        available.await();
                    } else {
                        // 没有其他线程等待,将当前线程设置为 leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待延迟时间过期
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // 如果leader线程为空 并且  queue非空,则唤醒其他等待线程
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }
    /*--------------------------读取队首元素-------------------------------*/
    /**
     * 读取队首元素.(线程安全 lock)
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------读取队列长度-------------------------------*/
    /**
     * 获取队列数据的长度.(线程安全 lock)
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------获取延迟到期元素集合-------------------------------*/
    /**
     * 将队列中延迟到期数据 收集到集合C中.(线程安全 lock)
     * 
     * @return  返回延迟到期元素数量
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            //  peekExpired() 判断队首元素是否延迟到期
            for (E e; (e = peekExpired()) != null; ) {
                c.add(e);       
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 将队列中延迟到期数据 收集到集合C中(C集合总数有限制小于maxElements).(线程安全 lock)
     * @return  返回延迟到期元素数量
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            // peekExpired() 判断队首元素是否延迟到期。并且到期元素总数不允许超过maxElements
            for (E e; n < maxElements && (e = peekExpired()) != null; ) {
                c.add(e);       
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 读取队首元素(已延迟到期).(私有方法)
     */
    private E peekExpired() {
        // 获取队首元素
        E first = q.peek();
        // 队首元素存在并且延迟到期,否则返回null
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
                null : first;
    }
    /*--------------------------删除元素-------------------------------*/
    /**
     * 清除队列中所有元素(线程安全 lock)--暴力清除
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }
    /**
     * 删除指定元素O.(线程安全 lock)
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }
    /**
     * 删除指定元素O.(这里指的是相同的对象引用/内存地址)(线程安全 lock)
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                // 使用了对象引用/内存地址相等比较
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------队列转数组-------------------------------*/

    /**
     * 将队列元素都复制到数组中(无序).(线程安全 lock)
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }
    /**
     * 将队列元素都复制到数组a中(无序).
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------私有内部类--迭代器-------------------------------*/
   /**
     * 返回此队列中所有元素(已过期和未过期)的迭代器。迭代器不按任何特定顺序返回元素。
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }
    /**
     * 快照迭代器,用于处理底层 队列/数组的副本。
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E) array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }
}

3.使用demo:

使用DelayQueue实现延迟队列:

优点:实现简单。

缺点:可扩展性较差,内存限制、无持久化机制等。

 @SneakyThrows
    public static void main(String[] args) {
        DelayQueue<TestTask> testTaskDelayQueue = new DelayQueue<>();
        long time = System.currentTimeMillis();
        testTaskDelayQueue.offer(TestTask.builder().name("test_1").endTime(time + 10 * 1000).build());
        testTaskDelayQueue.offer(TestTask.builder().name("test_2").endTime(time + 4 * 1000).build());
        testTaskDelayQueue.offer(TestTask.builder().name("test_3").endTime(time + 16 * 1000).build());
        for(;;){
            System.out.println(testTaskDelayQueue.take());
            TimeUnit.SECONDS.sleep(2);
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    private static class TestTask implements Delayed {
        private String name;
        private Long endTime;

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/636856.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能算力需求稳增,中国将持续夯实算力底座

中国始终强调科技兴国的重要性。数字经济时代&#xff0c;技术的力量更为凸显。近年来&#xff0c;中国政府相关部门相继发布一系列政策&#xff0c;更加明确了人工智能对于提升中国核心竞争力的重要支撑作用&#xff0c;加上新基建、数字经济等持续利好政策的推动&#xff0c;…

0202性能分析-索引-MySQL

1 索引语法 创建索引 CREATE [UNIQUE|FULLTEXT] INDEX index_name ON table_name(index_column_name,...);Index_name&#xff1a;规范为idx_表名_字段名... 查看索引 SHOW INDEX FROM table_name;删除索引 DROP INDEX index_name ON table_name;按照下列要求&#xff0c;创建…

仿交易猫链接 跳转APP功能

最新仿交易猫假链接&#xff0c;带有跳转APP功能 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3

C语言数据结构——循环链表

如果人生会有很长,愿有你的荣耀永不散场。——《全职高手》 一 . 循环单链表 循环单链表是单链表的另一种形式&#xff0c;其结构特点是&#xff0c;链表中最后一个结点的指针域不再是结束标记&#xff0c;而是指向整个链表的第一个结点&#xff0c;从而使链表形成一个环。 和单…

PLC现场安装和维护的注意事项

虽然PLC是专门在现场使用的控制装置&#xff0c;在设计制造时已采取了很多措施&#xff0c;使它对工业环境比较适应&#xff0c;但是为了确保整个系统稳定可靠&#xff0c;还是应当尽量使PLC有良好的工作环境条件&#xff0c; 并采取必要的抗干扰措施。因此&#xff0c;PLC在安…

python中,unicode对象怎么转换成dict?

python中&#xff0c;unicode对象怎么转换成dict&#xff1f; 使用loads两次

「展会前线」易天光通信盛装亮相2023越南通讯展会

2023年6月7日&#xff0c;在历经了忙碌有序的前期准备工作后&#xff0c;易天光通信销售团队带着满满的信心踏上了越南通讯展会之旅&#xff01; “千呼万唤始出来&#xff0c;犹抱琵琶半遮面”。2023年6月8日&#xff0c;各方期待已久的2023越南通讯展会在越南胡志明市正式开…

肠道有害菌属——假单胞菌属(Pseudomonas),多变且适应性强

谷禾健康 假单胞菌属&#xff08;Pseudomonas&#xff09;是最多样化和普遍存在的细菌属之一&#xff0c;其物种存在于沉积物、临床样本、植物&#xff08;或植物根际&#xff09;、患病动物、水、土壤、海洋、沙漠等&#xff0c;这反映在它们多变的代谢能力和广泛的适应环境的…

3款好用的客户系统管理软件推荐,你用过哪款?

进行客户资料管理确实很重要。我本人在工作中也常常遇到客户关系管理的难题&#xff0c;有时候忘记填写客户信息&#xff0c;亦或是填错信息等场景&#xff0c;甚至会造成许多尴尬局面。为了解决这个问题&#xff0c;我也试用了很多个方法来提高效率。下面我想谈一谈我本人在摸…

十肽-4/Decapeptide-10, CG-IDP2——有效逆转皮肤衰老

简介----十肽-4 十肽-4可以穿透真皮增加胶原蛋白&#xff0c;通过从内至外的重建来逆转皮肤老化的过程&#xff1b;刺激胶原蛋白、弹力纤维和透明质酸增生&#xff0c;提高肌肤的含水量和锁水度&#xff0c;增加皮肤厚度以及减少细纹。 功效与应用----十肽-4 抗皱抗衰老 改善…

浪潮 KaiwuDB x 大数据中心 | 数据驱动政府治理能力快速提升

业务背景 我国工业互联网大数据资源存在孤立、分散、封闭等问题&#xff0c;数据价值未能得到有效利用&#xff0c;数据主权和数据安全面临重大威胁。 发挥数据对工业经济的基础支撑和创新引擎作用&#xff0c;可促进工业互联网的创新发展&#xff0c;加速数据驱动政府治理能…

Pycharm中的find usages有什么用?

问题描述&#xff1a;我们经常使用Pycharm作为开发工具&#xff0c;我们右键会发现有个find usages功能。 比如&#xff0c;我们以YOLOv8中的detect/train.py中的DetectionTrainer()类为例&#xff0c;右键之后如下图所示。 答案&#xff1a;全局搜索&#xff0c;查找类、变量…

「最新」Parallels Desktop 18 for Mac(Pd虚拟机) 18.3.1通用版

Parallels Desktop 18是一款虚拟机软件&#xff0c;能够让Mac电脑上运行Windows、Linux和其他操作系统的应用程序。 此版本的Parallels Desktop 18提供了多项功能增强和改进&#xff0c;包括更快的性能、更好的图形处理、更简单的导入和导出虚拟机等。该软件还支持Apple M1芯片…

QT使用按钮打开新窗口

需求说明&#xff1a;主窗口名为mainwindow&#xff0c;在主窗口添加一个按钮&#xff0c;通过点击按钮能打开一个新的窗口。 第一步&#xff1a;在主窗口添加按钮 找到左边菜单栏的按钮控件拖出置窗口上 第二步&#xff1a;在工程里新建窗口 1.右击最顶层项目文件名&#x…

Springcloud之Feign、Hystrix、Ribbon如何设置超时时间

一&#xff0c;概述 我们在微服务调用服务的时候&#xff0c;会使用hystrix、feign和ribbon&#xff0c;比如有一个实例发生了故障而该情况还没有被服务治理机制及时的发现和摘除&#xff0c;这时候客户端访问该节点的时候自然会失败。 所以&#xff0c;为了构建更为健壮的应…

Vue3系列--provide与inject

目录 Provide inject 在Vue3项目开发的过程中&#xff0c;会创建很多组件&#xff0c;那么避免不了组件之间的通信&#xff0c;在父子组件通信我们可以使用defineProps、defineEmits、defineExpose和Emit方法完成通信&#xff0c;在使用这些方法的前提是需要引用对应的组件。…

第36步 深度学习图像识别:TensorFlow-gpu环境配置

基于WIN10的64位系统演示 一、写在前面 从这一期开始分享基于深度学习图像识别的学习笔记和代码&#xff0c;相比于之前的ML分类模型&#xff0c;图像识别的门槛会更高&#xff0c;包括硬件方面、代码复杂度和基础理论知识等。同样&#xff0c;首先把必要的深度学习框架&…

Nginx 轻松搞定跨域问题

当你遇到跨域问题&#xff0c;不要立刻就选择复制去尝试&#xff0c;请详细看完这篇文章再处理&#xff0c;我相信它能帮到你。 分析前准备&#xff1a; 前端网站地址&#xff1a;http://localhost:8080 服务端网址&#xff1a;http://localhost:59200 首先保证服务端是没有…

微软将GitHub Copilot 与 Visual Studio 深度整合有助于便捷开发

近日对于很多的开发者来说将迎来一个好消息&#xff0c;据悉微软目前正在改善Visual Studio的开发体验&#xff0c;并将GitHub Copilot更深度融合入Visual Studio 中&#xff0c;以提升“AI 写代码”的准确性。 值得注意的是&#xff0c;在Copilot 1.84 版本之前&#xff0c;…

亚马逊养号系统之亚马逊批量养号如何操作?

亚马逊新注册的买家号都是需要先养一段时间才可以的&#xff0c;如果想要同时养大量的买家号那么需要借助软件进行辅助操作才行。 亚马逊鲲鹏系统可以批量养亚马逊买家号&#xff0c;养号方法有两种&#xff0c;一种是AI智能一键养号&#xff0c;一种是设置关键词搜索浏览后进行…