解锁ArrayBlockingQueue奥秘:深入源码的精彩之旅

news2025/1/12 1:55:49

1.简介

ArrayBlockingQueueBlockingQueue 接口的一个实现类,它基于数组实现了一个有界阻塞队列。创建 ArrayBlockingQueue 实例时需要指定队列的容量,队列的大小是固定的,无法动态增长

主要特点包括:

  • 有界性ArrayBlockingQueue容量是固定的,在创建队列时需要指定容量大小,一旦队列达到最大容量,再尝试向队列中添加元素将会导致阻塞,直到队列中有空间。

  • 线程安全性: 提供了线程安全的队列操作,多个线程可以安全地在队列中进行添加和移除元素的操作,无需额外的同步措施。

  • FIFO 排序:基于数组实现,采用先进先出(FIFO)的顺序,保证了队列中元素的顺序性

  • 阻塞操作:当尝试向队列中添加元素时,如果队列已满生产者线程将会被阻塞,直到队列中有空间可用;当尝试从队列中取出元素时,如果队列为空消费者线程将会被阻塞,直到队列中有新的元素可用。

  • 性能稳定:基于数组实现的,在添加和移除元素时具有稳定的性能表现,与队列大小无关。

2.重要的方法

BlockingQueue 具有 4 组不同的方法用于插入移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
插入remove()poll()take()poll(timeout, timeunit)
检查element()peek()

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常

  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。

  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 ArrayBlockingQueue 中插入 null。如果你试图插入nullArrayBlockingQueue 将会抛出一个 NullPointerException

3.重要属性

   final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
  1. items 数组: 用于存储队列元素的数组。

  2. count: 用于记录当前队列中的元素数量

  3. takeIndexputIndex: 分别表示队列头部尾部元素在数组中的索引位置。

  4. lock: 使用ReentrantLock来实现对队列操作的线程安全控制

  5. notEmptynotFull: 条件变量,用于实现队列非空队列未满的等待通知机制

这些属性在ArrayBlockingQueue 内部使用,可以帮助实现队列的基本功能和线程安全访问。

4.构造方法

构造方法如下所示:

  public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
  1. 首先,构造方法接受两个参数:capacity 表示队列的容量大小,fair 表示是否使用公平性策略

  2. 在方法中首先对 capacity 进行检查,如果小于等于 0,则抛出 IllegalArgumentException 异常。

  3. 然后,创建一个 Object 类型的数组 items 作为队列的存储结构,其大小为 capacity,即指定了队列的容量。

  4. 创建一个 ReentrantLock 对象 lock,用于在多线程环境下对队列进行加锁操作。fair 参数用于决定是否使用公平性策略,如果为 true,则表示使用公平性策略,否则为非公平性策略。

  5. 最后,创建两个 Condition 对象 notEmptynotFull,它们分别用于表示队列非空非满的条件,通过它们可以实现线程的等待唤醒操作。

5.添加元素方法

5.1 offer(e)方法

offer(e)方法会在加锁的情况下尝试向队列中添加元素,如果队列已满则返回false,否则将元素插入队列并返回true

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
  1. 首先,对传入的元素e进行空指针检查,如果为空则抛出NullPointerException

  2. 获取队列的ReentrantLock对象lock,并对其进行加锁操作。

  3. 在加锁的情况下,判断队列是否已满,如果已满则返回false;如果未满,调用enqueue(e)方法将元素e插入队列,并返回true

  4. 最后,无论是否成功添加元素,最终都会释放锁

5.1.1 enqueue(e)方法

enqueue(e)插入元素,并通过更新putIndexcount来维护队列的状态,同时通过notEmpty.signal()唤醒可能阻塞在队列非空条件上的消费者线程。

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
    putIndex = 0;
count++;
notEmpty.signal();
}
  1. 首先获取ArrayBlockingQueue中的items数组,items数组用于存放队列中的元素。

  2. items[putIndex] = x;: 将元素x插入到items数组的putIndex位置上,putIndex表示当前要插入元素的位置

  3. if (++putIndex == items.length) putIndex = 0;: 这一行代码用于更新putIndex的值,如果putIndex超出了数组长度,则将其置为0,实现循环利用数组空间的目的。

  4. count++;: 增加队列的元素数量计数器count

  5. notEmpty.signal();: 发送信号通知处于等待状态的消费者线程,队列中已经有元素可以消费了。

5.2 offer(o, timeout, timeunit)方法源码如下:

