ArrayBlockingQueue 数组阻塞队列 源码阅读

news2024/9/27 5:36:32

1. 概述

数组阻塞队列

  • 有界的阻塞数组, 容量一旦创建, 无法修改
  • 阻塞队列, 队列满的时候, 往队列put数据会被阻塞, 队列空, 取数据也会被阻塞
  • 并发安全

2. 数据结构

/** 存储队列元素的数组 */
/** 存储队列元素的数组 */
final Object[] items;

/** 队首位置,下一次 take, poll, peek, remove 方法在 items 中的位置  */
int takeIndex;

/** 队末位置,下一次 put, offer, add 方法在 items 中的位置 */
int putIndex;

/** 队列中元素的数量 */
int count;

// 锁 保证队列的并发安全性
final ReentrantLock lock;

/** 队列不为空的监视器 */
private final Condition notEmpty;

/** 不满(队列元素数量小于items.size())的监视器 */
private final Condition notFull;

3. 初始化 构造函数

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 数组
        this.items = new Object[capacity];
        // 队列公平不公平指的是用的ReentrantLock
        lock = new ReentrantLock(fair);
        // 创建队列不为空,不满的监视器
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;

        // 加锁
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                // 集合元素一个个入队
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

4. 方法

4.1 核心方法

核心方法入队,出队,移除元素,调用这些方法都加锁的。

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        // 数组满了,putIndex重新从0开始
        if (++putIndex == items.length)
            putIndex = 0;
        // 添加元素count+1
        count++;
        // 唤醒队列不为空的wait
        notEmpty.signal();
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 添加元素count-1
        count--;
        // 如果迭代器不为空维护一下
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒队列不满的wait
        notFull.signal();
        return x;
    }

   void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // 如果要移除的是就是要出队的元素
            // removing front item; just advance
            // 元素设置为null
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove
            // slide over all others up through putIndex.
            // 用removeIndex后面的元素,向前移动一位
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }

4.2 常用外部调用的方法

offer(),入队,队列满了直接返回 false,不会阻塞,入队成功返回true

   public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

add,其实调用了 offer

    public boolean add(E e) {
        return super.add(e);
    }

    // 父类
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

put lock可以被中断,队列满了会阻塞, notFull.await(),等队列元素出队notFull.signal() 唤醒

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
        notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

在put基础上,等待timeout个unit时间

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

取出元素,列表为空直接返回null,不阻塞

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

取元素,队列空阻塞等待

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
        notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

取元素,队列空等待参数指定的时间

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

查看将要取出的元素,元素不出队

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

移除某个元素,移除找到的第一个这个元素,并返回true

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                // 遍历
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        // 找到了移除对应元素
                        removeAt(i);
                        return true;
                    } 
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

  public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i]))
                        // 找到了返回true
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

清理队列

public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                items[i] = null;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
            takeIndex = putIndex;
            count = 0;
            if (itrs != null)
                itrs.queueIsEmpty();
            for (; k > 0 && lock.hasWaiters(notFull); k--)
            notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}

将队列中的元素移动到c集合中

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

/**
 * @throws UnsupportedOperationException {@inheritDoc}
 * @throws ClassCastException            {@inheritDoc}
 * @throws NullPointerException          {@inheritDoc}
 * @throws IllegalArgumentException      {@inheritDoc}
 */
public int drainTo(Collection<? super E> c, int maxElements) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // n取 maxElements 和 集合元素的最小值
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            while (i < n) {
                @SuppressWarnings("unchecked")
                E x = (E) items[take];
                c.add(x);
                items[take] = null;
                if (++take == items.length)
                    take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                count -= i;
                // 修改队首位置
                takeIndex = take;
                if (itrs != null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                for (; i > 0 && lock.hasWaiters(notFull); i--)
                notFull.signal();
            }
        }
    } finally {
        lock.unlock();
    }
}

