java并发编程:ArrayBlockingQueue详解

news2025/2/1 19:44:01

文章目录

  • 一、简介
  • 二、数据结构
  • 三、源码分析
    • 3.1 属性
    • 3.2 构造方法
    • 3.3 方法
      • 3.3.1 入队
      • 3.3.2 出队
      • 3.3.3 获取元素
      • 3.3.4 删除元素
  • 四、总结


一、简介

ArrayBlockingQueue 顾名思义:基于数组的阻塞队列。数组是要指定长度的,所以使用 ArrayBlockingQueue 时必须指定长度,也就是它是一个有界队列。它实现了 BlockingQueue 接口,有着队列、集合以及阻塞队列的所有方法。

img

ArrayBlockingQueue 是线程安全的,内部使用 ReentrantLock 来保证。ArrayBlockingQueue 支持对生产者线程和消费者线程进行公平的调度。当然默认情况下是不保证公平性的,因为公平性通常会降低吞吐量,但是可以减少可变性和避免线程饥饿问题。

二、数据结构

通常,队列的实现方式有数组和链表两种方式。对于数组这种实现方式来说,我们可以通过维护一个队尾指针,使得在入队的时候可以在 O(1)O(1) 的时间内完成;但是对于出队操作,在删除队头元素之后,必须将数组中的所有元素都往前移动一个位置,这个操作的复杂度达到了 O(n)O(n),效果并不是很好。如下图所示:

img

为了解决这个问题,我们可以使用另外一种逻辑结构来处理数组中各个位置之间的关系。假设现在我们有一个数组 A[1…n],我们可以把它想象成一个环型结构,即 A[n] 之后是 A[1],相信了解过一致性 Hash 算法的童鞋应该很容易能够理解。

如下图所示:我们可以使用两个指针,分别维护队头和队尾两个位置,使入队和出队操作都可以在 O(1O(1 )的时间内完成。当然,这个环形结构只是逻辑上的结构,实际的物理结构还是一个普通的数组。

img

讲完 ArrayBlockingQueue 的数据结构,接下来我们从源码层面看看它是如何实现阻塞的。

三、源码分析

3.1 属性

// 队列的底层结构
final Object[] items;
// 队头指针
int takeIndex;
// 队尾指针
int putIndex;
// 队列中的元素个数
int count;

final ReentrantLock lock;

// 并发时的两种状态
private final Condition notEmpty;
private final Condition notFull;

items 是一个数组,用来存放入队的数据;count 表示队列中元素的个数;takeIndex 和 putIndex 分别代表队头和队尾指针。

3.2 构造方法

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

第一个构造函数只需要指定队列大小,默认为非公平锁;第二个构造函数可以手动指定公平性和队列大小;第三个构造函数里面使用了 ReentrantLock 来加锁,然后把传入的集合元素按顺序一个个放入 items 中。这里加锁目的不是使用它的互斥性,而是让 items 中的元素对其他线程可见(参考 AQS 里的 state 的 volatile 可见性)。

3.3 方法

3.3.1 入队

ArrayBlockingQueue 提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:

  • boolean add(E e)

  • void put(E e)

  • boolean offer(E e)

  • boolean offer(E e, long timeout, TimeUnit unit)

(1)add(E e)

public boolean add(E e) {
    return super.add(e);
}

//super.add(e)
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

可以看到 add 方法调用的是父类,也就是 AbstractQueue 的 add 方法,它实际上调用的就是 offer 方法。

(2)offer(E e)

我们接着上面的 add 方法来看 offer 方法:

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

offer 方法在队列满了的时候返回 false,否则调用 enqueue 方法插入元素,并返回 true。

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    // 圆环的index操作
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

enqueue 方法首先把元素放在 items 的 putIndex 位置,接着判断在 putIndex+1 等于队列的长度时把 putIndex 设置为0,也就是上面提到的圆环的 index 操作。最后唤醒等待获取元素的线程。

(3)offer(E e, long timeout, TimeUnit unit)

该方法在 offer(E e) 的基础上增加了超时的概念。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

    checkNotNull(e);
    // 把超时时间转换成纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 获取一个可中断的互斥锁
    lock.lockInterruptibly();
    try {
        // while循环的目的是防止在中断后没有到达传入的timeout时间,继续重试
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            // 等待nanos纳秒,返回剩余的等待时间(可被中断)
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

利用了 Condition 的 awaitNanos 方法,等待指定时间,因为该方法可中断,所以这里利用 while 循环来处理中断后还有剩余时间的问题,等待时间到了以后调用 enqueue 方法放入队列。

(4)put(E e)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

put 方法在 count 等于 items 长度时,一直等待,直到被其他线程唤醒。唤醒后调用 enqueue 方法放入队列。

3.3.2 出队

入队列的方法说完后,我们来说说出队列的方法。ArrayBlockingQueue 提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E poll()

  • E poll(long timeout, TimeUnit unit)

  • E take()

  • drainTo(Collection<? super E> c, int maxElements)

(1)poll()

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

poll 方法是非阻塞方法,如果队列没有元素返回 null,否则调用 dequeue 把队首的元素出队列。

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

dequeue 会根据 takeIndex 获取到该位置的元素,并把该位置置为 null,接着利用圆环原理,在 takeIndex 到达列表长度时设置为0,最后唤醒等待元素放入队列的线程。

(2)poll(long timeout, TimeUnit unit)

该方法是 poll() 的可配置超时等待方法,和上面的 offer 一样,使用 while 循环配合 Condition 的 awaitNanos 来进行等待,等待时间到后执行 dequeue 获取元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

(3)take()

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

取走队列里排在首位的对象,不同于 poll() 方法,若BlockingQueue为空,就阻塞等待直到有新的数据被加入。

(4)drainTo()

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            while (i < n) {
                @SuppressWarnings("unchecked")
                E x = (E) items[take];
                c.add(x);
                items[take] = null;
                if (++take == items.length)
                    take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                count -= i;
                takeIndex = take;
                if (itrs != null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                for (; i > 0 && lock.hasWaiters(notFull); i--)
                    notFull.signal();
            }
        }
    } finally {
        lock.unlock();
    }
}

