第十一章 Java多线程--阻塞队列--PriorityBlockingQueue

news2025/1/11 18:45:50

目录

一、PriorityBlockingQueue基础概念

主要特点

常用方法

使用示例

二、PriorityBlockingQueue深入了解

1 PriorityBlockingQueue介绍

2 二叉堆结构介绍

3 PriorityBlockingQueue核心属性

4 PriorityBlockingQueue的写入操作

4.1 offer基本流程

4.2 offer扩容操作

4.3 offer添加数据

5 PriorityBlockingQueue的读取操作

5.1 查看获取方法流程

5.2 查看dequeue获取数据

5.3 下移做平衡操作


一、PriorityBlockingQueue基础概念

PriorityBlockingQueue 是 Java 并发包 (java.util.concurrent) 中的一个类,它是一个无界阻塞队列(实际上有一个内部容量限制,但是非常大,默认为 Integer.MAX_VALUE)。PriorityBlockingQueue 是基于优先级堆实现的,可以保证具有优先级的元素能够优先出队。它支持等待的线程获取元素,并且只有在队列为空的时候才会阻塞;同样地,当队列满的时候,添加操作也会阻塞(虽然由于它是无界的,这种情况很少发生)。

主要特点

  • 有序性:队列中的元素按照其自然排序或由提供的比较器确定的顺序进行排列。

  • 阻塞特性:当队列为空时,从队列中获取元素的操作将会阻塞,直到有新的元素加入队列。

  • 无界性:从理论上讲,PriorityBlockingQueue 是无界的,但在实际应用中会受到内存大小的限制。

  • 线程安全:PriorityBlockingQueue 是线程安全的,多个线程可以安全地访问队列。

常用方法

  • put(E e):将一个元素插入队列中,如果队列已满,则会阻塞。

  • take():从队列中取出并返回头元素,如果队列为空,则会阻塞。

  • add(E e):将一个元素插入队列中,与 put 类似,但是不会阻塞。

  • poll():从队列中取出并返回头元素,如果队列为空则返回 null

  • peek():查看队列头部的元素,但不移除它,如果队列为空则返回 null

  • isEmpty():检查队列是否为空。

  • size():返回队列中的元素数量。

使用示例

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个基于自然排序的 PriorityBlockingQueue
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

        // 添加元素到队列
        queue.add(5);
        queue.add(1);
        queue.add(3);

        // 输出队列中的元素
        while (!queue.isEmpty()) {
            System.out.println(queue.take());
        }
    }
}

请注意,在使用 PriorityBlockingQueue 时,需要确保放入队列的对象是可比较的(实现了 Comparable 接口),或者提供一个合适的比较器来避免 ClassCastException

二、PriorityBlockingQueue深入了解

1 PriorityBlockingQueue介绍

首先PriorityBlockingQueue是一个优先级队列,他不满足先进先出的概念。

会将查询的数据进行排序,排序的方式就是基于插入数据值的本身。

如果是自定义对象必须要实现Comparable接口才可以添加到优先级队列

排序的方式是基于二叉堆实现的。底层是采用数据结构实现的二叉堆。

2 二叉堆结构介绍

优先级队列PriorityBlockingQueue基于二叉堆实现的。

private transient Object[] queue;

PriorityBlockingQueue是基于数组实现的二叉堆。

二叉堆是什么?

  • 二叉堆就是一个完整的二叉树。

  • 任意一个节点大于父节点或者小于父节点

  • 基于同步的方式,可以定义出小顶堆和大顶堆

小顶堆以及小顶堆基于数据实现的方式。

3 PriorityBlockingQueue核心属性

// 数组的初始长度
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 数组的最大长度
// -8的目的是为了适配各个版本的虚拟机
// 默认当前使用的hotspot虚拟机最大支持Integer.MAX_VALUE - 2,但是其他版本的虚拟机不一定。
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 存储数据的数组,也是基于这个数组实现的二叉堆。
private transient Object[] queue;

// size记录当前阻塞队列中元素的个数
private transient int size;

// 要求使用的对象要实现Comparable比较器。基于comparator做对象之间的比较
private transient Comparator<? super E> comparator;

// 实现阻塞队列的lock锁
private final ReentrantLock lock;

