【JUC系列-10】深入理解ArrayBlockingQueue的底层原理

news2024/12/27 11:22:34

JUC系列整体栏目


内容链接地址
【一】深入理解JMM内存模型的底层实现原理https://zhenghuisheng.blog.csdn.net/article/details/132400429
【二】深入理解CAS底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132478786
【三】熟练掌握Atomic原子系列基本使用https://blog.csdn.net/zhenghuishengq/article/details/132543379
【四】精通Synchronized底层的实现原理https://blog.csdn.net/zhenghuishengq/article/details/132740980
【五】通过源码分析AQS和ReentrantLock的底层原理https://blog.csdn.net/zhenghuishengq/article/details/132857564
【六】深入理解Semaphore底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132908068
【七】深入理解CountDownLatch底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133343440
【八】深入理解CyclicBarrier底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133378623
【九】深入理解ReentrantReadWriteLock 读写锁的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133629550
【十】深入理解ArrayBlockingQueue的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133692023

深入理解ArrayBlockingQueue的底层实现

  • 一,深入理解ArrayBlockingQueue的底层实现
    • 1,阻塞队列的api基本使用
    • 2,ArrayBlockingQueue的基本使用
    • 3,ArrayBlockingQueue的底层源码
      • 3.1,ArrayBlockingQueue的基本属性
      • 3.2,put入队操作
      • 3.3,出队操作
    • 4,总结

一,深入理解ArrayBlockingQueue的底层实现

在理解阻塞队列BlockingQueue之前,先理解Queue的特性,Queue是作为一种经典的数据结构使用的,与之相对应的的就是栈。队列采用的是先进先出的策略模式,在Java中也有Queue的具体实现,内部也有着一些对应的api,如add,remove入队出队等

public interface Queue<E> extends Collection<E> {...}

在Queue这个接口内有一个子类,就是本文要讲解的主角 BlockingQueue ,在原来的Queue基础上,新增了两个附加操作put和take ,这两个操作可支持阻塞,而在阻塞队列中有多个阻塞队列的具体实现,因此本文主要是讲解 ArrayBlockingQueue 的具体使用

public interface BlockingQueue<E> extends Queue<E> {
        void put(E e) throws InterruptedException;
        E take() throws InterruptedException;
}

1,阻塞队列的api基本使用

在阻塞队列中,有以下的api可以使用,如一些入队,出队等操作,但是同一个操作会有多个方法

支持将数据加入到队列,但是不支持阻塞的方法有下面两种

boolean add(E e);		//数据入队,队列已满则抛出异常
boolean offer(E e);		//数据入队,队列已满则返回false

支持将数据加入到队列,但是支持阻塞的方法有下面两种,表示的是队列未满则插入,队列已满则阻塞

//数据入队,线程阻塞
void put(E e) throws InterruptedException;
//数据入队,线程阻塞一段时间
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

支持数据出队,但是不支持阻塞的方法有以下

boolean remove(Object o);	//数据出队,队列为空则抛异常

支持数据出队,同时支持数据阻塞的方法有以下,表示的是队列有数据则删除,队列没有数据则阻塞

//数据出队,允许中断,线程阻塞一段时间
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//数据出队,线程阻塞
E take() throws InterruptedException;

2,ArrayBlockingQueue的基本使用

上面的这些方法是阻塞队列的通用方法,因此下面主要针对这个 ArrayBlockingQueue 来讲解内部的使用。举一个简单的生产者和消费者模型,来描述这个ArrayBlockingQueue的使用

首先定义一个产品类,内部的属性变量比较简单

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/8 20:24
 */
@Data
public class Product {
    private Integer id;
    private String productName;
}

随后定义一个生产者Producer的线程任务类,内部用于生产产品,并将生产的产品加入到阻塞队列中


/**
 * 生产者线程
 * @Author: zhenghuisheng
 * @Date: 2023/10/8 20:21
 */