drainTo 相比于其他获取方法,能够一次性从队列中获取所有可用的数据对象(还可以指定获取数据的个数)。通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

3.3.3 获取元素

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

这里获取元素时上锁是为了避免脏数据的产生。

3.3.4 删除元素

我们可以想象一下,队列中删除某一个元素时,是不是要遍历整个数据找到该元素,并把该元素后的所有元素往前移一位,也就是说,该方法的时间复杂度为 O(n)O(n)。

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
             // 从takeIndex一直遍历到putIndex,直到找到和元素o相同的元素,调用removeAt进行删除
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

remove 方法比较简单,它从 takeIndex 一直遍历到 putIndex,直到找到和元素 o 相同的元素,调用 removeAt 进行删除。我们重点来看一下 removeAt 方法。

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

removeAt 的处理方式和我想的稍微有一点出入,它内部分为两种情况来考虑:

  • removeIndex == takeIndex

  • removeIndex != takeIndex

也就是我考虑的时候没有考虑边界问题。当 removeIndex == takeIndex 时就不需要后面的元素整体往前移了,而只需要把 takeIndex的指向下一个元素即可(类比圆环);当 removeIndex != takeIndex 时,通过 putIndex 将 removeIndex 后的元素往前移一位。

四、总结

ArrayBlockingQueue 是一个阻塞队列,内部由 ReentrantLock 来实现线程安全,由 Condition 的 await 和 signal 来实现等待唤醒的功能。它的数据结构是数组,准确的说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从 0 继续开始。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/631205.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在知乎逮到一个腾讯10年老测试,聊过之后收益良多...

老话说的好&#xff0c;这人呐&#xff0c;一单在某个领域鲜有敌手了&#xff0c;就会闲得蛋疼。前几天我在上班摸鱼刷知乎的时候认识了一位腾讯测试大佬&#xff0c;在腾讯工作了10年&#xff0c;因为本人天赋比较高&#xff0c;平时工作也兢兢业业&#xff0c;现在企业内有一…