// 挂起线程操作。
private final Condition notEmpty;

// 因为PriorityBlockingQueue的底层是基于二叉堆的,而二叉堆又是基于数组实现的,数组长度是固定的,如果需要扩容,需要构建一个新数组。PriorityBlockingQueue在做扩容操作时,不会lock住的,释放lock锁,基于allocationSpinLock属性做标记,来避免出现并发扩容的问题。
private transient volatile int allocationSpinLock;

// 阻塞队列中用到的原理,其实就是普通的优先级队列。
private PriorityQueue<E> q;

4 PriorityBlockingQueue的写入操作

毕竟是阻塞队列,添加数据的操作,咱们是很了解,无法还是add,offer,offer(time,unit),put。但是因为优先级队列中,数组是可以扩容的,虽然有长度限制,但是依然属于无界队列的概念,所以生产者不会阻塞,所以只有offer方法可以查看。

这次核心的内容并不是添加数据的区别。主要关注的是如何保证二叉堆中小顶堆的结构的,并且还要查看数组扩容的一个过程是怎样的。

4.1 offer基本流程

因为add方法依然调用的是offer方法,直接查看offer方法即可

public boolean offer(E e) {
    // 非空判断。
    if (e == null)
        throw new NullPointerException();
    // 拿到锁,直接上锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    // n:size,元素的个数
    // cap:当前数组的长度
    // array:就是存储数据的数组
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        // 如果元素个数大于等于数组的长度,需要尝试扩容。
        tryGrow(array, cap);
    try {
        // 拿到了比较器
        Comparator<? super E> cmp = comparator;
        // 比较数据大小,存储数据,是否需要做上移操作,保证平衡的
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 元素个数 + 1
        size = n + 1;
        // 如果有挂起的线程,需要去唤醒挂起的消费者。
        notEmpty.signal();
    } finally {
        // 释放锁
        lock.unlock();
    }
    // 返回true
    return true;
}

4.2 offer扩容操作

在添加数据之前,会采用while循环的方式,来判断当前元素个数是否大于等于数组长度。如果满足,需要执行tryGrow方法,对数组进行扩容

如果两个线程同时执行tryGrow,只会有一个线程在扩容,另一个线程可能多次走while循环,多次走tryGrow方法,但是依然需要等待前面的线程扩容完毕。

