Java并发编程 —— 延迟队列DelayQueue源码解析

news2024/10/6 22:27:14

一、什么是DelayQueue

DelayQueue是一个支持并发的无界延迟队列,队列中的每个元素都有个预定时间,当线程从队列获取元素时,只有到期元素才会出队列,没有到期元素则阻塞等待。队列头元素是最快要到期的元素。因此DelayQueue可用于实现定时任务队列。

DelayQueue中的主要成员变量和方法如下:
在这里插入图片描述
q:使用优先队列PriorityQueue存储数据,队列中的元素需实现Delayed接口,实现getDelay()和compareTo()方法,以实现优先队列内部的优先级比较,剩余到期时间越短的元素优先级越高

public interface Delayed extends Comparable<Delayed> {
	//获取元素剩余到期时间
    long getDelay(TimeUnit unit);
}

lock:使用ReentrantLock对插入和读取队列元素的方法进行加锁,以实现多线程并发读写队列操作的同步。
available:用一个条件等待队列存放等待获取到期元素的线程。
leader:用于表示当前正在等待获取队头元素的线程,这里使用了一个Leader-Follower模式的变体,线程获取完元素后从等待队列中选择一个线程成为leader继续等待获取队头元素,以避免不必要的竞争消耗。

Leader-Follower模式
在并发IO中,当一个线程收到IO事件后,会考虑启动一个新的线程去处理,而自己继续等待下一个请求。但这里可能会有性能问题,就是把工作交给别一个线程的时候需上下文切换,包括数据拷贝。
而在Leader-Follower模式中所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己去处理这个事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
参考:Leader/Follower多线程网络模型介绍

二、主要方法源码解析

1. offer()

插入元素到队列。首先获取锁,拿到锁后向优先队列中插入元素,若插入完毕后发现队头元素就是自己,即最近到期时间的元素就是自己,刷新了记录,那就赶紧从等待队列中通知一个线程准备来获取这个元素,然后释放锁。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

这里为什么要将leader先置为null?
因为如果此时leader线程在超时等待获取前任队头元素,而signal通知了另一个线程,看完take()的源码可以知道如果有leader线程,那么此线程会直接阻塞等待,让leader线程超时完后获取队头,那显然时间就不正确了,只有将leader设为null,后续线程才能成为leader并设置正确的超时时间来等待获取最新队头元素
因此,leader变量的真正含义是:超时等待获取队列最新队头元素的线程,等待的时间即为最新队头元素剩余到期时间
因此,当队头元素发生变动(插入/删除更新)时,就需要唤醒一个线程更新leader

2. take()

获取优先队列队头元素。首先获取锁,拿到锁后进入一个循环,首先检测队头元素,若为空则进入等待队列阻塞等待,若不为空且队头元素已到期则直接将其出队返回,如果还没到期就看有没有线程已经在准备获取队头元素了,如果有就不用抢了,进入等待队列阻塞等待,如果没有就超时等待准备获取队头元素,被唤醒后进入下一次循环获取队头元素。获取完毕后就从等待队列中通知一个线程准备获取队头元素然后释放锁。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
        	//从优先堆中获取堆顶元素,即优先级最高,即预定时间最近的元素
            E first = q.peek();
            if (first == null)
				//若队列中无元素则直接进入条件队列等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                //若堆顶元素已经到期,则直接将其出队返回
                if (delay <= 0)
                    return q.poll();
				//等待期间不持有元素引用,防止该元素被其他线程出队消费后,仍不能被垃圾回收
                first = null; // don't retain ref while waiting
                if (leader != null)
                	//若已经有leader了,则进入条件队列无限期等待
                    available.await();
                else {
                	//否则成为leader进入条件队列超时等待,到预期时间或者有更近时间元素插入就到同步队列竞争锁,再重复循环去取堆顶元素
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
    	//取完元素后若leader为null且队列中还有元素则从条件等待队列通知一个线程到同步队列
    	//为什么存在leader不为null的情况:leader线程从awaitNanos()中结束后没有竞争过新进take()的线程,因此继续在同步队列中被阻塞,因此无需再从条件等待队列中通知线程,直接让leader线程再去竞争锁,
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();//释放锁资源让同步队列中的线程竞争锁
    }
}

Leader-Follower模式在这里的作用在于,在队头元素还没到期的情况下,只需要有一个线程(leader)超时等待,其余线程进来后发现已经有leader了,就直接无限等待就行了,避免了无意义的超时等待和竞争消耗。

参考:What exactly is the leader used for in DelayQueue?

3. poll()

加锁获取并移除队头过期元素,如果没有过期元素则不等待直接返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

4. size()

加锁获取队列当前剩余元素个数

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.size();
    } finally {
        lock.unlock();
    }
}