Python基础知识讲解——main方法

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 估计很多人跟我一样初学python看代码的时候先找一下main()方法&#xff0c;从main往下看。 但事实上python中是没有你理解中的“main()”方法的。 言归正传 if name "main":可以看成是python程序的入口&a…

数据结构与算法系列之习题练习

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; 力扣习题 括号匹配问题。用队列实现栈。用栈实现队列。设计循环队列。 有效的括…

【数据分析案例】深度分析超市零售商店数据--Python数据分析实战

前言 咳咳&#xff0c;又是好久不见~这不高考已经结束了 对python感兴趣的准大学生们&#xff0c;是打算好好玩几个月还是&#xff0c;继续研究学习python呢~ &#x1f928; 我呢 还是建议大家劳逸结合哈哈 先玩再学习~ 当然啦 最重要的还是看你们自己呀 不过我以上这些都不能…

[NOI2007] 调兵遣将

题目描述 我军截获的情报显示&#xff0c;敌军正在集结兵力试图向我军重要的军械研究所发起进攻。由于我军正处于多线作战的状态&#xff0c;无法抽调大批兵力前去支援&#xff0c;指挥部决定通过有效的战前部署来提高胜率&#xff0c;减少伤亡和损失。 该军械研究所的平面图…

网络安全自学笔记+学习路线+就业规划(超详细)

每天都有新闻报道描述着新技术对人们的生活和工作方式带来的巨大乃至压倒性影响。与此同时有关网络攻击和数据泄露的头条新闻也是日益频繁。 攻击者可谓无处不在&#xff1a;企业外部充斥着黑客、有组织的犯罪团体以及民族国家网络间谍&#xff0c;他们的能力和蛮横程度正日渐…

如何使虚拟机自动生成ip地址

一. 打开虚拟机并登录账号进入命令行界面输入指令&#xff1a; vi /etc/sysconfig/network-scripts/ifcfg-ens33 通过指令进入到下面的界面当中 点击键盘输入 "i" 进入编辑模式将文件修改为 文件当中的 BOOTPROTO可以将ip地址定义为自动生成类型或者静态指定类型其中…

力扣 213. 打家劫舍 II

一、题目描述 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋&#xff0c;每间房内都藏有一定的现金。这个地方所有的房屋都围成一圈&#xff0c;这意味着第一个房屋和最后一个房屋是紧挨着的。同时&#xff0c;相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻…

高考后计算机人工智能大类专业的选择建议

随着GPT的出现&#xff0c;很多人开始质疑是否还需要学计算机专业&#xff0c;计算机专业是否会消失。 先给结论&#xff0c;不会&#xff01; 只是会产生分层&#xff0c;大体上是这样的&#xff1a; 核心代码部分还是需要人来写的&#xff0c;只要是代码方式出现的结果&#…

了解ASEMI代理英飞凌TLE6208-6G其功能和应用的综合指南

编辑-Z TLE6208-6G是一款高度集成、通用且高效的汽车半桥驱动器&#xff0c;由英飞凌设计。这种功能强大的设备专门设计用于满足汽车应用的苛刻要求&#xff0c;如控制直流电机、螺线管和电阻负载。在本文中&#xff0c;我们将深入研究TLE6208-6G的功能、优点和应用&#xff0…

【Spring MVC】这几种传参方式这么强大,让我爱不释手,赶快与我一起去领略吧 ! ! !

前言: 大家好,我是良辰丫,在上2一篇文章中我们已经初步认识了Spring MVC,并且学习了热部署的配置,今天我们将继续开始我们的Spring MVC的学习! ! !&#x1f48c;&#x1f48c;&#x1f48c; &#x1f9d1;个人主页&#xff1a;良辰针不戳 &#x1f4d6;所属专栏&#xff1a;jav…

JDK9~17+Springboot3 @Resource常见问题和解决方案

