Java 源码 - DelayQueue 源码解析

news2025/1/23 6:05:15

文章目录

        • 1. 整体设计
          • 1.1 类注释
          • 1.2、类图
          • 1.3 延迟队列的属性
          • 1.4 DelayQueue 的主要方法
            • 1.4.1 offer 添加元素
            • 1.4.2 take 取出元素
            • 1.4.3 poll 取出元素

1. 整体设计

DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟 5 秒执行,那么当前线程就会沉睡 5 秒,等 5 秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如当运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现的。

1.1 类注释

类注释上比较简单,只说了三个概念:

  1. 无界延时队列中元素将在过期时被执行,越靠近队头,越早过期;
  2. 未过期的元素不能够被 take;
  3. 不允许空元素。
1.2、类图

DelayQueue 的类图,关键是 DelayQueue 类上是有泛型的,如下:
image.png

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

从泛型中可以看出,DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,如下:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

也就是说 DelayQueue 队列中的元素必须是实现 Delayed 接口和 Comparable 接口的,并覆写了 getDelay 方法和 compareTo 的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现 Delayed 接口。
compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。

1.3 延迟队列的属性

DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式.

1.4 DelayQueue 的主要方法
1.4.1 offer 添加元素
public boolean offer(E e) {
        // 获取全局独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向优先队列中插入元素
            q.offer(e);
            // 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            // 释放全局独占锁
            lock.unlock();
        }
    }

leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

1.4.2 take 取出元素

take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
一开始看到全局独占锁,理所当然详情属于队列消费模式。 无法理解 “领导者-追随者模式”。take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。 如果多个线程调用take() 方法, 当available.awaitNanos(delay);的时候, 其它线程可以抢锁进入。 下面有测试例子。源码中:java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject.await()和await(long time, TimeUnit unit); 方法 Node node = addConditionWaiter(); long savedState = fullyRelease(node); 队列状态释放
take方法主要实现逻辑为(for循环体):
1. 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
2. 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
3. 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
4. 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
5. finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。

public E take() throws InterruptedException {
    // 获取全局独占锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队头元素,peek 方法不会删除元素
            E first = q.peek();
            if (first == null)
                // 若队头为空,则阻塞当前线程
                available.await();
            else {
                // 否则获取队头元素的超时时间
                long delay = first.getDelay(NANOSECONDS);
                // 已超时,直接出队
                if (delay <= 0)
                    return q.poll();
                // 释放 first 的引用,避免内存泄漏
                first = null; // don't retain ref while waiting
                // leader != null 表明有其他线程在操作,阻塞当前线程
                if (leader != null)
                    available.await();
                else {
                    // leader 指向当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 超时阻塞
                        available.awaitNanos(delay);
                    } finally {
                        // 释放 leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列
        if (leader == null && q.peek() != null)
            available.signal();
        // 释放全局独占锁    
        lock.unlock();
    }
}

Condition.await() 和Condition.await(100, TimeUnit.SECONDS); 方法进入等待时候,其它线程可以抢抢到锁

package com.lvyuanj.test.timer;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class TestConditionAwait {

    private static ReentrantLock lock = new ReentrantLock();


    public static void main(String[] args) throws InterruptedException {
        final Condition condition = lock.newCondition();
        final Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    Thread.currentThread().setName("ConditionAwait");
                    log.error(Thread.currentThread().getName() + " beforeAwaitTime:" + System.currentTimeMillis());
                    condition.await(100, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    log.error(Thread.currentThread().getName() + " finishAwaitTime:" + System.currentTimeMillis());
                } finally {
                    lock.unlock();
                    log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());
                }
            }
        });
        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName("ConditionSignal");
                try {
                    lock.lock();
                    log.error(Thread.currentThread().getName() + " getLockTime:" + System.currentTimeMillis());
                    //thread1.interrupt();
                    long currentTime = System.currentTimeMillis();
                    while (System.currentTimeMillis() - currentTime < 8000) {
                    }
                    condition.signal();
                    log.error(Thread.currentThread().getName() + " signalTime:" + System.currentTimeMillis());
                } catch (Exception e) {
                } finally {
                    lock.unlock();
                    log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());
                }
            }
        });
        thread1.start();
        Thread.sleep(50);
        thread2.start();
    }
}
1.4.3 poll 取出元素