@Data
public class Producer implements Runnable {
    private ArrayBlockingQueue arrayBlockingQueue;
    public Producer(ArrayBlockingQueue arrayBlockingQueue){
        this.arrayBlockingQueue = arrayBlockingQueue;
    }
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            Product product = new Product();
            product.setId(i);
            product.setProductName("商品" + i + "号");
            try {
                //加入阻塞队列
                arrayBlockingQueue.put(product);
                System.out.println("生产者"  + i + "号生产完毕");
                Thread.sleep(50);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

随后创建一个消费者Consumer线程的任务类,主要用于消费者消费线程

/**
 * 消费者线程
 * @Author: zhenghuisheng
 * @Date: 2023/10/8 20:21
 */
@Data
public class Consumer implements Runnable {
    private ArrayBlockingQueue arrayBlockingQueue;
    public Consumer(ArrayBlockingQueue arrayBlockingQueue){
        this.arrayBlockingQueue = arrayBlockingQueue;
    }
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                //消费者消费
                Object take = arrayBlockingQueue.take();
                System.out.println(take);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("消费者消费完毕");
    }
}

随后创建一个线程池的根据类,用于定义线程池的各个参数以及初始化线程池

/**
 * 线程池工具
 * @author zhenghuisheng
 * @date : 2023/3/22
 */
public class ThreadPoolUtil {

    /**
     * io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
     *           核心线程数不超过2N即可,可以适当留点空间
     * cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
     *           核心线程数不超过N+1即可
     * 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
     */

    public static synchronized ThreadPoolExecutor getThreadPool() {
        if (pool == null) {
            //获取当前机器的cpu
            int cpuNum = Runtime.getRuntime().availableProcessors();
            log.info("当前机器的cpu的个数为:" + cpuNum);
            int maximumPoolSize = cpuNum * 2 ;
            pool = new ThreadPoolExecutor(
                    maximumPoolSize - 2,
                    maximumPoolSize,
                    5L,   //5s
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),  //数组有界队列
                    Executors.defaultThreadFactory(), //默认的线程工厂
                    new ThreadPoolExecutor.AbortPolicy());  //直接抛异常,默认异常
        }
        return pool;
    }

}

最后在创建一个测试类作为主类的入口,随后就可以看到生产者消费者各个的消费信息了

/**
 * @Author: zhenghuisheng
 * @Date: 2023/10/8 20:27
 */
public class ArrayBlockingQueueDemo {
    //创建一个线程池
    static ThreadPoolExecutor pool = ThreadPoolUtil.getThreadPool();
    //创建一个全局阻塞队列
    private static ArrayBlockingQueue queue = new ArrayBlockingQueue(16);
    public static void main(String[] args) throws InterruptedException {
        //创建生产者线程
        Producer producer = new Producer(queue);
        //创建消费者线程
        Consumer consumer = new Consumer(queue);
        //线程加入线程池
        pool.execute(producer);
        pool.execute(consumer);
        Thread.sleep(10000);
        System.exit(0);
    }
}

其打印结果如下,乱序是没多大问题的

Product(id=0, productName=商品0号)
生产者0号生产完毕
生产者1号生产完毕
Product(id=1, productName=商品1号)
Product(id=2, productName=商品2号)
生产者2号生产完毕
生产者3号生产完毕
Product(id=3, productName=商品3号)
Product(id=4, productName=商品4号)
生产者4号生产完毕

3,ArrayBlockingQueue的底层源码

3.1,ArrayBlockingQueue的基本属性

在讲解这个类的源码之前,先看一下这个类的继承以及这个类中重要的一些属性,该类是继承了一个抽象的队列,并且是BlockingQueue的一个具体的实现

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>{}

在这个类中,有一个ReentrantLock锁,说明这个ArrayBlockingQueue的底层是通过这个互斥锁实现的,并且还引入了两个条件等待队列,很明显一个是队列满了put的时候阻塞,一个是队列空了take的阻塞。根据前面几篇的AQS的文章的讲解,很容易想到这两个条件队列的作用;用的ReentrantLock锁,也大概得可以知道take和put操作时互斥的,因此在性能上肯定是会出现问题的

//互斥锁
final ReentrantLock lock;
//为空时take的条件队列
private final Condition notEmpty;
//满时put的条件队列
private final Condition notFull;

除了上面这些属性之外,还有一些关于数组的定义,删除到哪个位置,添加到那个数组的索引位置,这些都是有记录的。由于数组是一块连续的空间,并且底层是通过队列的方式实现,因此根据先进先出原则,进来的数据都是在队尾,删除的都是队头的数据,整个操作都是对这个数组进行操作的

final Object[] items;	//数组
int takeIndex;			//前驱指针,用于记录删除到了哪个结点
int putIndex;			//后继指针,用于记录添加到了哪个结点
int count;				//数组数量

