并发编程-延时队列DelayQueue

news2024/10/6 12:28:03

数据结构学习网站:

Data Structure Visualization

思维导图

      

DelayQueue (延时队列)

          DelayQueue 是一个支持延时获取元素的阻塞队列 , 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
        延迟队列的特点是: 不是先进先出,而是会按照延迟时间的 长短来排序,下一个即将执行的任务会排到队列的最前面。
它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接
口,所以自然就拥有了比较和排序的能力,代码如下:
public interface Delayed extends Comparable<Delayed> {
 //getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
 //如果返回 0 或者负数则代表任务已过期。
 //元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
 long getDelay(TimeUnit unit);
 }

DelayQueue使用

DelayQueue 实现延迟订单
在实现一个延迟订单的场景中,我们可以定义一个 Order 类,其中包含订单的基本信息,例如订单编 号、订单金额、订单创建时间等。同时,我们可以让 Order 类实现 Delayed 接口,重写 getDelay 和 compareTo 方法。在 getDelay 方法中,我们可以计算订单的剩余延迟时间,而在 compareTo 方法 中,我们可以根据订单的延迟时间进行比较。
下面是一个简单的示例代码,演示了如何使用 DelayQueue 来实现一个延迟订单的场景:
public class DelayQueueExample {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Order> delayQueue = new DelayQueue<>();

        // 添加三个订单,分别延迟 5 秒、2 秒和 3 秒
        delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));
        delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));
        delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));

        // 循环取出订单,直到所有订单都被处理完毕
        while (!delayQueue.isEmpty()) {
            Order order = delayQueue.take();
            System.out.println("处理订单:" + order.getOrderId());
        }
    }

    static class  Order implements Delayed{
        private String orderId;
        private long createTime;
        private long delayTime;

        public Order(String orderId, long createTime, long delayTime) {
            this.orderId = orderId;
            this.createTime = createTime;
            this.delayTime = delayTime;
        }

        public String getOrderId() {
            return orderId;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long diff = createTime + delayTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(diff, 0);
        }
    }
}
        由于每个订单都有不同的延迟时间,因此它们将会按照延迟时间的顺序被取出。当延迟时间到达时, 对应的订单对象将会被从队列中取出,并被处理。

DelayQueue原理

数据结构
 //用于保证队列操作的线程安全
 private final transient ReentrantLock lock = new ReentrantLock();

 // 优先级队列,存储元素,用于保证延迟低的优先执行
 private final PriorityQueue<E> q = new PriorityQueue<E>();

 // 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
 private Thread leader = null;

 // 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
 private final Condition available = lock.newCondition();

 public DelayQueue() {}
 public DelayQueue(Collection<? extends E> c) {
 this.addAll(c);
 }

入队put方法

    public void put(E e) {
        offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 入队
            q.offer(e);
            if (q.peek() == e) {
                // 若入队的元素位于队列头部,说明当前元素延迟最小
                // 将 leader 置空
                leader = null;
                // available条件队列转同步队列,准备唤醒阻塞在available上的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock(); // 解锁,真正唤醒阻塞的线程
        }
    }

出队take方法

 public E take() throws InterruptedException {
         final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
         try {
             for (;;) {
                 E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)
                 if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
                     available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
                 else {
                     long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间

                     if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                         return q.poll();

                     // 如果delay大于0 ,则下面要阻塞了
                     // 将first置为空方便gc
                     first = null;
                     // 如果有线程争抢的Leader线程,则进行无限期等待。
                    if (leader != null)
                        available.await();
                     else {
                         // 如果leader为null,把当前线程赋值给它
                         Thread thisThread = Thread.currentThread();
                         leader = thisThread;
                        try {
                             // 等待剩余等待时间
                             available.awaitNanos(delay);
                             } finally {
                             // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                             if (leader == thisThread)
                                 leader = null;
                             }
                         }
                     }
                 }
             } finally {
             // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
             if (leader == null && q.peek() != null)
                // available条件队列转同步队列,准备唤醒阻塞在available上的线程
             available.signal();
             // 解锁,真正唤醒阻塞的线程
             lock.unlock();
             }
         }
1. 当获取元素时,先获取到锁对象。
2. 获取最早过期的元素,但是并不从队列中弹出元素。
3. 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
4. 如果最早过期的元素不为空
5. 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素
6. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进 行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
7. 最后将Leader线程设置为空
8. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

如何选择适合的阻塞队列

 选择策略

通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:
功能
        第 1 个需要考虑的就是 功能层面 ,比如是否需要阻塞队列帮我们排序,如 优先级排序、延迟执行 等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队 列。
容量
        第 2 个需要考虑的是 容量 ,或者说 是否有存储的要求 ,还是只需要“直接传递”。在考虑这一点
