JUC阻塞队列(四):DelayQueue

news2025/1/17 13:47:06

1、DelayQueue介绍

      DelayQueue 是一个延迟队列,生产者写入一个数据,这个数据具有被直接消费的延迟时间,

      让数据具有延迟的特性。

      DelayQueue底层也是基于二叉堆来实现的,DelayQueue本就是基于PriorityBQueue 实现的。

      二叉堆结构每次获取的是堆顶数据,在比较时,根据延迟时间进行比较,延迟时间剩余端的放

      在堆顶。

      由于 DelayQueue 基于 PriorityQueue  实现的,因此 DelayQueue 理论上也是一个无边界队

      列,DelayQueue 容量可以进行无限扩容。

      

2、DelayQueue核心属性

      由DelayQueue 结构可以发现,DelayQueue 存储的数据必须实现 Delayed 接口

      DelayQueue 结构如下:

            

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    //锁,阻塞队列需要使用锁来保证线程安全
    //只有一把锁,表示生产者和消费者使用的是同一把锁
    private final transient ReentrantLock lock = new ReentrantLock();
    //基于优先级队列 PriorityQueue
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * leader 一般用来保存等待堆顶数据的消费者线程
     */
    private Thread leader = null;

    /**
     * 基于 PriorityQueue(基于二叉堆)实现数据存储,生产者在插入数据时是不会阻塞的,
     * 当前的Condition就是给消费者用的,当消费者获取数据时,当堆顶数据的延迟时间还不为
     * 0(即还没到执行时间点),此时消费者线程会阻塞挂起等待一会(等待的是堆顶数据),直到堆顶数据延迟时间为0(到达任务执行时间点)
     * 或者 生产者新插入的数据到了堆顶,此时生产者会调用Condition.signal() 方法唤醒消费者线程
     */
    private final Condition available = lock.newCondition();

   
    public DelayQueue() {}

   
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
}



/**
   Delayed 继承 Comparable接口,所以 Delayed 的实现都可以进行比较操作
*/
public interface Delayed extends Comparable<Delayed> {

   
    long getDelay(TimeUnit unit);//获取延迟时间,比较延迟时间
}

3、DelayQueue使用示例

      DelayQueue 常用方法也是 BlockingQueue接口中定义的那几个存储数据和获取数据的方法,

      只有一点需要注意,即 DelayQueue 保存的数据必须实现接口Delayed 

       DelayQueue 使用示例如下:

                

public class TaskDelayed implements Delayed {

    private String name;
    /** 执行时间点*/
    private Long time;

   
    public TaskDelayed(String name,Long time){
        this.name = name;
        this.time = System.currentTimeMillis()+time;
    }

    /**
     * 设置 任务TaskDelayed 什么时候可以出延迟队列DelayedQueue
     * 该方法返回值小于等于0时任务才会从 延迟队列DelayedQueue 中取出执行
     *
     */
    @Override
    public long getDelay(TimeUnit unit) {
        //TimeUnit.MILLISECONDS :将时间转换为毫秒
        return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    /**
     * 比较器
     * 2个 TaskDelayed 任务在存储到延迟队列时的比较方式,通过time属性进行比较
     * 返回值:
     *    < 0: 按从小到大排列
     *    == 0 : 相等
     *    >0 : 从大到小排列
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        TaskDelayed task = (TaskDelayed) o;

        return (int)(this.time - task.getTime());
    }
}



public class DelayedQueueDemo01 {

    public static void main(String[] args) throws InterruptedException {

        TaskDelayed task1 = new TaskDelayed("Tome",2000L);
        TaskDelayed task2 = new TaskDelayed("JieRui",4000L);
        TaskDelayed task3 = new TaskDelayed("zhuDy",3000L);
        TaskDelayed task4 = new TaskDelayed("zhanmusi",1000L);

        //DelayQueue 存放的数据必须实现接口Delayed
        DelayQueue<TaskDelayed> queue = new DelayQueue<>();
        //添加数据
        queue.add(task1);
        queue.offer(task2);
        queue.offer(task3,4, TimeUnit.SECONDS);
        queue.put(task4);

        
        //取数据
        System.out.println(queue.remove());//若堆顶数据的延迟时间还没到达,则poll()返回null,remove()会直接抛出异常
        System.out.println(queue.poll());
        System.out.println(queue.poll(5,TimeUnit.SECONDS));
        System.out.println(queue.take());


    }
}

4、DelayQueue写入流程分析

      因为 DelayQueue 底层是基于 PriorityQueue  实现的,也就是基于二叉堆实现的,所以

      DelayQueue 是一个无界的队列,存储数据的数组可以动态扩容,所以生产者不需要关注

      队列满了而阻塞的问题,因此这里只需要关注offer(E e) 方法就可以了

             offer(E e) 代码如下:

               

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //直接调用PriorityQueue的插入方法
            q.offer(e);
            //判断数据插入后的二叉堆的堆顶元素是不是刚插入的数据,
            //若是,则说明当前堆顶数据可能已到达延迟时间可以进行消费,唤醒等待的消费者线程,并将当前等待的消费者线程设置为null
            if (q.peek() == e) {
                /**
                 * leader 赋值为null
                 * todo 在消费者消费数据时会判断leader 是否为null
                 */
                leader = null;
                /**
                 * 唤醒挂起阻塞的消费者线程,避免刚插入的数据的延迟时间出现问题
                 * 这里可以发现消费者等待的是堆顶数据
                 */
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

              

5、DelayQueue取数据流程分析

      消费者取数据过程需要考虑阻塞问题:

              1)队列为空,无数据,消费者线程需要挂起等待一会