大概就是这么回事,takeindex记录的是出队到哪个位置的下标,putindex记录的入队到哪个位置的下标

请添加图片描述

最后再看看这个类的构造器方法,在初始化一个ArrayBlockingQueue时,就会将数组以及容量,还有两个条件队列全部构建完成,并且这把ReentrantLock互斥锁使用的是公平锁

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];	//初始化一个数组
    lock = new ReentrantLock(fair);		//初始化一把互斥锁
    notEmpty = lock.newCondition();		
    notFull =  lock.newCondition();
}

3.2,put入队操作

在初步的了解完上面的这些属性之后,那么接下来主要讲解一下阻塞队列入队的操作,那么直接看这个put方法

//入队操作
public void put(E e) throws InterruptedException {
    checkNotNull(e);	//判断是否为空的操作
    final ReentrantLock lock = this.lock;	//获取全局的互斥锁
    lock.lockInterruptibly();				//可中断锁
    try {
        while (count == items.length)	//当队列满的时候
            notFull.await();			//线程阻塞
        enqueue(e);						//如数组没满,则入队
    } finally {
        lock.unlock();					//解锁
    }
}

入队的逻辑也比较简单,将值加入到数组中,并将putindex的值+1,数组中的结点数+1,最后调用signal唤醒数组为空时被阻塞的线程,将Empty条件等待队列的结点加入到同步等待队列中,最后被put中的finally中的unlock方法给唤醒

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;	//添加数据
    if (++putIndex == items.length)	
        putIndex = 0;
    count++;
    notEmpty.signal();
}

在这里有一个重点方法,就是上面的这一句,当长度达到外部设置的长度之后,不会进行一个扩容操作,而是直接通过追加写的方式,从头开始。这就不禁有一个疑问了,为何这样设计,在印象中通过这种方式设计的结构,那么久只有环状的数组了

if (++putIndex == items.length)	
    putIndex = 0;

我直接好家伙,简直太精髓了,因为数组在删除数据的时候,那么就会有大量的数据需要移动,那么无论在空间还是时间都会产生一定的成本,因此为了优化这一步骤,直接采用环状的形式。

比如take一个数据,由于是要保证先进先出的原则,那么就需要删除头结点的数据,那么后面的全部数据都要发生一次移动,那不会影响出队的效率问题吗,因此这就不得不说这个takeindex的作用了

请添加图片描述

如果在数据出队后,采用环状的数据结构,那么就不需要因为删除数组的前面数据而移动数组后面的数据,只需要修改这个takeindex的指向就可以了,这样就成功的优化了出队的效率,从而减少大量数据的移动,不得不说真的是太强了

请添加图片描述

详细的可以参考本人写的JUC系列8,循环屏障的的底层实现原理,讲解的更加的详细,从条件队列转换到同步队列的逻辑都讲解的比较清楚

3.3,出队操作

接下来查看数据出队的操作,如下面的take方法,也会先获取这把全局的互斥锁,随后会判断队列是否为空,然后再入队的操作

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;	//获取全局的互斥锁
    lock.lockInterruptibly();				//允许锁可中断
    try {
        while (count == 0)					//如果队列中数据为空
            notEmpty.await();				//阻塞
        return dequeue();					//队列中数据不为空,出队
    } finally {
        lock.unlock();						//解锁
    }
}

如果队列的数据不为空,则进入出队的逻辑,即这个dequeue方法。这里面的逻辑也比较简单,上面说了使用环状+修改下标的位置来解决这个大量数据移动的问题,就是通过这段代码来实现的。在将数据取出完成之后,说明此时队列处于没满的状态,那么就会唤醒因为满了而加入条件队列的数据,从而从条件队列转移到同步队列,最后通过take方法的finally方法中的unlock将结点唤醒

private E dequeue() {
    final Object[] items = this.items;	//获取这个数组
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;	//将头结点数据置为空
    if (++takeIndex == items.length)	//takeindex加1,修改指针指向位置
        takeIndex = 0;	//环状数组
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();	//唤醒因数组满而加入到条件队列的数据
    return x;
}

4,总结

ArrayBlockingQueue内部主要是通过数组的方式实现的阻塞队列,内部采用的是环状数组,并且记录了入队和出队的两个下标,从而优化在数据出队时产生数据大量移动的问题。队列保证先进先出原则