private void tryGrow(Object[] array, int oldCap) {
    // 释放锁资源。
    lock.unlock(); 
    // 声明新数组。
    Object[] newArray = null;
    // 如果allocationSpinLock属性值为0,说明当前没有线程正在扩容的。
    if (allocationSpinLock == 0 &&
        // 基于CAS的方式,将allocationSpinLock从0修改为1,代表当前线程可以开始扩容
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
        try {
            // 计算新数组长度
            int newCap = oldCap + ((oldCap < 64) ?
                                   // 如果数组长度比较小,这里加快扩容长度速度。
                                   (oldCap + 2) : 
                                   // 如果长度大于等于64了,每次扩容到1.5倍即可。
                                   (oldCap >> 1));
            // 如果新数组长度大于MAX_ARRAY_SIZE,需要做点事了。
            if (newCap - MAX_ARRAY_SIZE > 0) {   
                // 声明minCap,长度为老数组 + 1
                int minCap = oldCap + 1;
                // 老数组+1变为负数,或者老数组长度已经大于MAX_ARRAY_SIZE了,无法扩容了。
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    // 告辞,凉凉~~~~
                    throw new OutOfMemoryError();
                // 如果没有超过限制,直接设置为最大长度即可
                newCap = MAX_ARRAY_SIZE;
            }
            // 新数组长度,得大于老数组长度,
            // 第二个判断确保没有并发扩容的出现。
            if (newCap > oldCap && queue == array)
                // 构建出新数组
                newArray = new Object[newCap];
        } finally {
            // 新数组有了,标记位归0~~
            allocationSpinLock = 0;
        }
    }
    // 如果到了这,newArray依然为null,说明这个线程没有进到if方法中,去构建新数组
    if (newArray == null) 
        // 稍微等一手。
        Thread.yield();
    // 拿锁资源,
    lock.lock();
    // 拿到锁资源后,确认是构建了新数组的线程,这里就需要将新数组复制给queue,并且导入数据
    if (newArray != null && queue == array) {
        // 将新数组赋值给queue
        queue = newArray;
        // 将老数组的数据全部导入到新数组中。
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

4.3 offer添加数据

这里是数据如何放到数组上,并且如何保证的二叉堆结构

// k:当前元素的个数(其实就是要放的索引位置)
// x:需要添加的数据
// array:数组。。
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    // 将插入的元素直接强转为Comparable(com.mashibing.User cannot be cast to java.lang.Comparable)
    // 这行强转,会导致添加没有实现Comparable的元素,直接报错。
    Comparable<? super T> key = (Comparable<? super T>) x;
    // k大于0,走while逻辑。(原来有数据)
    while (k > 0) {
        // 获取父节点的索引位置。
        int parent = (k - 1) >>> 1;
        // 拿到父节点的元素。
        Object e = array[parent];
        // 用子节点compareTo父节点,如果 >= 0,说明当前son节点比parent要大。
        if (key.compareTo((T) e) >= 0)
            // 直接break,完事,
            break;
        // 将son节点的位置设置上之前的parent节点
        array[k] = e;
        // 重新设置x节点需要放置的位置。
        k = parent;
    }
    // k == 0,当前元素是第一个元素,直接插入进去。
    array[k] = key;
}

5 PriorityBlockingQueue的读取操作

读取操作是存储现在挂起的情况的,因为如果数组中元素个数为0,当前线程如果执行了take方法,必然需要挂起。

其次获取数据,因为是优先级队列,所以需要从二叉堆栈顶拿数据,直接拿索引为0的数据即可,但是拿完之后,需要保持二叉堆结构,所以会有下移操作。

5.1 查看获取方法流程

poll:

public E poll() {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 拿到返回数据,没拿到,返回null
        return dequeue();
    } finally {
        lock.unlock();
    }
}

poll(time,unit):

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 将挂起的时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 允许线程中断抛异常的加锁
    lock.lockInterruptibly();
    // 声明结果
    E result;
    try {
        // dequeue是去拿数据的,可能会出现拿到的数据为null,如果为null,同时挂起时间还有剩余,这边就直接通过notEmpty挂起线程
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    // 有数据正常返回,没数据,告辞~
    return result;
}

take:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            // 无限等,要么有数据,要么中断线程
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

5.2 查看dequeue获取数据

获取数据主要就是从数组中拿到0索引位置数据,然后保持二叉堆结构

private E dequeue() {
    // 将元素个数-1,拿到了索引位置。
    int n = size - 1;
    // 判断是不是木有数据了,没数据直接返回null即可
    if (n < 0)
        return null;
    // 说明有数据
    else {
        // 拿到数组,array
        Object[] array = queue;
        // 拿到0索引位置的数据
        E result = (E) array[0];
        // 拿到最后一个数据
        E x = (E) array[n];
        // 将最后一个位置置为null
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 元素个数-1,赋值size
        size = n;
        // 返回result
        return result;
    }
}

5.3 下移做平衡操作

一定要以局部的方式去查看树结构的变化,他是从跟节点往下找较小的一个子节点,将较小的子节点挪动到父节点位置,再将循环往下走,如果一来,整个二叉堆的结构就可以保证了。