offer(o, timeout, timeunit)方法会在加锁的情况下尝试向队列中添加元素,在指定的超时时间内等待队列有空间可用,如果成功添加则返回true,超时未成功则返回false,如果被中断则抛出InterruptedException异常。

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
   checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
}
  1. 首先,对传入的元素e进行空指针检查,如果为空则抛出NullPointerException

  2. timeout转换为纳秒单位,并获取队列的ReentrantLock对象lock,使用lockInterruptibly方法加锁,支持响应中断

  3. 加锁的情况下,如果队列已满,则会进入循环等待,同时根据剩余的超时时间不断进行等待,直到超时或者队列有空间可用。

  4. 如果在等待过程中成功添加元素,则返回true,否则如果超时未成功添加元素则返回false

  5. 最后,无论是否成功添加元素,最终都会释放锁

5.3 add(e)方法源码如下:

add(e)方法是调用父类AbstractQueueadd(e)方法:

public boolean add(E e) {
return super.add(e);
}
5.3.1 AbstractQueue的add(e)方法,源码如下

add(e)方法会首先尝试向队列中添加元素,如果成功则返回true,如果队列已满抛出异常

public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}
  1. 首先,调用了offer(e)方法尝试向队列中添加元素

  2. 如果成功添加元素,则返回true

  3. 如果队列已满offer(e)方法会返回false,此时会抛出IllegalStateException异常,提示队列已满

5.4 put(e)方法

put(e)方法会在加锁的情况下尝试向队列中添加元素,如果队列已满则会阻塞等待直到有空间可以插入元素,如果被中断则会抛出InterruptedException异常。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
}
  1. 首先,对传入的元素e进行空指针检查,如果为空则抛出NullPointerException

  2. 获取队列的ReentrantLock对象lock,并使用lockInterruptibly方法加锁,支持响应中断

  3. 加锁的情况下,如果队列已满,则使用notFull条件变量等待直到队列有空间可以插入元素。

  4. 一旦有空间可以插入元素,调用insert(e)方法将元素e插入队列。

  5. 最后,无论是否成功添加元素,最终都会释放锁

6.移除元素

6.1 poll()方法

poll()方法用于从队列中取出并移除头部的元素,如果队列为空,则返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
  1. 首先获取队列中的锁对象lock,并尝试获取锁

  2. 判断队列元素数量count是否为0,如果为0则表示队列为空,直接返回null

  3. 如果队列不为空,则调用dequeue()方法来取出并返回队列头部的元素

  4. 最后释放锁并返回取出的元素。

6.1.1 dequeue()方法

dequeue()主要完成了从队列中取出元素的操作,并通过更新takeIndexcount来维护队列的状态,同时通过notFull.signal()来唤醒可能阻塞在队列非满条件上的生产者线程。

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
  1. final Object[] items = this.items;: 首先获取items数组,该数组用于存放队列中的元素。

  2. E x = (E) items[takeIndex];: 将takeIndex位置上的元素强制转换为泛型类型E,并赋值给变量x,表示要取出的元素。 3. items[takeIndex] = null;: 将takeIndex位置上的元素置为null,表示该位置上的元素已经被取出。

  3. if (++takeIndex == items.length) takeIndex = 0;: 更新takeIndex的值,如果takeIndex超出了数组长度,则将其置为0,实现循环利用数组空间的目的。

  4. count--;: 减少队列的元素数量计数器count。

  5. if (itrs != null) itrs.elementDequeued();: 如果存在迭代器iterator,则调用elementDequeued()方法通知迭代器,表示有元素被取出。

  6. notFull.signal();: 发送信号通知处于等待状态的生产者线程,队列中已经有空间可以生产新元素了。

  7. return x;: 返回被取出的元素。

6.2 l(long timeout, TimeUnit unit)方法

poll(long timeout, TimeUnit unit)方法用于从队列中取出并移除头部的元素,如果队列为空,则在指定的时间范围等待元素可用。当超时时间到达后仍然没有可用元素,则返回null

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  1. 将传入的timeout参数转换纳秒单位。

  2. 获取队列中的锁对象lock,并尝试获取锁(支持中断)。

  3. 当队列为空时,进入循环等待状态。如果nanos小于等于0,表示超时时间已到,直接返回null

  4. 使用notEmpty.awaitNanos(nanos)来等待非空条件满足,并根据等待时间的剩余纳秒数更新nanos值。

  5. 当队列不为空时,调用dequeue()方法来取出并返回队列头部的元素

  6. 最终释放锁并返回取出的元素。

6.3 remove()方法

remove()方法用于移除并返回队列的头部元素。如果队列为空,则抛出NoSuchElementException异常。

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
  1. 调用队列的poll()方法来尝试取出并返回队列的头部元素。

  2. 如果poll()方法返回的元素不为null,表示成功取出了一个元素,则直接返回该元素。

  3. 如果poll()方法返回的元素为null,表示队列为空,此时抛出NoSuchElementException异常。