ArrayBlockingQueue内部用了ReentrantLock互斥锁和两个条件队列组成,在put加入数据时,如果队列满了则会将这个线程加入条件队列,在take取出数据时,如果数组数据为空也会将这个线程加入条件队列。put和take成功都会唤醒另一个条件队列的线程

ArrayBlockingQueue更加的适用于生产者和消费者的模型,并且生产速度和消费速度比较接近的情况下使用。并且生产者和消费者共用一把互斥锁,没有单一的职责,即不能并行的工作,那么在高并发的场景下,是会会存在的一定的瓶颈的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1072223.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是研发效能DevOps?研发效能方程式又是什么?

研发效能DevOps是一种理念一个方法&#xff0c;旨在通过优化软件开发、运营和维护的流程&#xff0c;实现高效、高质量、快速的价值交付。 研发效能需要解决的是&#xff1a;如何平衡价值、效率、成本这三者的关系&#xff0c;同时寻求可持续发展。研发效能的目标是持续低成本…

多因素共同作用,行业格局或将发生变化

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 1、政经环境改善&#xff0c;数控机床有望走出寒冬 近年来&#xff0c;国家不断提高对于高端数控机床的扶持力度与关注。长…

高效解决 TypeError : ‘ numpy._DTypeMeta‘ object is not subscriptable 问题

文章目录 问题描述解决问题 问题描述 解决问题 参考博文 打开报错位置 AppData\Roaming\Python\Python39\site-packages\cv2\typing\ 添加single-quotes&#xff0c;即单引号 博主说The trick is to use single-quotes to avoid the infamous TypeError: ‘numpy._DTypeMeta’…

ChatGPT可以用于写留学文书嘛?对留学生有什么影响?

自从4月份ChatGPT4.0发布以后&#xff0c;越来越多的AI工具被应用于我们的日常生活当中&#xff0c;而对于学生来说&#xff0c;尤其是在读的留学生朋友&#xff0c;或多或少都使用过&#xff0c;无论是写文章综述还是项目总结&#xff0c;都有十分出色的效果。 01.ChatGPT 可…

算法题:买卖股票的最佳时机含手续费(动态规划解法贪心解法-详解)

这道题有两种解法&#xff1a;动态规划 or 贪心算法。 贪心算法的提交结果要比动态规划好一些&#xff0c;总体上动态规划的解法更容易想到。&#xff08;完整题目附在了最后&#xff09; 1、动态规划解法 设置两个数&#xff0c;dp[0]表示遍历到股票prices[i]时手里没有股…

SSM整合RabbitMQ,Spring4.x整合RabbitMQ

SSM整合RabbitMQ目录 前言版本实现目录参考pom.xml依赖rabbitmq.properties配置文件spring-rabbitmq.xmlspring-mvc.xml或applicationContext.xmlrabbitmq目录下MessageConsumer.javaMessageConsumer2.javaMessageProducer.javaMessageConstant.java 测试调用 扩展消息重发 前言…

飞桨大模型套件:一站式体验,性能极致,生态兼容

在Wave Summit 2023深度学习开发者大会上&#xff0c;来自百度的资深研发工程师贺思俊和王冠中带来的分享主题是&#xff1a;飞桨大模型套件&#xff0c;一站式体验&#xff0c;性能极致&#xff0c;生态兼容。 大语言模型套件PaddleNLP 众所周知PaddleNLP并不是一个全新的模型…

Google-CTF-2016-Stego.pcap数据包解析

Google-CTF-2016&#xff08;a-cute-stegosaurus-100&#xff09; 前言&#xff1a;别人发的题目 随便看看 记录一下解题过程&#xff01; 知识点: 在报文段中有 6Bit 的状态控制码&#xff0c; 分别如下tcp URG&#xff1a;紧急比特&#xff08;urgent&#xff09;&#x…

机械臂抓取的产业落地进展与思考

工业机械臂是一种能够模拟人类手臂动作的机械装置&#xff0c;具有高精度、高速度和高灵活性的特点。近年来&#xff0c;随着人工智能和机器人技术的快速发展&#xff0c;机械臂在工业生产、物流仓储、医疗护理等领域得到了广泛应用。机械臂抓取技术作为机械臂的核心功能之一&a…

C语言进阶---程序环境和预处理

C语言进阶---程序环境和预处理 前言一、程序的翻译环境、执行环境二、详解&#xff1a;C语言程序的编译链接三、预定义符号介绍四、预处理指令 #define五、宏和函数的对比&#xff08;思维导图&#xff09;六、命令定义、预处理指令 #include #undef1.命名约定2.命令行定义 七、…