三、使用案例

如下使用案例,首先向DelayQueue插入5个定时任务,然后用3个线程并发读取

public class DelayQueueTest {

    //队列元素类
    static class DelayTask implements Delayed {

        long exeTime;//预定执行时间

        public DelayTask(long exeTime) {
            this.exeTime = exeTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.exeTime - System.currentTimeMillis(), unit);
        }

        @Override
        public int compareTo(Delayed o) {
            long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return (int) delta;
        }
    }

    public static void main(String[] args) {
        DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
        for (int i = 1;i <= 5;i++) {
            delayQueue.offer(new DelayTask(System.currentTimeMillis() + new Random().nextInt(10)*1000));
        }
        for (int i = 1;i <= 3;i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        DelayTask task = delayQueue.take();
                        System.out.printf("取出任务!取出时间:%s 任务预定执行时间:%s%n", hms(System.currentTimeMillis()), hms(task.exeTime));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    public static String hms(long milliseconds) {
        return new SimpleDateFormat("HH:mm:ss").format(milliseconds);
    }
}

运行结果:

取出任务!取出时间:10:27:39 任务预定执行时间:10:27:39
取出任务!取出时间:10:27:39 任务预定执行时间:10:27:39
取出任务!取出时间:10:27:40 任务预定执行时间:10:27:40
取出任务!取出时间:10:27:42 任务预定执行时间:10:27:42
取出任务!取出时间:10:27:46 任务预定执行时间:10:27:46

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/442080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[java聊天室]多个客户端与服务器说话多线程(二)

多客户端链接 之前(java聊天室一)只有第一个连接的客户端可以与服务端说话。 原因: 服务端只调用过一次accept方法&#xff0c;因此只有第一个客户端链接时服务端接受了链接并返回了Socket,此时可以与其交互。 而第二个客户端建立链接时&#xff0c;由于服务端没有再次调用…

【Hello Linux】线程池

作者&#xff1a;小萌新 专栏&#xff1a;Linux 作者简介&#xff1a;大二学生 希望能和大家一起进步 本篇博客简介&#xff1a;简单介绍linux中线程池概念 线程池 Linux线程池线程池的概念线程池的优点线程池的应用场景线程池实现 Linux线程池 线程池的概念 线程池是一种线程…

PyTorch深度学习实战 | 高斯混合模型聚类原理分析

01、问题描述 为理解高斯混合模型解决聚类问题的原理&#xff0c;本实例采用三个一元高斯函数混合构成原始数据&#xff0c;再采用GMM来聚类。 1) 数据 三个一元高斯组件函数可以采用均值和协方差表示如表1所示&#xff1a; ▍表1 三个一元高斯组件函数的均值和协方差 每个高斯…

git的使用——操作流程

一、什么是git git是一个开源的分布式版本控制软件&#xff0c;能够有效并高效的处理很小到非常大的项目。 二、添加SSH公钥 安装下载后&#xff0c;会发现鼠标右击&#xff0c;会出现 Git Bash Here 这个选项&#xff0c;如图所示&#xff0c;点击进入 1.打开git窗口后&…

018:Mapbox GL加载Google地图(影像瓦片图)

第018个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载google地图。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共80行)相关API参考:专栏目标示例效果 配置方式 1)查看基础设置:https://xia…

鉴智机器人重磅发布双目智驾解决方案,新一代全系智驾产品线亮相上海车展

4月18日&#xff0c;以「拥抱汽车行业新时代」为主题的2023上海车展正式拉开帷幕。以视觉3D理解为核心的下一代自动驾驶系统提供商鉴智机器人&#xff0c;携全新升级的智驾产品线首次亮相车展&#xff0c;重磅发布基于AI的双目立体视觉智驾方案。 凭借双目立体视觉系统的差异化…

智能洗地机好用吗?值得入手的洗地机推荐

洗地机是一款高效的地面清洁设备&#xff0c;不仅可以很好清理地面不同形态的干湿垃圾&#xff0c;还减少了人工和水资源的浪费&#xff0c;是我们日常生活中必不可少的清洁工具。作为以一位评测博主&#xff0c;很多朋友咨询我在选购洗地机时应该注意哪些要点&#xff0c;有哪…

记一次生产要我狗命的问题

问题起因&#xff1a;引入disruptor框架 简单理解就是生产消费者模式 用来支持高并发 先说问题和改正 再展开 问题&#xff1a;没有当时的截图了 直接描述吧 问题就是cpu占用过高 居高不下的那种 排查&#xff1a;就是看线程名字和占用的大概 再根据近期发布的东西 再根据本地…

学系统集成项目管理工程师(中项)系列08b_合同管理(下)