              2)堆顶数据的延迟时间还没到,此时消费者线程需要挂起等待一会

              3)当消费者A已经在等待堆顶数据,此时消费B也过来取数据,此时消费者B需要

                    挂起等待一会

5.1、remove() 

        该方法功能是取数据,取堆顶数据,若取不到数据,则直接抛出异常。

        注意:若堆顶数据的延迟时间还没到达,则取不到数据,也会抛出异常

        remove 方法如下:

              

5.2、poll()

         该方法功能是读取数据,poll()方法不会阻塞消费者,能获取数据就直接返回,否则返回null

         poll 方法代码如下:

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //查看堆顶数据
            E first = q.peek();
            /**
             *first == null 表示堆为空,没有数据
             * getDelay 方法返回值大于0,表示堆顶数据还没到延迟时间,不能执行,堆顶数据无法取出
             */
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

5.3、poll(long timeout, TimeUnit unit)

         带超时时间的读取数据的方法

         poll方法代码如下:

            

/**
 *带超时时间的取数据方法
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        //将超时时间转换为纳秒
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //加锁,可被中断,当被中断时会抛出异常,直接退出
        lock.lockInterruptibly();
        try {
            //自旋
            for (;;) {
                //查看堆顶数据
                E first = q.peek();
                //若堆顶数据为空,即堆无数据,则判断超时时间是否已过,若超时时间也已经过了,则返回null
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        //阻塞等待,并返回剩余超时时间
                        //等待生产者线程添加数据之后唤醒
                        nanos = available.awaitNanos(nanos);
                } else {//堆有数据
                    //获取堆顶数据的延迟时间,单位纳秒
                    long delay = first.getDelay(NANOSECONDS);
                    //若堆顶数据的延迟时间小于等于0,表示当前堆顶数据可以执行,立即取出
                    if (delay <= 0)
                        return q.poll();
                    //若堆顶数据的延迟时间大于0(表示堆顶数据还不能执行)且超时时间已经过了,则返回null
                    if (nanos <= 0)
                        return null;
                    /**
                     * 指定到这里,说明堆顶数据的延迟时间大于0(表示延迟时间没到,堆顶数据还不能执行)且方法超时时间还没过
                     * 消费者需要挂起等待
                     */
                    first = null; // don't retain ref while waiting
                    //方法剩余超时时间小于堆顶数据的延迟时间,则消费者线程继续阻塞,并返回剩余超时时间
                    /**
                     * todo 疑问:这里为什么不直接结束,反正最终是无法获取数据的?
                     *           因为 你不确定在剩余的超时时间nanos内,是否有新的数据插入(新插入的数据可能延迟时间很短),
                     *           前边offer(E e)新增数据后也会唤醒等待的消费者线程
                     *           第二个条件 leader != null,leader != null表示前边已经有
                     *           消费者线程在挂起阻塞堆顶数据的延迟时间到期,后边的消费者线程执行到这里
                     *           需要直接阻塞挂起,这样避免 leader 的重复赋值
                     */
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {//方法剩余超时时间大于堆顶数据的延迟时间,表示当前消费者可以在超时时间nanos内拿到堆顶数据,
                          // 且当前没有消费者在等待堆顶数据
                        //将leader 设置为 当前消费者线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //阻塞等待,并返回剩余的阻塞时间
                            //当前消费者阻塞堆顶数据的延迟时间
                            long timeLeft = available.awaitNanos(delay);
                            //更新剩余的可阻塞时间,已消耗的超时时间是 delay - timeLeft
                            nanos -= delay - timeLeft;
                        } finally {
                            //堆顶数据的延迟时间到了,将 leader 设置为null
                            //这一步只有生产者和消费者自己可以做
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

5.4、take()

         读取数据,允许中断,若队列为空则一直阻塞,直到队列有数据 或 被中断时异常退出

         take() 方法代码如下:     

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //加锁,允许中断,中断异常退出
        lock.lockInterruptibly();
        try {
            //自旋
            for (;;) {
                //查看堆顶数据
                E first = q.peek();
                //若队列没有数据,则阻塞,由生产者唤醒 或者被中断异常退出
                if (first == null)
                    available.await();
                else {//队列不为空
                    //获取堆顶数据的延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    //表示堆顶数据可以执行,则从队列中取出数据
                    if (delay <= 0)
                        return q.poll();
                    //执行到这里,表示堆顶数据的延迟时间还没到,消费者线程需要阻塞,等待堆顶数据到达其延迟时间
                    first = null; // don't retain ref while waiting
                    //前边有消费者线程在阻塞等待堆顶数据的延迟时间到达,则当前线程直接阻塞
                    if (leader != null)
                        available.await();
                    else {//当前没有消费者线程在等待堆顶数据的延迟时间到达,则把当前消费者设置为等待堆顶数据延迟时间到达的线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //阻塞,阻塞时间是堆顶数据的延迟时间
                            available.awaitNanos(delay);
                        } finally {
                            //阻塞结束,获取到堆顶数据后将 leader 设置为Null
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2059952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NCL画出来的图模糊问题处理

问题介绍&#xff1a; 如图所示&#xff0c;NCL画出来的图分辨率比较低&#xff0c;图片比较模糊&#xff0c;怎么将分辨率提高&#xff1f; 解决方法&#xff1a; ; 采用这个方法来定义wkswks_type "png"wks_typewkWidth 2500wks_typewkHeight 2500wks gsn_…

网易、网易互娱、360、头条、商汤等公司面试真题....

测试岗/测试开发岗面试真题 来源与网易、网易互娱、360、头条、商汤等公司面试真题 自我介绍 项目中负责什么? 团队几个人&#xff1f;合作情况 为什么要读研 项目/实习介绍 项目中负责什么? 团队几个人&#xff1f;合作情况 项目的方法怎么改进&#xff0c;和别人方…

微信为什么会限制加好友?

微信限制加好友主要是为了防止垃圾信息和滥用行为&#xff0c;包括以下几个原因&#xff1a; 1、防止骚扰&#xff1a;限制加好友可以减少陌生人骚扰和垃圾广告。 2、保护用户隐私&#xff1a;控制好友请求能更好地保护用户的个人信息。 3、提升用户体验&#xff1a;避免用户…

用博达网站群管理平台设计网站时如何创建二级导航

1 介绍 现用博达网站群管理平台设计出的网站只能一级导航&#xff0c;亦即点击首页的顶端导航&#xff0c;直接出现列表页&#xff0c;无法出现二级菜单。二级菜单在网站开发中被称为二级导航。 怎样用博达网站群管理平台制作出二级导航的效果&#xff1f;这个问题在《网站群…

掌握电子邮件的艺术:使用 Mailbird 统一管理您的数字生活

在数字时代&#xff0c;电子邮件已成为我们沟通的骨干。无论是商务交流、家庭联络&#xff0c;还是订阅更新&#xff0c;我们几乎每天都在使用电子邮件。但随着账户数量的增加&#xff0c;管理这些账户变得日益复杂。如何有效地整合和优化您的电子邮件体验&#xff1f;Mailbird…

Arduino调试ESP32常见问题 exit status 1

问题1&#xff1a;代码上传&#xff08;烧录&#xff09;报Failed uploading: uploading error: exit status 1大概率原因&#xff1a;没有安装对应的驱动&#xff0c;我的ESP32驱动是CH340点击这里下载CH340 下载后打开&#xff0c;若出现乱码不用在意&#xff0c;点击第一个按…

原生js实现下滑到当前模块进度条填充

<div style"height: 1500px;"></div> <div class"progress-container"><div class"progress-bar" data-progress"90%"><p class"progress-text">Google Ads在Google搜索引擎上覆盖超过90%的互…

浙江大学蒋超实验室在JHM发文揭示日常使用量的一次性纸杯释放的微塑料或可能影响孕期健康

CQ师姐做的一个纸杯微塑料项目&#xff0c;非常有意思&#xff0c;揭示了日常生活中真实来源的孕期微塑料暴露&#xff0c;对生殖和代谢性能的影响和调控机制。我参与了其中的部分实验和分析&#xff0c;学习了养小鼠&#xff0c;灌胃&#xff0c;解剖和部分塑料的定性定量等实…

二、Socket链接方式分类

一、Socket通信基本流程图 1、流程图 2、链接方式 &#xff08;1&#xff09;同步 商业中不会用&#xff0c;会有阻塞的情况出现&#xff1b;举例&#xff1a; 客户端的玩家升级&#xff0c;向服务器发送这条信息&#xff0c;而服务器传输回来需要一定时间&#xff0c;此时…

探索《黑神话:悟空》品质保障的背后:ISO体系认证

《黑神话&#xff1a;悟空》横空出世 8月20日上午10点&#xff0c;国产首款大型3A游戏《黑神话&#xff1a;悟空》正式上线。游戏一经上线便吸引了无数国内外用户的关注&#xff0c;不仅仅是因为其高超的游戏制作技术&#xff0c;极高的画面精度&#xff0c;精良的的视觉和战斗…

如何将平淡无奇的产品推向市场?借助ChatGPT,仅需3秒即可化身短视频创意策划大师,助你的产品一夜成名!

本文通过一系列生动的实例&#xff0c;展示了如何通过ChatGPT生成创意和独特的宣传方式&#xff0c;将平凡或不起眼的产品转化为市场上的明星。从全红婵最爱的小乌龟到棋牌室排烟机&#xff0c;再到食物研磨器的成功案例&#xff0c;我们可以看到&#xff0c;创意和创新的宣传策…

手把手教你如何注册使用Runway Gen3,10秒搞定专业级视频制作

大家好&#xff01;我是YUAN。 今天&#xff0c;我们要介绍的是一款AI视频制作的王者级工具——Runway Gen-3。它不仅能够在短时间内生成高质量的视频&#xff0c;还能满足不同风格和场景的需求。 一、Runway Gen-3是什么&#xff1f; Runway Gen-3是一款功能强大的AI视频生…

jenkins最佳实践(二):Pipeline流水线部署springCloud微服务项目

各位小伙伴们大家好呀&#xff0c;我是小金&#xff0c;本篇文章我们将介绍如何使用Pipeline流水线部署我们自己的微服务项目&#xff0c;之前没怎么搞过部署相关的&#xff0c;以至于构建流水线的过程中中也遇到了很多自己以前没有考虑过的问题&#xff0c;特写此篇&#xff0…

水凝胶结机器人咋自主运动?利用拓扑调用的自我调节!

大家好&#xff0c;今天我们来聊聊一项有趣的研究 —— 水凝胶结机器人。这篇文章《Animating hydrogel knotbots with topology-invoked self-regulation》发表于《Nature Communications》。想象一下&#xff0c;小小的机器人能够像生物一样自主运动&#xff0c;这是不是很神…

极空间Z4Pro,最懂中国人的NAS,为了极影视值得入手

极空间Z4Pro&#xff0c;最懂中国人的NAS&#xff0c;为了极影视值得入手 小伙伴们大家好&#xff0c;我的极空间Z4Pro是在首发的时候交定金购买的&#xff0c;到目前为止也算是用了很长一段时间了&#xff0c;说一下自己的使用体验吧。 首先是这个极空间的外观&#xff0c;并…

【流媒体】RTMPDump—主流程简单分析

目录 1. main函数1.1 初始化socket&#xff08;InitSockets&#xff09;1.2 初始化RTMP&#xff08;RTMP_Init&#xff09;1.3 解析URL&#xff08;RTMP_ParseURL&#xff09;1.4 配置流信息&#xff08;RTMP_SetupStream&#xff09; RTMP协议相关&#xff1a; 【流媒体】RTMP…

MySQL基础操作探秘

ok&#xff0c;前面两个文章介绍了MySQL的安装与配置环境&#xff0c;以及如何进行删除。 那么&#xff0c;接下来探寻数据库的一些基本操作。 首先我们登录上数据库先&#xff1a; 我们要对数据库进行操作&#xff0c;那我们要用到有些命令&#xff0c;这些命令在这里称为&a…

企业财务自动化:RPA机器人的优势与挑战

随着数字化浪潮的推进&#xff0c;企业财务自动化已成为企业提升效率和降低成本的关键策略。在这一背景下&#xff0c;RPA以其独特的优势&#xff0c;正逐渐成为企业财务自动化的重要工具&#xff0c;然而&#xff0c;RPA在实际应用中也面临着一些挑战。本文金智维将围绕RPA机器…

VBA技术资料MF187:写入文件属性及自定义属性

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套&#xff0c;分为初级、中级、高级三大部分&#xff0c;教程是对VBA的系统讲解&#…

英伟达与联发科合作生产支持G-SYNC完整功能的显示器 不需要英伟达专有模块

英伟达的 G-SYNC 同步技术在推出多年后终于出现比较让人关注的改动&#xff1a;英伟达宣布与联发科合作&#xff0c;将所有当前和未来的 G-SYNC 功能集成到联发科的缩放器中。 简单来说就是未来显示器直接使用联发科的缩放器技术即可&#xff0c;不需要再配备英伟达专有的 G-SY…