的时候,我们知道前面介绍的那几种阻塞队列,有的是 容量固定的,如 ArrayBlockingQueue ;有的 默认是 容量无限的,如 LinkedBlockingQueue ;而有的里面 没有任何容量,如
SynchronousQueue ;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE 。所以不同阻塞队列的容量是千差万别的, 我们需要根据任务数量来推算出合适的容量 ,从而去选取合适的 BlockingQueue。
能否扩容
        第 3 个需要考虑的是 能否扩容 。因为有时我们并不能在初始的时候很好的准确估计队列的大小, 因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue , 因为它的容量在创建时就确定了,无法扩容。相反 PriorityBlockingQueue 即使在指定了初始容量 之后,后续如果有需要,也可以自动扩容 。所以 我们可以根据是否需要扩容来选取合适的队列。
内存结构
        第 4 个需要 考虑的点就是内存结构 。我们分析过 ArrayBlockingQueue 的源码,看到了它的内部 结构是“数组” 的形式。和它不同的是, LinkedBlockingQueue 的内部是用链表 实现的,所以这里就需要我们考虑到, ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高 。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
性能
        第 5 点就是 从性能的角度去考虑 。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细 在并发程度高的时候,相对于 只有一把锁的 ArrayBlockingQueue 性能会更好 。另外, SynchronousQueue 性能往往优于其他实现 ,因为 它只需要“直接传递” ,而不需要存储的过程。 如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue

 线程池对于阻塞队列的选择

 线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
Executors 类下的线程池类型:
FixedThreadPool (SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
CachedThreadPool 选取的是 SynchronousQueue
ScheduledThreadPool SingleThreadScheduledExecutor同理)选取的是延迟队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1116347.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Elasticsearch 8.X 分词插件版本更新不及时解决方案

1、关于 Elasticsearch 8.X IK 分词插件相关问题 球友在 ElasticSearch 版本选型问题中提及&#xff1a;如果要使用ik插件&#xff0c;是不是就使用目前最新的IK对应elasticsearch的版本“8.8.2”&#xff1f; https://github.com/medcl/elasticsearch-analysis-ik/releases/ta…

C++入门3+类和对象上

C入门3类和对象上 一.内联函数1.宏函数的缺点2.宏函数的优点3.内联函数的语法4.内联函数的优缺点5.内联函数的使用条件6.内联函数的展开7.内联函数的一大注意事项1.内联函数声明跟定义分离2.内联函数声明跟定义分离的"奇怪"现象 二.C11对于C语法的补充1.auto关键字1.…

【Nginx34】Nginx学习:安全链接、范围分片以及请求分流模块

Nginx学习&#xff1a;安全链接、范围分片以及请求分流模块 又迎来新的模块了&#xff0c;今天的内容不多&#xff0c;但我们都进行了详细的测试&#xff0c;所以可能看起来会多一点哦。这三个模块之前也从来都没用过&#xff0c;但是通过学习之后发现&#xff0c;貌似还都挺有…

python模块之feapder 爬虫框架

一、简介 官网&#xff1a;https://feapder.com/#/ feapder是一款上手简单&#xff0c;功能强大的Python爬虫框架&#xff0c;内置AirSpider、Spider、TaskSpider、BatchSpider四种爬虫解决不同场景的需求&#xff0c;但像任何工具一样&#xff0c;它也有其优点和缺点。以下是…

如何利用考培系统进行个性化学习和评估

考培系统作为一种现代化的学习和评估工具&#xff0c;可以为学生提供个性化的学习和评估服务。它利用先进的技术和算法&#xff0c;根据学生的学习情况和需求&#xff0c;为其量身定制学习计划&#xff0c;并提供相应的评估反馈。 1. 个性化学习 考培系统通过分析学生的学习情…

QML(25)——文本输入框组件的区别(TextField TextInput TextArea TextEdit)

目录 效果展示适用场景文本组件TextLabelText和Label的区别 单行文本输入框TextFieldTextInputTextField 和 TextInput的区别 多行文本输入框TextAreaTextArea 和 TextEdit 的区别 效果展示 适用场景 场景组件属性短文本Text长文本 末尾省略Textelide: Text.ElideRight文本设置…

通用FIFO设计深度8宽度64,verilog仿真,源码和视频

名称&#xff1a;通用FIFO设计深度8宽度64&#xff0c;verilog仿真 软件&#xff1a;Quartus 语言&#xff1a;verilog 本代码为FIFO通用代码&#xff0c;其他深度和位宽可简单修改以下参数得到 reg [63:0] ram [7:0];//RAM。深度8&#xff0c;宽度64 代码功能&#xff1a…

ArmSoM-RK3588编解码之mpp解码demo解析:mpi_dec_test