撤销git本地修改(万能)

使用 git reflog 和 git reset 命令。 git reflog 命令可以查看 Git 中所有的提交历史和分支移动情况&#xff0c;包括已经删除的提交。 您可以通过这个命令找到git操作时间线上的某一个节点&#xff0c;也就是git提交快照的Hash值。 假设您要回滚的提交的哈希值是 e9769f5…

轻松实现时间录入自由!如何在Microsoft Word中轻松插入格式化的日期和时间

在文档中插入当前日期和时间有几个原因。你可能希望将其插入信函或页眉或页脚中。无论是什么原因&#xff0c;Word都可以轻松地将日期和时间插入文档。 如果希望在打开或打印文档时自动更新日期和时间&#xff0c;可以将其作为自动更新的字段插入。该字段也可以随时手动更新。…

【虚拟机】桥接模式下访问外网

目录 一、桥接模式的作用原理 二、配置桥接模式实现外网访问 1、设置 VMnet0 要桥接的网卡 2、虚拟机选择 VMnet0 网卡 3、手动配置虚拟机IP 一、桥接模式的作用原理 桥接模式相当于在当前局域网里创立了一个单独的主机&#xff0c;该主机桥接到宿主主机的网卡&#xff0…

TPU编程竞赛|Stable Diffusion大模型巅峰对决,第五届全球校园人工智能算法精英赛正式启动!

目录 赛题介绍 赛题背景 赛题任务 赛程安排 评分机制 奖项设置 近日&#xff0c;2023第五届全球校园人工智能算法精英赛正式开启报名。作为赛题合作方&#xff0c;算丰承办了“算法专项赛”赛道&#xff0c;提供赛题「面向Stable Diffusion的图像提示语优化」&#xff0c…

ORB-SLAM2运行自己的数据集进行定位教程

ORB-SLAM2只做定位的话&#xff0c;精度还是挺准确的&#xff0c;所以用单目摄像头录制视频&#xff0c;制作自己的数据集跑一下&#xff0c;看看定位精度&#xff0c;将过程加以记录。 文章目录 一、系统配置二、制作数据集1、脚本编写2、配置文件编写3、录制视频素材&#x…

亚马逊、速卖通卖家旺季攻略:抢占旺季销售先机!

随着11月的到来&#xff0c;海外跨境电商又将迎来一年中最重要的营销季节。 11月营销节点 1. 黑色星期五&#xff08;Black Friday&#xff09;&#xff1a;11月的第四个星期五 作为西方传统节日&#xff0c;黑色星期五通常位于11月第四个星期五&#xff0c;是购物狂欢的黄金…

LeetCode刷题笔记【35】:动态规划专题-7(爬楼梯、零钱兑换、完全平方数)

文章目录 前置知识70. 爬楼梯 &#xff08;进阶&#xff09;题目描述解题思路代码 322. 零钱兑换题目描述解题思路代码 279.完全平方数题目描述解题思路代码 总结 前置知识 今天的三道题都聚焦完全背包问题, 关于完全背包, 基础性的思路可以参考上一篇文章 本文的很多操作就是…

4.Docker 搭建 redis6

1.下载redis docker pull redis:6.2.62.创建需要挂载的宿主机文件夹 mkdir -p /data/redis/conf mkdir -p /data/redis/data3.配置redis 切换到/data/redis/conf文件夹下&#xff0c;创建redis.conf,复制redis.conf配置文件内容到redis.conf文件中&#xff0c;然后按下键盘 …

多层包的java程序使用命令行编译、运行、打包

对于没有包层级的java程序&#xff0c;用命令行进行编译、运行、打包很简单。对于多层级包的java程序会有所不同。以如下程序为例 package HeadFirstJava.chapter01.guessGame;public class GameLanucher {public static void main(String[] args) {GuessGame game new Guess…

CentOS停服遭替代,这些操作差异,你了解了吗?

背景 随着 CentOS停服&#xff0c;各个行业的运维都在寻找各自的替代方案&#xff0c;考虑的出发点有&#xff1a; 新操作系统是否兼容CentOS&#xff0c;避免太大的操作差异&#xff1b; 新操作系统是否为信创&#xff0c;其具体收费情况如何&#xff1b; 新操作系统是否支…