DelayQueue 使用和延时功能源码分析

news2025/1/18 9:10:36

DelayQueue 延迟队列使用和延时功能源码分析,先看DelayQueue 的使用

目录

1、基本使用

2、延时功能源码分析

3、总结


1、基本使用

想要实现延时功能,需要实现 Delayed 接口,重写 getDelay 方法,在 getDelay 方法里返回延时时间

笔者定义一个 Order 类

在构造函数中传入延时的时间

package com.wsjzzcbq.java.queue;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Order
 *
 * @author wsjz
 * @date 2023/09/22
 */
public class Order implements Delayed {

    /**
     * 延时时长
     */
    private long time;
    /**
     * 延时开始时间
     */
    private long start = System.currentTimeMillis();

    public Order(long time) {
        this.time = time;
    }

    public Order(long time, long start) {
        this.time = time;
        this.start = start;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        Order order = (Order)o;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - order.getDelay(TimeUnit.MILLISECONDS));
    }
}

延时队列使用

(1)、以现在时间为开始时间,延时获取

package com.wsjzzcbq.java.queue;

import java.time.LocalDateTime;
import java.util.concurrent.DelayQueue;

/**
 * DelayQueueLearn
 *
 * @author wsjz
 * @date 2023/09/22
 */
public class DelayQueueLearn {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        Order order = new Order(5*1000);

        System.out.println(LocalDateTime.now());
        delayQueue.add(order);
        Order order1 = delayQueue.take();
        System.out.println(LocalDateTime.now());
        System.out.println(order1);
    }
}

延时 5 秒钟才能获取

测试运行

添加到队列后5秒钟,获取数据

(2)、以指定时间为开始时间,延时获取

以当前时间加 5 秒为开始时间,延时 5 秒钟获取

package com.wsjzzcbq.java.queue;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.DelayQueue;

/**
 * DelayQueueLearn
 *
 * @author wsjz
 * @date 2023/09/22
 */
public class DelayQueueLearn {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        
        //当前时间加5秒为开始时间
        LocalDateTime localDateTime = LocalDateTime.now().plusSeconds(5);
        long start = localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        System.out.println(start);
        Order order = new Order(5*1000, start);

        System.out.println(LocalDateTime.now());
        delayQueue.add(order);
        Order order1 = delayQueue.take();
        System.out.println(LocalDateTime.now());
        System.out.println(order1);
    }
}

测试运行

一共延时 10 秒钟

2、延时功能源码分析

DelayQueue 是基于 PriorityQueue(优先队列)实现的,PriorityQueue 默认是最小堆结构

我们先看 add 添加方法

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

offer 方法

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

先获取锁,然后调用 PriorityQueue 的 offer 方法,如果此时 PriorityQueue 的头部元素是新添加的元素,则 leader = null,并唤醒等待线程;否则直接返回 true

因为这里的 PriorityQueue 是最小堆结构,所以它能保证延时时间最小的元素最先出队(添加进去的元素 Order 对象实现了 compareTo 方法)

PriorityQueue 的 offer 方法

    /**
     * Inserts the specified element into this priority queue.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be
     *         compared with elements currently in this priority queue
     *         according to the priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }

如果超出容量的话,调用 grow 方法扩容

如果是首次添加的话放在数组索引是0的首位

如果队列中有元素的话,调用 siftUp 方法添加

grow 方法

PriorityQueue 基于数组实现

    /**
     * Increases the capacity of the array.
     *
     * @param minCapacity the desired minimum capacity
     */
    private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity);
    }

siftUp 方法

    /**
     * Inserts item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root.
     *
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * methods that are otherwise identical. (Similarly for siftDown.)
     *
     * @param k the position to fill
     * @param x the item to insert
     */
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

默认 comparator 是 null,调用 siftUpComparable 方法

    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

siftUpComparable 方法会进行比较,保证延时时间最小的元素在最上面