1. 简介 [RK3588从入门到精通] 专栏总目录 mpi_dec_test 是rockchip官方解码 demo 本篇文章进行mpi_dec_test 的代码解析&#xff0c;解码流程解析 2. 环境介绍 硬件环境&#xff1a; ArmSoM-W3 RK3588开发板 软件版本&#xff1a; OS&#xff1a;ArmSoM-W3 Debian11 3.…

低代码助力软件开发

低代码开发工具正在日益变得强大&#xff0c;它正不断弥合着前后端开发之间的差距。对于后端来说&#xff0c;基于低代码平台开发应用时&#xff0c;完全不用担心前端的打包、部署等问题&#xff0c;也不用学习各种框架&#xff08;Vue、React、Angular等等&#xff09;&#x…

GO 语言如何用好变长参数?

函数重载 对于函数重载相信编码过的 xdm 肯定不会陌生&#xff0c;函数重载就是在同一个作用域内定义多个具有相同名称但参数列表不同的函数 此处的参数列表不同&#xff0c;可以是参数的类型不同&#xff0c;参数的个数不同 那么我们一起分别来看看 C 语言&#xff0c;C 语…

物联网专业前景怎么样?

物联网专业前景怎么样&#xff1f; 物联网专业在当今技术发展迅速的背景下具有广阔的前景。以下是物联网专业的一些优势和就业前景&#xff1a; 1.市场需求大&#xff1a;物联网作为人工智能、云计算和大数据等技术的结合&#xff0c;已经成为许多行业的核心需求。各行各业都需…

【智能指针】

目录&#xff1a; 前言智能指针&#xff08;一&#xff09;智能指针初始了解内存泄漏1. 内存泄漏分类2. 如何检测内存泄漏3. 如何避免内存泄漏使用智能指针之前&#xff0c;异常安全的处理 &#xff08;二&#xff09;智能指针实现既原理智能指针RAII使用智能指针之后&#xff…

060:mapboxGL点击某处,通过flyTo,以动画的形式聚焦到此点

第060个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中点击某处,通过flyto,以动画的形式聚焦到此点。这里用到了flyTo的方法,里面可以设置bearing,zoom,pitch等众多的属性内容。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示…

数据结构与算法-(10)---列表(List)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

Linux 最大可以打开多少文件描述符?

Linux 最大可以打开多少文件描述符&#xff1f; 在日常开发中&#xff0c;对文件的操作可谓是再寻常不过的意见事情。那么你是否有这样一个疑问&#xff0c; 我最多可以打开多少个文件呢&#xff1f; 在Linux系统中&#xff0c;当某个程序打开文件时&#xff0c;内核返回相应…

SQL查询命令互转vba格式

最近搞个Excel的vba查询数据库&#xff0c;发现vba有代码行长度限制需要转换下就弄了这个&#xff0c;布局和功能暂且这样了&#xff0c;哪位大佬如果有兴趣的可以再美化下&#xff01; 这次更新了SQL命令互转VBA格式&#xff0c; SQL原始格式要分行的不能一坨贴进去&#xff0…

Java日志系统之JUL

目录 JUL介绍 JUL的使用 日志级别 指定日志输出地址 Logger对象的父子关系 Logger读取配置文件 JUL介绍 Java自带的框架&#xff0c;使用简单&#xff0c;无需引入依赖 JUL的使用 public class JULTest {Testpublic void testLogger() throws Exception{//获取日志记录…

2 用TensorFlow构建一个简单的神经网络

上一篇&#xff1a;1 如何入门TensorFlow-CSDN博客 环境搭建 后续介绍的相关代码都是在pycharm运行&#xff0c;pycharm安装略。 打开pycharm&#xff0c;创建一个新的项目用于tensorflow编码练习&#xff0c;在Terminal输入命令&#xff1a; # 依赖最新版本的pip pip inst…

[AutoSAR系列] 1.2 AutoSar 综述

AutoSAR是一种汽车工业领域的标准化软件架构,旨在简化不同汽车制造商之间的软件开发和交互。该标准于2003年由一系列欧洲汽车制造商成立的AutoSAR联盟制定并发布,目前已经成为全球范围内的标准。下面将对AutoSAR的概念、架构和实现进行综述。 1. 概述 AutoSAR是汽车电子控制…

Qt 读写文件(QFileQTextStreamQDataStream) 详解

一、读写文本文件 (QFile 类) Qt QFile类是一个用于读取和写入文件的类&#xff0c;它提供了对文件的访问、读取和写入等操作。它既可以操作文本文件&#xff0c;也可以操作二进制文件。 QFile类的功能包括&#xff1a; 打开、关闭文件读取文件内容写入文件内容支持文本模式…