5. 单元测试


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueStudy {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
        queue.put("1");
        queue.put("2");

        // true
        System.out.println(queue.offer("3"));
        // false
        System.out.println(queue.offer("4"));

        // 抛出异常
        // Exception in thread "main" java.lang.IllegalStateException: Queue full
        // at java.util.AbstractQueue.add(AbstractQueue.java:98)
        // at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
        //  at arrayBlockingQueue.ArrayBlockingQueueStudy.main(ArrayBlockingQueueStudy.java:18)
        // System.out.println(queue.add("4"));

        // 阻塞
        // queue.put("4");

        // 等待1s 返回false
        System.out.println(queue.offer("4", 1, TimeUnit.SECONDS));

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        // null
        System.out.println(queue.poll());

        // 阻塞
        // System.out.println(queue.take());

        // 等待1s 返回null
        System.out.println(queue.poll(1, TimeUnit.SECONDS));

        queue.put("1");
        // 1
        System.out.println(queue.peek());
        // 1
        System.out.println(queue.peek());

        queue.put("1");
        System.out.println(queue.remove("1"));
        // 1
        System.out.println(queue.peek());
        System.out.println(queue.remove("1"));
        // null
        System.out.println(queue.peek());

        queue.put("1");
        queue.put("1");
        queue.clear();
        // null
        System.out.println(queue.peek());

        List<String> c = new ArrayList<>();
        queue.put("1");
        queue.put("1");
        queue.drainTo(c);
        // 2 ["1","1"]
        System.out.println(c.size());
        // null
        System.out.println(queue.peek());
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1486407.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【王道操作系统】ch1计算机系统概述-06虚拟机

文章目录 【王道操作系统】ch1计算机系统概述-06虚拟机01传统计算机02虚拟机的基本概念&#xff08;1&#xff09;第一类虚拟机管理程序&#xff08;2&#xff09; 第二类虚拟机管理程序&#xff08;3&#xff09; 两类虚拟机管理程序的对比 【王道操作系统】ch1计算机系统概述…

【Linux系统化学习】线程概念

目录 线程的概念 线程的引出 什么是线程 理解线程比进程更加的轻量化 线程的优点 现成的缺点 线程异常 线程用途 Linux进程VS线程 线程的简单现象 线程的概念 有关操作系统的书籍或者课本都会这样描述线程&#xff1a; 线程是比进程轻量化的一种执行流线程是进程内部…

[SWPUCTF 2021 新生赛]babyrce

先打开环境 分析代码&#xff0c;要给COOKIE赋值admin1 使用hackbar赋值 打开rasalghul.php 分析代码&#xff0c;使用GET传参url&#xff0c;如果url里没有/ /&#xff0c;则赋值给ip&#xff0c;然后通过shell_exec函数得到flag&#xff0c;否则&#xff0c;返回nonono。他…

备战蓝桥杯————递归反转单链表

当要求只反转单链表中的一部分时&#xff0c;递归实现确实具有一定的挑战性&#xff0c;但也是可行的。下面我将介绍一种递归实现的方法来反转单链表中的一部分。 一、反转链表 题目描述 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示…

详解Win 7重置电脑操作步骤

文章目录 介绍Win 7 重置系统的方法&#xff1a;1.按下键盘上的Windows键和R键&#xff0c;打开运行窗口&#xff0c;输入sysprep 点击回车确定。2.之后就会出现如下界面&#xff0c;在这个新打开的窗口中双击 sysprep 程序3.选择【进入系统全新体验&#xff08;00BE) 】&#…

C++ sort排序

sort函数接受两个迭代器作为参数&#xff0c;分别表示要排序的范围的起始和结束位置。 请注意&#xff0c;sort函数默认使用小于运算符&#xff08;<&#xff09;来比较元素的顺序&#xff0c;默认从小到大排。 在这里&#xff0c;使用str.begin()和str.end()来表示整个字符…

【MDVRP多站点物流配送车辆路径规划问题(带容量限制)】基于遗传算法GA求解

课题名称&#xff1a;基于遗传算法求解带容量限制的多站点的物流配送路径问题MDVRP 版本时间&#xff1a;2023-03-12 代码获取方式&#xff1a;QQ&#xff1a;491052175 或者 私聊博主获取 模型描述&#xff1a; 15个城市中&#xff0c;其中北京&#xff0c;长沙和杭州三座…

springboot241基于SpringBoot+Vue的电商应用系统的设计与实现

基于SpringBootVue的电商应用系统的设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本电商应用系统就是在这样的大环境下诞生&#xff0c;其可以…

构建 ESLint 内存泄露检测插件入门:提升代码质量与防范运行时风险

前言 本文目的是介绍如何创建开发一个自定义规则 ESLint 插件。利用其能力,检测一些代码中可能存在的内存泄露并及时进行提示,避免潜在的后期影响。 本文实现其中一部分功能–检测事件监听器的使用是否存在内存泄露为例来演示基本的 ESLint 自定义规则插件开发的过程。用以…

排序算法——快速排序的非递归写法

快速排序的非递归 我们写快速排序的时候&#xff0c;通常用的递归的方法实现快速排序&#xff0c;那么有没有非递归的方法实现快速排序呢&#xff1f;肯定是有的。思想还是一样的&#xff0c;不过非递归是看似是非递归其实还是递归。 思路解释 快速排序的非递归使用的是栈这…

性能优越!|多策略改进的长鼻浣熊优化算法MSCOA(MATLAB)

文章来源于我的个人公众号&#xff1a;KAU的云实验台&#xff0c;主要更新智能优化算法的原理、应用、改进 ​函数测试(部分)&#xff1a; 仅运行MSCOA: 所有元启发式算法的共同点在于&#xff0c;其搜索过程可分为勘探和开发两个阶段。勘探阶段指算法对全局空间的搜索能力&…

kafka消费者重平衡是什么?怎么避免?

消费者重平衡是指主题下的分区怎么分配给消费者的过程。下面这个图可以看出该过程&#xff1a;原来有2个消费者&#xff0c;3个分区&#xff0c;其中一个消费者肯定就的处理2个分区了。那么当新加入消费者时&#xff0c;则每个消费者就只处理一个分区了。处理这个分区过程的叫协…

PyTorch-卷积神经网络

卷积神经网络 基本结构 首先解释一下什么是卷积&#xff0c;这个卷积当然不是数学上的卷积&#xff0c;这里的卷积其实表示的是一个三维的权重&#xff0c;这么解释起来可能不太理解&#xff0c;我们先看看卷积网络的基本结构。 通过上面的图我们清楚地了解到卷积网络和一般网…

CUDA学习笔记01:vs2019环境配置

为了在window11 vs2019下使用CUDA编程&#xff0c;配置了一下环境&#xff0c;但是我电脑一开始自带CUDA&#xff0c;然后再安装的vs2019&#xff0c;这样安装顺序上是不对的&#xff0c;vs2019找不到CUDA配置项&#xff0c;网上找了很多办法貌似都不好使而且很复杂。 那么最快…

2024东南大学553复试真题及笔记

2023年真题知识点 引用指针 题目为 传递一个指针的引用做修改&#xff0c;输出指针指向的结果&#xff0c;但是指针被修改&#xff0c;结果就不一样了。 static 静态变量 类里面的静态成员变量&#xff0c;很简单的题目 for循环 看循环的内容输出字符串 try try catch捕…

SandBox中的JavaAgent技术

8.1 JavaAgent Java Agent 是一种强大的技术&#xff0c;在运行时动态修改已加载类的字节码&#xff0c;为应用程序注入额外的功能和行为。 JDK 1.5 支持静态 Instrumentation&#xff0c;基本的思路是在 JVM 启动的时候添加一个代理&#xff08;javaagent&#xff09;&#…

【创作回顾】17个月峥嵘创作史

#里程碑专区#、#创作者纪念日# 还记得 2022 年 10 月 05 日&#xff0c;我在CSDN撰写了第 1 篇博客——《关于测试工程师瓶颈和突围的一个思考》&#xff0c;也是我在全网发布的第一篇技术文章。 回想当时&#xff0c;这一篇的诞生过程并不轻松&#xff0c;不像是一篇网络文章…

科技赋能,MTW400A为农村饮水安全打通“最后一公里”

日前&#xff0c;山东省政府纵深推进国家省级水网先导区建设&#xff0c;持续深化“水网”行动&#xff0c;着力构筑水安全保障网、水民生服务网、水生态保护网&#xff0c;建设水美乡村示范带、内河航运示范带、文旅融合示范带、绿色发展示范带&#xff0c;推动形成“三网四带…

【小沐学GIS】QGIS安装和入门使用

文章目录 1、简介2、下载和安装3、使用3.1 XYZ Tiles3.2 WMS / WMTS3.3 GeoJson文件加载 4、在线资源结语 1、简介 QGIS是一款开源地理信息系统。该项目于2002年5月诞生&#xff0c;同年6月作为SourceForge上的一个项目建立。QGIS目前运行在大多数Unix平台、Windows和macOS上。…

蓝桥杯-最大间隙

利用数组和求解最大值思想 #include <iostream> using namespace std; int a[1000]; int main() { int max0; int n; cin>>n; for(int i0;i<n;i) { cin>>a[i]; } for(int j1;j<n;j) { int ta[j]-a[j-1]; if(t>m…