然后我们直接看 take 方法

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //队列头部元素
                E first = q.peek();
                if (first == null)
                    //如果头部元素是null 则让当前线程等待
                    available.await();
                else {
                    //头部元素不为空,获取延时时间
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        //延时时间小于等于0,出队返回
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //让线程等待延时时间
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

相关说明在代码注释中,先看队列头部元素是不是null,如果是说明当前队列为空,让线程等待;如果不为空,看头部元素延时时间,如果延时时间小于等于0,则出队返回,leader 默认是null,因此线程等待延时时间的时长,等待时间到达后,重新开始循环,此时延时时间小于等于0,出队返回,达到延时效果

关于leader 的分析,leader 这里使用了 Leader-Follower 模式的变体

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

假设没有 leader,现在有2个线程,线程A 和线程B,线程A 和线程B 都会执行 available.awaitNanos(delay) 进行等待,等待时间结束后,线程A 和线程B中只有一个能拿到元素返回,另外一个将重新等待,对于没拿到元素的线程来说一开始等待,之后等待结束被唤醒,最后再次等待,是一种资源浪费,不如一开始就让它一直等待(如果它不是leader的话)

leader 更详细的分析:https://stackoverflow.com/questions/48493830/what-exactly-is-the-leader-used-for-in-delayqueue

3、总结

DelayQueue 内部基于优先队列 PriorityQueue(最小堆结构)实现延时时间小的元素总是先出队。延时功能是通过循环加线程等待的方式实现的,先判断 PriorityQueue 中延时时间最小的元素的延时时间是否小于等于0,如果是则直接出队返回;否则让线程等待延时的时长,等待结束后,开始新一轮循环,这时延时时间肯定是小于等于0的,出队返回,达到延时的效果

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1047089.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于对象数组的一些方法总结

关于数组对象的一些方法总结 一、判断对象数组是否重复二、复制对象数组 一、判断对象数组是否重复 根据某个属性判断对象数组中&#xff0c;是否含有该属性重复的对象。 let objectArr [{id: 1, name: 2, age: 3}, {id: 2, name: 3, age:24},{id: 1, name: 5, age:6} ]func…

已实现:关于富文本组件库vue2-editor的使用方法以及一些必要的注意事项,特别是设置完富文本以后的回显问题。以及在光标位置插入字符串等

前言 目前常见的基于vue的富文本编辑器有两个&#xff1a;“vue2-editor” 和 “vue-quill-editor” 都是用于Vue.js的富文本编辑器组件&#xff0c;它们具有一些共同的特点&#xff0c;但也有一些区别。 共同点&#xff1a; 1、富文本编辑功能&#xff1a; 两者都提供了富文…

【数据中台建设系列之一】数据中台-元数据管理

本编文章主要介绍数据中台核心模块—元数据模块的一些建设经验分享&#xff0c;供大家一起交流学习。 一、什么是元数据 ​ 元数据可以简单理解为是数据的"数据"&#xff0c;它描述了数据的特征&#xff0c;属性&#xff0c;来源和其他一些数据的基本信息&#xff0…

手机上记录的备忘录内容怎么分享到电脑上查看?

手机已经成为了我们生活中不可或缺的一部分&#xff0c;我们用它来处理琐碎事务&#xff0c;记录生活点滴&#xff0c;手机备忘录就是我们常用的工具之一。但随着工作的需要&#xff0c;我们往往会遇到一个问题&#xff1a;手机上记录的备忘录内容&#xff0c;如何方便地分享到…

基于微信小程序的停车场预约收费小程序设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言系统主要功能&#xff1a;具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计…

数字文创产权版权登记保护多少重要

数字文创产权版权登记保护多少重要 在如今数字化时代&#xff0c;创作者们的作品往往以数字形式存储和传播&#xff0c;这给他们带来了无限的机遇&#xff0c;同时也伴随着一些挑战。在这个环境下&#xff0c;数字文创产权的版权登记显得尤为重要。它不仅能够保护创作者的利益&…

冲刺十五届蓝桥杯P0001阶乘求和

文章目录 题目描述思路分析代码解析 题目描述 思路分析 阶乘是蓝桥杯中常考的知识。 首先我们需要知道 int 和long的最大值是多少。 我们可以知道19的阶乘就已经超过了long的最大值&#xff0c;所以让我们直接计算202320232023&#xff01;的阶乘是不现实的。 所以我们需要…

【逆向】在程序空白区添加Shellcode

目录 硬编码 内存对齐和文件对齐 节表 实战 滴水逆向03-17 #include <windows.h>LRESULT CALLBACK WndProc(HWND hwnd, UINT msg, WPARAM wParam, LPARAM lParam);int WINAPI WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, PSTR szCmdLine, int iCmdShow) …

Springboot和Vue:六、ORM介绍+MybatisPlus快速上手

ORM和Mybatis(Plus)介绍 ORM&#xff08;Object Relational Mapping 对象关系映射&#xff09;即映射程序中的对象和数据库中的数据。 MyBatis是一款优秀的数据持久层ORM框架&#xff0c;能够非常灵活地实现动态SQL&#xff0c;可以使用XML或注解来配置和映射原生信息&#xf…

联想笔记本重装系统Win10步骤和详细教程

我们给笔记本电脑重装系统可以解决运行缓慢、出现错误提示等问题&#xff0c;恢复笔记本电脑的流畅运行状态。但是&#xff0c;很多使用联想笔记本电脑的用户不知道重装系统Win10的具体步骤&#xff0c;下面小编给大家介绍关于重装联想笔记本电脑Win10系统的方法步骤吧。 推荐下…

JAMA子刊:最新研究发现腰臀比更能预测死亡

肥胖会危害健康已经称为一个共识&#xff0c;评价肥胖的指标也有多种&#xff0c;例如体质指数&#xff08;BMI)、腰围等。比如BMI大于24为超重&#xff0c;大于28为肥胖&#xff1b;男性腰围≥90cm&#xff0c;女性腰围≥85cm为中心性肥胖。 这些肥胖界值的确定&#xff0c;主…

自制网页。

文章目录 注:代码中图片等素材均来自网络,侵删 20230920_213831 index.html <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-…

Java应用生产Full GC或者OOM问题如何定位

1 引言 生产应用服务频繁Full GC却无法释放内存&#xff0c;甚至可能OOM&#xff0c;这种情况很有可能是内存泄露或者堆内存分配不足&#xff0c;此时需要dump堆信息来定位问题&#xff0c;查看是哪些地方内存泄漏。 Dump文件也称为内存转储文件或内存快照文件&#xff0c;是…

uni-app:canvas绘制图形3

效果 代码 <template><view><!-- 创建了一个宽度为300像素&#xff0c;高度为200像素的canvas元素。canvas-id属性被设置为"firstCanvas"&#xff0c;可以用来在JavaScript中获取该canvas元素的上下文对象。 --><canvas style"width:200…

mysql面试题3:谈谈你知道的MySQL索引?MySQL中一个表可以创建多少个列索引?MySQL索引有哪几种?他们的优缺点是什么?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:谈谈你知道的MySQL索引? MySQL索引是一种特殊的数据结构,用于加速数据库的查询操作。它通过存储列值和对应记录的指针,可以快速定位到满足查询…

计算机竞赛 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

计算机网络-网络层总结

目录 网络层提供的两种服务 1、分组转发和路由选择 2、网络层提供的两种服务 网际协议IP 1、虚拟互联网 2、网际协议IP 3、异构网络互连 4、IPv4地址 概述 4.1分类编址的IPv4地址 4.2划分子网的IPv4地址 4.3无分类编址的IPv4地址 4.4IPv4地址的应用规划 4.5MAC地址…

使用 Spring Data Redis 访问 Redis 的 Spring Boot 应用

使用 Spring Data Redis 访问 Redis 的 Spring Boot 应用 Redis是一个高性能的内存键值存储数据库&#xff0c;常用于缓存、会话管理和消息队列等场景。Spring Boot通过Spring Data Redis提供了方便的方式来与Redis进行交互。在本文中&#xff0c;我们将详细介绍如何使用Sprin…

nodejs+vue 经典建筑网站elementui

第1章 项目概述 1 1.1 问题描述 1 1.2 项目目标 1 1.3 项目适用范围 2 1.4 项目应遵守的规范与标准 2 1.5 涉众 2 具有功能强大、支持跨平台、运行速度快、安全性高、成本低等优点。而对于后者我们使用 来完成它&#xff0c;使其网页功能完备&#xff0c;界面友好、易开发、易…

直方图投影法判断裂缝走势(裂缝类型)

裂缝类型 裂缝类型有很多种&#xff0c;这里我们仅仅判断线性裂缝与网状裂缝&#xff0c;线性裂缝按照其走势有可分为横向裂缝、纵向裂缝和斜向裂缝。 我觉得大家应当有这样的意识&#xff0c;面对网状裂缝&#xff0c;它的二维参数是否有意义&#xff1f;答案是没有&#xf…