1. 项目变更约定 1.1. 合同生效后&#xff0c;当事人不得因姓名、名称的变更或者法定代表人、负责人、承办人的变动而不履行合同义务 2. 违约责任的承担方式 2.1. 继续履行 2.2. 采取补救措施 2.3. 赔偿损失 2.4. 支付约定违约金或定金 3. 注意事项 3.1. 当事人的法律资…

Linux FTP服务

FTP服务 作用 传输文件 端口 FTP服务器默认使用TCP协议的20、21端口与客户端进行通信 20端口用于建立数据连接&#xff0c;并传输文件数据 21端口用于建立控制连接&#xff0c;并传输FTP控制命令 模式 FTP数据连接分为主动模式和被动模式 主动模式&#xff1a;客户端告诉服务端…

电路原理-反激式电路

1、1反激式电路是小功率电源(150W以下)当中&#xff0c;最常用的电路&#xff0c;它的工作原理如下。 1、2如图1&#xff0c;变压器T1&#xff0c;标记红点的端&#xff0c;12、3、A为同名端&#xff0c;10、1、B为异名端。 当MOS管导通的时候&#xff0c;初级绕组N1、…

瑞吉外卖LinuxRedis

1、linux简介 Linux系统版本 Linux系统分为内核版和发行版 内核版&#xff1a; 由LinusTorvalds及其团队开发、维护 免费、开源 负责控制硬件 发行版&#xff1a; 基于Linux内核版进行扩展 由各个Linux厂商开发、维护 有收费…

C++类的理解与类型名,类的成员,两种定义方式,类的访问限定符,成员访问,作用域与实例化对象

面向过程和面向对象初步认识 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题 C是基于面向对象的&#xff0c;关注的是对象&#xff0c;将一件事情拆分成不同的对象&#xff0c;靠对象之间的交互完成 面向…

ETCD(三)操作指令

1. put put #将给定的key写入到存储 --ignore-lease[false] #使用当前租约更新key --ignore-value[false] #使用当前值更新key --lease"0" # 要附加到key的租约ID&#xff08;十六进制&#xff09; --prev-kv[false] # 返回修改前的上一个键值对2. get get #获取给…

无线洗地机哪款性价比高?高性价比的洗地机分享

虽说现在市面上清洁工具很多&#xff0c;但是要说清洁效果最好的&#xff0c;肯定非洗地机莫属。它集合了吸&#xff0c;洗&#xff0c;拖三大功能&#xff0c;干湿垃圾一次清理&#xff0c;还能根据地面的脏污程度进行清洁&#xff0c;达到极致的清洁效果&#xff0c;省时省力…

4月21日第壹简报,星期五,农历三月初二

4月21日第壹简报&#xff0c;星期五&#xff0c;农历三月初二坚持阅读&#xff0c;静待花开1. 推特拒向大模型免费开放数据&#xff01;马斯克威胁起诉微软&#xff1b;Reddit宣布不再向大模型免费开放数据&#xff0c;要求科技巨头付费使用API接口。2. 浙江&#xff1a;鼓励杭…

【JUC】Java并发机制的底层实现原理

【JUC】Java并发机制的底层实现原理 参考资料&#xff1a; CPU 缓存一致性 《Java并发编程的艺术》 【JUC并发编程】CAS到底加不加锁&#xff1f; 如何写出让 CPU 跑得更快的代码&#xff1f; 彻底理解Java并发编程之Synchronized关键字实现原理剖析 【JUC并发编程】Synchroni…

线程安全版本的单例设计模式 与 生产者消费者模型简介

目录 单例设计模式 单例设计模式——饿汉式 单例设计模式——懒汉式 单例设计模式——懒汉式&#xff08;优化步骤&#xff09; 生产者消费者模型 介绍 优点 补充&#xff1a;关于阻塞队列 单例设计模式 单例设计模式能够保证某个类的实例在程序运行过程中始终都只会存…

代码随想录Day57

1143.最长公共子序列 给定两个字符串 text1 和 text2&#xff0c;返回这两个字符串的最长 公共子序列 的长度。如果不存在 公共子序列 &#xff0c;返回 0 。 一个字符串的 子序列 是指这样一个新的字符串&#xff1a;它是由原字符串在不改变字符的相对顺序的情况下删除某些字…

Cesium 实战-最新版(1.104.0)通过异步方式初始化地球,加载影像以及高程图层

Cesium 实战-最新版&#xff08;1.104.0&#xff09;通过异步方式初始化地球&#xff0c;加载影像以及高程图层 遇到问题初始化底图初始化高程&#xff08;监听载入完成事件&#xff0c;开启关闭高程&#xff09;初始化 3dtile Cesium 最新版&#xff08;1.104.0&#xff09;变…