6.4 take()方法

take()方法用于从队列中取出并移除头部的元素。如果队列为空,该方法将阻塞直到队列中有可用元素为止。

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  1. 获取队列中的锁对象lock,并尝试获取锁(支持中断)。

  2. 当队列为空时,进入循环等待状态,调用notEmpty.await()来等待非空条件满足,即等待队列中有元素可用。

  3. 当队列不为空时,调用dequeue()方法来取出并返回队列头部的元素。

  4. 最终释放锁并返回取出的元素。

7.检查元素

7.1 peek

在调用peek()时,会返回队列头部的元素不将其从队列中移除。如果队列为空,则返回null。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
  1. 获取队列中的锁对象lock,并尝试获取锁

  2. 调用itemAt(int i)获取索引为takeIndex的元素。

  3. 最终释放锁

7.2 itemAt

返回items中索引为i的元素

final E itemAt(int i) {
return (E) items[i];
}
7.3 element

element()方法用于获取但不移除队列的头部元素。如果队列为空,则抛出NoSuchElementException异常。

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
  1. 调用队列的peek()方法来获取但不移除队列的头部元素。

  2. 如果peek()方法返回的元素不为null,表示成功获取了一个元素,则直接返回该元素。

  3. 如果peek()方法返回的元素为null,表示队列为空,此时抛出NoSuchElementException异常。

 关注公众号:小黄学编程 回复:架构师 获取小黄收集的架构师资料

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1791760.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI大模型+产品经理:打造智能产品的黄金组合

前言 当我们谈论AI大模型与产品经理的结合&#xff0c;不仅仅是技术与创意的碰撞&#xff0c;更是对未来智能生活的期待。想象一下&#xff0c;当产品的灵魂被注入智能的血液&#xff0c;它们将成为我们生活中不可或缺的伙伴。 我们不仅仅是要探索AI大模型的深层技术&#xf…

泛微开发修炼之旅--07通过后端代码实现创建并发送待办、源码及示例

文章链接&#xff1a;泛微开发修炼之旅--07通过后端代码实现创建并发送待办、源码及示例

云原生架构案例分析_5.某体育用品公司云原生架构的业务中台构建

1.背景和挑战 某体育用品公司作为中国领先的体育用品企业之一&#xff0c;在2016年&#xff0c;某体育用品公司启动集团第三次战略升级&#xff0c;打造以消费者体验为核心的“3”&#xff08;“互联网”、“体育”和“产品”&#xff09;的战略目标&#xff0c;积极拥抱云计算…

告别拥堵:SpringBoot+消息队列打造你的专属交通指挥家!

随着5G和物联网技术的飞速发展&#xff0c;系统的智能化已成为不可逆转的趋势。带你一窥未来&#xff0c;探索如何通过SpringBoot和消息队列技术的结合&#xff0c;开启智能系统的新纪元。从事件驱动架构的实现&#xff0c;到异步消息处理的最佳实践&#xff0c;再到集成主流消…

iOS——类与对象底层探索

类和对象的本质 当我们使用OC创建一个testClass类并在main函数创建它的实例对象的时候&#xff0c;OC的底层到底是什么样的呢&#xff1f; 首先&#xff0c;我们要了解OC对象的底层结构&#xff0c;那么我们就得知道&#xff1a;OC本质底层实现转化其实都是C/C代码。 使用下面…

【scikit-learn010】sklearn算法模型清单实战及经验总结(已更新)

1.一直以来想写下基于scikit-learn训练AI算法的系列文章,作为较火的机器学习框架,也是日常项目开发中常用的一款工具,最近刚好挤时间梳理、总结下这块儿的知识体系。 2.熟悉、梳理、总结下scikit-learn框架模型算法包相关技术点及经验。 3.欢迎批评指正,欢迎互三,跪谢一键…

【重磅丨教育设备】推动大规模设备更新和消费品以旧换新行动方案

近日&#xff0c;国务院印发《推动大规模设备更新和消费品以旧换新行动方案》&#xff08;以下简称《行动方案》&#xff09;。《行动方案》提出&#xff1a;实施设备更新行动。提升教育文旅医疗设备水平&#xff0c;推动符合条件的高校、职业院校&#xff08;含技工院校&#…

数据资产入表-数据治理-标签设计标准

前情提要&#xff1a;数据价值管理是指通过一系列管理策略和技术手段&#xff0c;帮助企业把庞大的、无序的、低价值的数据资源转变为高价值密度的数据资产的过程&#xff0c;即数据治理和价值变现。上一讲介绍了数据清洗标准设计的基本逻辑和思路。 上一讲介绍了其他的通用标…

JVM类加载机制详解(JDK源码级别)