取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1634438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UnityWebGL获取话筒实时数据

看了木子李大佬的数字人https://digital.lkz.fit/之后&#xff0c;我也想搞一个&#xff0c;于是开始研究起来&#xff0c;先从WebGL录音开始&#xff0c;一共试了三个插件&#xff0c;个个都有问题…… 1、UnityWebGLMicrophone 用起来没啥问题&#xff0c;但是只能录音&#…

【百度Apollo】探索自动驾驶:Apollo 新版本 Beta 全新的Dreamview+,便捷灵活更丰富

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 引入一、Dreamview介绍二、Dreamview 新特性2.1、基于模式的多场景——流程更简洁地图视角调节&#xff1a;调试流…

创业两个月以来我的思想感悟和日志记录

截止目前&#xff0c;出来创业差不多两个月时间了&#xff0c;写篇文章记录总结一下吧&#xff0c;给大家讲讲这两个来&#xff0c;我的感受和心路历程吧。 先来说说我为什么要出来创业&#xff0c;在如今市场环境这么差的情况下&#xff0c;很多身边的朋友都找不到工作&#…

财报解读:汽车销售基本盘,承载了特斯拉的“高科技梦”

“即使外星人明天绑架了我&#xff0c;特斯拉也要解决掉自动驾驶问题。”在自动驾驶业务布局上&#xff0c;马斯克的决心坚定。 不过&#xff0c;想要做好自动驾驶&#xff0c;马斯克可能还需解决最紧要的业绩问题。日前&#xff0c;特斯拉正式发布了其2024年第一季度财报&…

salesforce 如何访问lwc组件

访问lwc有哪些途径呢? Action ButtonTabAura use lwc(拓展)如何区分是新建页面还是编辑页面 Action Button xml文件中要配置tab<?xml version"1.0" encoding"UTF-8"?> <LightningComponentBundle xmlns"http://soap.sforce.com/2006/04/…

AI小白使用Macbook Pro安装llama3与langchain初体验

1. 背景 AI爆火了2年有余&#xff0c;但我仍是一个AI小白&#xff0c;最近零星在学&#xff0c;随手记录点内容供自己复习。 上次在Macbook Pro上安装了Stable Diffusion&#xff0c;体验了本地所心所欲地生成各种心仪的图片&#xff0c;完全没有任何限制的惬意。今天想使用M…

Pandas数据可视化 - Matplotlib、Seaborn、Pandas Plot、Plotly

可视化工具介绍 让我们一起探讨Matplotlib、Seaborn、Pandas Plot和Plotly这四个数据可视化库的优缺点以及各自的适用场景。这有助于你根据不同的需求选择合适的工具。 1. Matplotlib 优点: 功能强大&#xff1a;几乎可以用于绘制任何静态、动画和交互式图表。高度可定制&a…

《HCIP-openEuler实验指导手册》1.6 Apache静态资源配置

知识点 常用用途&#xff1a; 软件仓库镜像及提供下载服务&#xff1a; 配置步骤 删除网站主目录中的文件&#xff08;本实验机目录为/home/source ip为192.168.12.137 端口为81&#xff09; cd /home/source rm -rf *在主目录中新建6个文件夹如下图 mkdir test{1..6}新建…

深入浅出一文图解Vision Mamba(ViM)

文章目录 引言&#xff1a;Mamba第一章&#xff1a;环境安装1.1安装教程1.2问题总结1.3安装总结 第二章&#xff1a;即插即用模块2.1模块一&#xff1a;Mamba Vision代码&#xff1a;models_mamba.py运行结果 2.2模块二&#xff1a;MambaIR代码&#xff1a;MambaIR运行结果 第三…

【MyBatis】进阶使用 (动态SQL)

动态SQL \<if>\<trim>\<where>\<set>\<foreach>\<include> 在填写表单时&#xff0c;有些数据是非必填字段&#xff08;例如性别&#xff0c;年龄等字段&#xff09;&#xff0c;那就需要在接收到参数时判断&#xff0c;根据参数具体的情况…

ROS2 学习笔记(二)常用小工具