一、常见问题描述 因为JDK版本升级的改动&#xff0c;在Jdk9~17环境下&#xff0c;搭建Springboot项目&#xff0c;会出现原有Resource&#xff08;javax.annotation.Resource&#xff09;不存在的问题&#xff0c;导致项目从Jdk8迁移到高版本时遇到的问题 原因 你可能会问&…

TiDB亿级数据亚秒响应查询将MySql数据全量迁移到TiDB

目录 1 下载安装TiDB工具包1.1 检查最新版本1.2 下载tidb-toolkit 2 Dumpling导出数据2.1 Dumpling工具简介2.2 导出需要的权限2.3 创建用户并授权2.4 验证数据库2.5 导出sql文件2.6 查看导出文件 3 TiDB Lightning导入数据3.1 TiDB Lightning简介3.2 TiDB Lightning 整体架构3…

蓝桥杯【第14届国赛】Python B组

本题解仅代表个人观点&#xff0c;仅供参考&#xff0c;欢迎各位指正 A&#xff1a;弹珠堆放 【问题描述】 小蓝有 20230610 颗磁力弹珠&#xff0c;他对金字塔形状尤其感兴趣&#xff0c;如下图所示&#xff1a; 高度为 1 的金字塔需要 1 颗弹珠&#xff1b; 高度为 2 的金字…

【Jetpack】使用 Room 中的 Migration 升级数据库异常处理 ( 多个数据库版本的迁移 | fallbackToDestructiveMigration() 函数处理升级异常 )

文章目录 一、Room#Migration 迁移工具升级数据库二、多个数据库版本的迁移三、数据库异常处理 - RoomDatabase.Builder#fallbackToDestructiveMigration() 函数四、完整代码示例 一、Room#Migration 迁移工具升级数据库 Room Migration 数据库迁移工具 是 Android Jetpack Arc…

【AI实战营第二期】第三次作业——基于 RTMDet 的气球检测(包含数据集)

作业&#xff1a;基于 RTMDet 的气球检测 背景&#xff1a;熟悉目标检测和 MMDetection 常用自定义流程。 任务&#xff1a; 基于提供的 notebook&#xff0c;将 cat 数据集换成气球数据集; 按照视频中 notebook 步骤&#xff0c;可视化数据集和标签; 使用MMDetection算法库…

01_Linux字符设备驱动开发

目录 字符设备驱动简介 驱动模块的加载和卸载 字符设备注册与注销 实现设备的具体操作函数 添加LICENSE和作者信息 Linux设备号的组成 设备号的分配 chrdevbase字符设备驱动开发实验 创建VSCode工程 添加头文件路径 编写实验程序 C库文件操作基本函数 编写测试APP…

苹果Vision Pro正式发布,下一个iPhone诞生了?

在库克即将退休之际&#xff0c;苹果开启了下一个十年。 2023年6月6日&#xff0c;在苹果WWDC开发者大会上&#xff0c;苹果发布了15寸的MacBook Air&#xff0c;以及一众iOS 17、iPad OS 17、Mac OS等系统的更新。当我们觉得这些常规更新有点不痛不痒&#xff0c;甚至想大呼“…

​selenium中元素定位正确但是操作失败,6种解决办法全稿定

selenium中元素定位正确但是操作失败的原因无外乎以下4种&#xff1a; 一、页面没加载好 解决方法&#xff1a;添加等待方法&#xff0c;如&#xff1a; time.sleep() 二、页面提交需要等待给数据后台 解决方法&#xff1a;添加等待方法&#xff0c;如&#xff1a; time.sl…

一套完整的三甲医院医学影像科PACS系统源码

一、PACS系统概述&#xff1a; 基于VC MSSQL开发的一套三甲医院医学影像PACS系统源码&#xff0c;集成3D影像后处理技术和功能&#xff0c;包括三维多平面重建、三维容积重建、三维表面重建、三维虚拟内窥镜、最大/小密度投影、心脏动脉钙化分析等能满足影像科功能。 二、PAC…