提示&#xff1a;从JDK源码级别彻底剖析JVM类加载机制、双亲委派机制、全盘负责委托机制、打破双亲委派机制的程序、Tomcat打破双亲委派机制、tomcat自定义类加载器详解、tomcat的几个主要类加载器、手写tomcat类加载器 文章目录 前言一、loadClass的类加载大概有如下步骤二、j…

centos8stream 编译安装 php-rabbit-mq模块

官方GitHub&#xff1a;https://github.com/php-amqp/php-amqp 环境依赖安装 dnf install cmake make -y 1.安装rabbitmq-c cd /usr/local/src/ wget https://github.com/alanxz/rabbitmq-c/archive/refs/tags/v0.14.0.tar.gz tar xvf v0.14.0.tar.gz cd rabbitmq-c-0.14.0/…

MYSQL数据库细节详细分析

MYSQL数据库的数据类型(一般只需要用到这些) 整型类型&#xff1a;用于存储整数值&#xff0c;可以选择不同的大小范围来适应特定的整数值。 TINYINTSMALLINTMEDIUMINTINTBIGINT 浮点型类型&#xff1a;用于存储带有小数部分的数值&#xff0c;提供了单精度&#xff08;FLOA…

调用上传文件接口出现格式错误

一、造成这种错误的可能有很多 1.检查一下传递格式 2.检查一下接口要求的格式 二、举个例子 这两个有什么区别&#xff1f; 那就是json、和form-data&#xff0c;一定要看仔细接口 如果还是按照json的方式去传就会报错 三、更改header里Content-Type的类型 json等的heade…

iOS18 新变化提前了解,除了AI还有这些变化

iOS 18即将在不久的将来与广大iPhone用户见面&#xff0c;这次更新被普遍认为是苹果历史上最重要的软件更新之一。据多方报道和泄露的消息&#xff0c;iOS 18将带来一系列全新的功能和改进&#xff0c;包括在人工智能领域的重大突破、全新的设计元素以及增强的性能和安全性。现…

【成都信息工程大学】只考程序设计!成都信息工程大学计算机考研考情分析!

成都信息工程大学&#xff08;Chengdu University of Information Technology&#xff09;&#xff0c;简称“成信大”&#xff0c;由中国气象局和四川省人民政府共建&#xff0c;入选中国首批“卓越工程师教育培养计划”、“2011计划”、“中西部高校基础能力建设工程”、四川…

SASAM软件架构静态分析法-系统架构师(六)

1、体系结构权衡分析法&#xff08;Architecture Tradeoff Analysis Method ATAM&#xff09;包含四个主要活动领域&#xff0c;分别是 场景和需求的收集、体系结构视图和场景的实现、&#xff08;&#xff09;、折中。基于场景的架构分析方法&#xff08;Scenarios-based Arch…

React常见的一些坑

文章目录 两个基础知识1. react的更新问题, react更新会重新执行react函数组件方法本身,并且子组件也会一起更新2. useCallback和useMemo滥用useCallback和useMemo要解决什么3. react的state有个经典的闭包,导致拿不到最新数据的问题.常见于useEffect, useMemo, useCallback4. …

Crosslink-NX器件应用连载(11): 图像(数据)远程传输

作者&#xff1a;Hello&#xff0c;Panda 大家下午好&#xff0c;晚上好。这里分享一个Lattice Crosslink-NX器件实现图像或数据&#xff08;卫星数据、雷达数据、ToF传感器数据等&#xff09;远程传输的案例&#xff08;因为所描述的内容颇杂&#xff0c;晒图不好晒&#xff…

618数码好物推荐!精选便宜又实用的数码产品推荐!

着618购物盛宴的脚步日益临近&#xff0c;你是否已经锁定了心仪的宝贝&#xff1f;那些曾让你心动不已的数码产品&#xff0c;现在正是以最低价收入囊中的绝佳时机。618不仅是一场购物狂欢&#xff0c;更是各大电商平台竞相推出优惠政策的盛宴。为了满足大家的需求&#xff0c;…

C语言笔记第13篇:自定义类型(联合union和枚举enum)

1、联合体 1.1 联合体类型的声明 像结构体一样&#xff0c;联合体也是由一个或多个成员构成&#xff0c;这些成员可以是不同的类型。 但是编译器只为最大的成员分配足够的内存空间&#xff0c;联合体的特点是所有成员共用一块内存空间&#xff0c;所以联合体也叫&#xff1a…

HTML+CSS+JS实现2048经典小游戏(附完整源码)

2048 小游戏的目标是通过合并数字单元格&#xff0c;最终在 4x4 的棋盘上创建一个值为 2048 的单元格。 一、预览效果 二、程序源码 html代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"…