1. rqt_console #启动 ros2 run rqt_console rqt_console日志级别&#xff1a;Fatal --> Error --> Warn --> Info --> Debug #修改允许发布的日志级别 ros2 run <package_name> <executable_name> --ros-args --log-level WARN2. launch文件 ROS2中…

TMS320F280049 EQEP模块--QCAP(3)

功能框图 如上图所示&#xff0c;QCAP的核心功能块是CTCU捕获事件控制单元。CTCU以CAPCLK为时钟来计数&#xff0c;在UPEVNT事件时QCTMR值会锁存到QCPRD并重置。此时软件可以读取该QCPRD来计算速度。 速度计算公式 公式 QCAP主要为了在低速模式下使用&#xff0c;速度计算公…

49. 【Android教程】HTTP 使用详解

在你浏览互联网的时候&#xff0c;绝大多数的数据都是通过 HTTP 协议获取到的&#xff0c;也就是说如果你想要实现一个能上网的 App&#xff0c;那么就一定会和 HTTP 打上交道。当然 Android 发展到现在这么多年&#xff0c;已经有很多非常好用&#xff0c;功能非常完善的网络框…

无人机+低空经济:释放中国低空经济动力的必要条件

无人机与低空经济的结合&#xff0c;对于释放中国低空经济动力具有重要的意义。无人机作为低空经济的重要组成部分&#xff0c;可以为低空经济提供新的动力和发展方向。以下是无人机与低空经济结合释放中国低空经济动力的必要条件&#xff1a; 1. 无人机技术的不断发展和创新&a…

InternVL——GPT-4V 的开源替代方案

您的浏览器不支持 video 标签。 在人工智能领域&#xff0c;InternVL 无疑是一颗耀眼的新星。它被认为是最接近 GPT-4V 表现的可商用开源模型&#xff0c;为我们带来了许多惊喜。 InternVL 具备强大的功能&#xff0c;不仅能够处理图像和文本数据&#xff0c;还能精妙地理解…

神之浩劫2测试预约 神之浩劫2怎么预约测试资格教程

在备受赞誉的第三人称动作MOBA经典《神之浩劫》的荣耀轨迹上&#xff0c;其续集《神之浩劫2》即将于5月3日&#xff08;北京时间&#xff09;启幕Alpha测试阶段&#xff0c;首度揭露其神秘面纱&#xff0c;届时&#xff0c;14位英勇无畏的英雄将迎接被甄选玩家的驾驭与探索。此…

后端如何处理接口的重复调用

首先是&#xff0c;原理在请求接口之前&#xff0c;使用过滤器拦截数据&#xff0c;来进行判断两次数据是否一致。 1.自定义注解 2.创建一个Handler处理器 3.RepeatSubmitInterceptor的实现类 4.过滤器的配置

JavaEE技术之MySql高级(索引、索引优化、sql实战、View视图、Mysql日志和锁、多版本并发控制)

文章目录 1. MySQL简介2. MySQL安装2.1 MySQL8新特性2.2 安装MySQL2.2.1 在docker中创建并启动MySQL容器&#xff1a;2.2.2 修改mysql密码2.2.3 重启mysql容器2.2.4 常见问题解决 2.3 字符集问题2.4 远程访问MySQL(用户与权限管理)2.4.0 远程连接问题1、防火墙2、账号不支持远程…

场外期权交易合法吗?参与场外期权交易需要符合哪些规定?

场外期权交易是合法的金融交易方式&#xff0c;且得到了相应监管部门的支持和规范。它是一种新型的期权交易方式&#xff0c;具有灵活性高、可以满足特定投资者需求的特点。 文章来源/&#xff1a;股指研究院 场外期权是私下协商的&#xff0c;交易双方可根据个人预期、风险承…

中国移动算网大脑智能升级,助力移动云由云向算新启航!

2024中国移动算力网络大会于4月28日在苏州正式拉开帷幕。中国移动发布全面智能化升级的算网大脑&#xff0c;以“人工智能”赋能算网一体化调度&#xff0c;推动算力网络点亮AI新时代。 会上&#xff0c;中国移动云能力中心副总经理孙少陵发表题为《算网大脑&#xff0c;助力移…