// k:默认进来是0
// x:代表二叉堆的最后一个数据
// array:数组
// n:最后一个索引
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {
    // 健壮性校验,取完第一个数据,已经没数据了,那就不需要做平衡操作
    if (n > 0) {
        // 拿到最后一个数据的比较器
        Comparable<? super T> key = (Comparable<? super T>)x;
        // 因为二叉堆是一个二叉满树,所以在保证二叉堆结构时,只需要做一半就可以
        int half = n >>> 1; 
        // 做了超过一半,就不需要再往下找了。
        while (k < half) {
            // 找左子节点索引,一个公式,可以找到当前节点的左子节点
            int child = (k << 1) + 1; 
            // 拿到左子节点的数据
            Object c = array[child];
            // 拿到右子节点索引
            int right = child + 1;
            // 确认有右子节点
            // 判断左节点是否大于右节点
            if (right < n && c.compareTo(array[right]) > 0)
                // 如果左大于右,那么c就执行右
                c = array[child = right];
            // 比较最后一个节点是否小于当前的较小的子节点
            if (key.compareTo((T) c) <= 0)
                break;
            // 将左右子节点较小的放到之前的父节点位置
            array[k] = c;
            // k重置到之前的子节点位置
            k = child;
        }
        // 上面while循环搞定后,可以确认整个二叉堆中,数据已经移动ok了,只差当前k的位置数据是null
        // 将最后一个索引的数据放到k的位置
        array[k] = key;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2169493.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【伺服】Servo入坑学习记录①

前言 这是一个自我摸索的过程&#xff0c;如果有什么良好的、或严厉的批评和建议&#xff0c;恳请指教&#xff0c; 万分感谢经典控制理论中&#xff0c;有几个重要的概念和工具&#xff0c;用于分析和设计控制系统。以下是对 传递函数、伯德图、奈奎斯特图、稳定裕度 和 带宽 …

【Elasticsearch】-文本向量化

由于使用了QAnything 本地知识库应答平台 内部已集成Embedding 文本向量化服务&#xff0c;因此不在单独部署。 基于 transformers 如果需要单独部署&#xff0c;可以参看 BCEmbedding/README_zh.md at master netease-youdao/BCEmbedding GitHub 从启动脚本中可以看出&am…

脚手架是什么?详细版+通俗易懂版!!!!!!

脚手架&#xff08;Scaffolding&#xff09;在软件开发领域&#xff0c;特别是在前端开发和全栈开发环境中&#xff0c;是一个术语&#xff0c;用来描述一个辅助工具或框架&#xff0c;它旨在帮助开发者快速搭建项目的基础结构和开发环境。这些基础结构可能包括项目的目录结构、…

Android Input系统原理一

1. getevent 命令使用 getevent -h getevent -lrt-t 表示事件发生时间 -l label event types and names in plain text 表示把event事件类型名字打出来 -r 显示一下接受事件速率130|console:/ # getevent -lrt could n…

aloam框架laserMapping.cpp源码解读

一、详细源码解读 #include <math.h> #include <vector> #include <aloam_velodyne/common.h> #include <nav_msgs/Odometry.h> #include <nav_msgs/Path.h> #include <geometry_msgs/PoseStamped.h> #include <pcl_conversions/pcl_c…

动手学深度学习8.7. 通过时间反向传播-笔记练习(PyTorch)

本节课程地址&#xff1a;本节无视频 本节教材地址&#xff1a;8.7. 通过时间反向传播 — 动手学深度学习 2.0.0 documentation (d2l.ai) 本节开源代码&#xff1a;...>d2l-zh>pytorch>chapter_multilayer-perceptrons>bptt.ipynb 通过时间反向传播 到目前为止&…

[通义灵码] IDE 插件实现企业知识库问答

在2024杭州云栖大会上&#xff0c;随着通义大模型能力的全面提升&#xff0c;阿里云通义灵码迎来重磅升级&#xff0c;从一年前只能完成基础的辅助编程任务&#xff0c;进化到几句话就能完成需求理解、任务拆解、代码编写、修改BUG、测试等开发任务&#xff0c;最快几分钟可从0…

XSS | DOM 型 XSS 攻击

关注这个漏洞的其他相关笔记&#xff1a;XSS 漏洞 - 学习手册-CSDN博客 0x01&#xff1a;DOM 型 XSS —— 理论篇 DOM 全称 Document Object Model&#xff0c;使用 DOM 可以使程序和脚本能够动态访问和更新文档的内容、结构及样式。 DOM 型 XSS 是一种特殊类型的反射型 XSS&…

系统实现悬浮窗-菜单-悬浮按钮功能

文章目录 需求&#xff1a;系统实现悬浮窗菜单功能或悬浮小球定制功能实际手机产品效果悬浮窗作用 一、实际应用场景二、应用上面实现功能思路Demo演示效果部分源码分析Service层View层View初始化view 添加到窗体悬浮球拖动重点代码&#xff1a; 三、系统上面实现功能思路系统服…

秒懂Linux之信号

目录 信号的基本概念 信号的处理方式 默认动作 自定义处理信号 忽略该信号 信号的产生方式 kill命令 键盘组合键 系统调用 软件条件 异常 信号产生的深层理解 core的功能 信号的阻塞 内核中的表示 sigset_t 信号集操作函数 sigprocmask sigpending …

do while循环

/while(条件) {满足条件执行的代码&#xff0c;循环体 } /* do 做 */ while (false) { Console.WriteLine(" while循环执行了"); } do { //循环体逻辑 Console.WriteLine("dowhile循环执行了"); } while (true); Console.ReadLine(); /* w…

数据库索引:最左匹配原则——提升数据库的查询性能

数据库索引&#xff1a;最左匹配原则——提升数据库的查询性能 1、核心要点2、实例3、建议 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在数据库优化中&#xff0c;组合索引的使用深受最左匹配原则的影响。这一原则是提升查询效率的关键…

详细分析Nginx中的proxy_pass 末尾斜杠

目录 前言1. 基本知识2. Demo 前言 对于Nginx的讲解&#xff0c;更多推荐阅读&#xff1a; Nginx配置静态网页访问&#xff08;图文界面&#xff09;Nginx将https重定向为http进行访问的配置&#xff08;附Demo&#xff09;Nginx从入门到精通&#xff08;全&#xff09;详细分…

[Java EE] TCP 协议

Author&#xff1a;MTingle major:人工智能 Build your hopes like a tower! 文章目录 文章目录​​​​​​​ 一. TCP 协议 二. TCP 特性 1. 确认应答(ack) 2. 超时重传 3. 连接管理 三次握手 四次挥手 TCP状态 4 滑动窗口 5. 流量控制 6.拥塞控制 7. 延时应答 8.捎带应答 9…

前端性能初探

前端监控 提升稳定性&#xff0c;更快的发现异常&#xff0c;定位异常&#xff0c;解决异常&#xff0c;js错误&#xff0c;接口异常&#xff0c;资源异常&#xff0c;白屏等。 关注用户体验&#xff0c;建立性能规范&#xff0c;长期关注优化&#xff0c;页面性能&#xff0c…

TopOn对话游戏魔客:2024移动游戏广告应如何突破?

TopOn对话游戏魔客&#xff1a;2024移动游戏广告应如何突破&#xff1f; 近年来&#xff0c;游戏广告投放的成本日益走高&#xff0c;ROI如何回正&#xff0c;素材如何创新等问题困扰着每一个广告主。在隐私政策的实施下&#xff0c;广告投放难度也在不断升级。 据data.ai发布…

MK米客方德SD NAND参考设计

一、电路设计 参考电路&#xff1a; R1~R5 (10K-100 kΩ)是上拉电阻&#xff0c;当SD NAND处于高阻抗模式时&#xff0c;保护CMD和DAT线免受总线浮动。 即使主机使用SD NAND SD模式下的1位模式&#xff0c;主机也应通过上拉电阻上拉所有的DATO-3线。 R6&#xff08;RCLK&…

解决图片放大模糊

首先需要了解设备像素和CSS像素&#xff0c;CSS像素 是 Web 开发中的逻辑像素&#xff0c;设计者根据这些像素来布局页面。设备像素 是设备屏幕上的实际像素点数。 DPR 是 设备像素 和 CSS像素 的比率&#xff0c;所以进行缩放后&#xff0c;也需要对图片尺寸进行处理&#xf…

【HarmonyOS】鸿蒙自定义TabLayout示例

【HarmonyOS】自定义TabLayout代码示例&#xff0c;通过 Scroll 锚点 Tab 布局&#xff0c;滚动条会自动滚动使选中的标签居中显示。 class MyTabItem {label: string "";positionX: number -1; // 当前位置width: number -1; // 当前宽度constructor(label: stri…

OpenHarmony(鸿蒙南向)——平台驱动指南【HDMI】

往期知识点记录&#xff1a; 鸿蒙&#xff08;HarmonyOS&#xff09;应用层开发&#xff08;北向&#xff09;知识点汇总 鸿蒙&#xff08;OpenHarmony&#xff09;南向开发保姆级知识点汇总~ 持续更新中…… 概述 功能简介 HDMI&#xff08;High Definition Multimedia Int…