JUC阻塞队列BlockingQueue---ArrayBlockingQueue

news2024/12/22 20:04:52

JUC阻塞队列BlockingQueue---ArrayBlockingQueue

  • ArrayBlockingQueue
  • 示例代码
  • 原理
    • 构造方法
    • 内部常量
    • 入队put方法
    • 出队take方法

什么是阻塞队列?

ArrayBlockingQueue

ArrayBlockingQueue是典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。

ArrayBlockingQueue可以用于实现数据缓存、限流、生产者-消费者模式等各种应用。

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。

示例代码

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueExample {

    private static final int QUEUE_CAPACITY = 5;
    private static final int PRODUCER_DELAY_MS = 1000;
    private static final int CONSUMER_DELAY_MS = 2000;

    public static void main(String[] args) throws InterruptedException {
        // 创建一个容量为QUEUE_CAPACITY的阻塞队列
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

        // 创建一个生产者线程
        new Thread(() -> {
            while (true) {
                try {
                    // 在队列满时阻塞
                    queue.put("producer");
                    System.out.println("生产了一个元素,队列中元素个数:" + queue.size());
                    Thread.sleep(PRODUCER_DELAY_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 创建一个消费者线程
        new Thread(() -> {
            while (true) {
                try {
                    // 在队列为空时阻塞
                    String element = queue.take();
                    System.out.println("消费了一个元素,队列中元素个数:" + queue.size());
                    Thread.sleep(CONSUMER_DELAY_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

原理

ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

内部采用双指针对数组进行操作。使用双指针的好处在于可以避免数组的复制操作。如果使用单指针,每次删除元素时需要将后面的元素全部向前移动,这样会导致时间复杂度为 O(n)。而使用双指针,我们可以直接将 takeIndex 指向下一个元素,而不需要将其前面的元素全部向前移动。同样地,插入新的元素时,我们可以直接将新元素插入到 putIndex 所指向的位置,而不需要将其后面的元素全部向后移动。这样可以使得插入和删除的时间复杂度都是 O(1) 级别,提高了队列的性能。

双指针示意

构造方法

可以看到,在构建对象时,创建了一个独占锁ReentrantLock。同时,基于独占锁又创建了两个Condition,利用通知机制机进行阻塞控制。

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    // 指定队列大小,创建非公平锁 
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    // 指定队列大小,指定是否使用公平锁  
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    // 指定队列大小,指定是否使用公平锁,同时添加指定元素集 
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

内部常量

可以看到,有两个常量用来定义为双指针索引。

    /** The queued items */
    //数据元素数组
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    //下一个待取出元素索引
    int takeIndex;

    /** items index for next put, offer, or add */
    //下一个待添加元素索引
    int putIndex;

    /** Number of elements in the queue */
    //元素个数
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
	//内部锁
    final ReentrantLock lock;

    /** Condition for waiting takes */
    //消费者
    private final Condition notEmpty;

    /** Condition for waiting puts */
    //生产者
    private final Condition notFull;

入队put方法

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
    	// 判断入队元素是否为空,为空则抛出空指针异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁,如果线程中断抛出异常 
        lock.lockInterruptibly();
        try {
        	// 如果元素个数等于数组长度(队列满了)
            while (count == items.length)
            	// 等待出队后唤醒
                notFull.await();
           
            // 入队元素     
            enqueue(e);
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 将元素放到putIndex索引处
        items[putIndex] = x;
        
        if (++putIndex == items.length)
        	// 精髓所在--环形数组:putIndex 指针到数组尽头了,返回头部
            putIndex = 0;
        // 队列内元素个数加1    
        count++;
                
        //notEmpty条件队列转同步队列,准备唤醒消费者线程,此时队列有数据    
        notEmpty.signal();
    }

出队take方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        
        //加锁,如果线程中断抛出异常 
        lock.lockInterruptibly();
        try {
			
			//如果队列为空,则消费者挂起
            while (count == 0)
                notEmpty.await();
            //出队    
            return dequeue();
        } finally {
        	// 解锁
            lock.unlock();
        }
    }
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
		//取出takeIndex位置的元素
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
        	//设计的精髓-- 环形数组:takeIndex 指针到数组尽头了,返回头部
            takeIndex = 0;
        
        // 队列内元素个数减1    
        count--;
        if (itrs != null)
            itrs.elementDequeued();
            
        //notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位    
        notFull.signal();
        return x;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/681679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python程序安装指南

Python程序安装指南 介绍 Python 是一种高级编程语言&#xff0c;广泛应用于数据分析、人工智能、Web开发等领域。安装 Python 程序是使用 Python 和运行 Python 脚本的必要步骤。在本文中&#xff0c;我们将提供详细的 Python 程序安装指南。 安装Python程序步骤 步骤1: 下…

1.1-python课程简介

一、python入门 1、python来源2、什么是python3、python编程软件下载4、python软件安装5、python软件运行和调试 1、python来源 Guido van Rossum 于1989年在荷兰国家数学和计算机科学研究所设计出来的。 2、什么是python Python 是一个高层次的结合了解释性、编译性、互动性…

【推荐】win 安装 rust 1.70 (GNU)

目录 一、下载二、安装三、配置环境变量四、检查是否安装成功五、参考文章 一、下载 官网地址&#xff1a;https://www.rust-lang.org/zh-CN/ https://forge.rust-lang.org/infra/other-installation-methods.html 历史版本下载地址&#xff1a; 二、安装 注意&#xff1a;安…

TypeScript ~ TS Webpack构建工具 ⑦

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; TypeScript ~ TS &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &…

chatgpt赋能python:Python相加:实现快速、高效的计算

Python相加&#xff1a; 实现快速、高效的计算 Python 运用广泛&#xff0c;是一种功能强大的编程语言。它不仅易于学习&#xff0c;而且具备许多强大的功能&#xff0c;其中包括 Python 相加。今天&#xff0c;我们将介绍如何写 Python 相加的代码&#xff0c;以及如何实现快…

chatgpt赋能python:如何将Python程序打包成App-一个详细的指南

如何将Python程序打包成App - 一个详细的指南 如果你是一位有着丰富Python编程经验的开发者&#xff0c;可能你听说过Python App打包。Python App打包就是将Python程序打包成为操作系统所支持的应用程序的过程。这个过程可以让你的Python程序在Windows、Mac和Linux操作系统上更…

Linux网络、进程

一、网络环境配置 第一种方法&#xff1a;自动获取 登陆后&#xff0c;通过界面来设置自动获取IP&#xff0c;特点&#xff1a;linux启动后会自动获取IP&#xff0c;缺点是每次自动获取的IP地址可能不一样。这种就不适合服务器使用 第二种方法&#xff1a;指定IP 直接修改配置…

【Leetcode60天带刷】day24 回溯算法—— 77. 组合

​ 理论基础&#xff1a; 什么是回溯法&#xff1f; 回溯法也可以叫做回溯搜索法&#xff0c;它是一种搜索的方式。 回溯法解决的问题 回溯法&#xff0c;一般可以解决如下几种问题&#xff1a; 组合问题&#xff1a;N个数里面按一定规则找出k个数的集合切割问题&#xff…

chatgpt赋能python:Python程序如何打包成安装包

Python程序如何打包成安装包 Python是一种受欢迎的编程语言&#xff0c;许多开发者使用它构建各种类型的应用程序。然而&#xff0c;当需要分发Python应用程序时&#xff0c;将所有代码和依赖项打包成新的安装包通常是最佳选择。本文将介绍如何使用Python打包工具将Python程序…

Java并发工具Fork/Join原理

我们一直讲&#xff0c;并发编程可以分为三个层面的问题&#xff0c;分别是分工、协作和互斥&#xff0c;当你关注于任务的时候&#xff0c;你会发现你的视角已经从并发编程的细节中跳出来了&#xff0c;你应用的更多的是现实世界的思维模式&#xff0c;类比的往往是现实世界里…

Golang每日一练(leetDay0107) 去除重复字母、最大单词长度乘积

目录 316. 去除重复字母 Remove Duplicate Letters &#x1f31f;&#x1f31f; 318. 最大单词长度乘积 Maximum-product-of-word-lengths &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日…

【Less】四则运算

Less四则运算特点 对于两个不同单位值之间的运算&#xff0c;不要求你进行运算操作的几个值必须带单位&#xff0c;只要其中有一个有单位就可以了&#xff0c;运算结果的值会优先取第一个值的单位为准。如&#xff1a; 2030px-10em编译成40px。border:10em; border:(border2px)…

数据挖掘——宁县(区、市)农村居民人均可支配收入影响因子分析(论文)

《数据挖掘与分析》课程论文 题目&#xff1a;宁县&#xff08;区、市&#xff09;农村居民人均可支配收入影响因子分析 xx学院xx班&#xff1a;xxx 2022年6月 摘要&#xff1a;农村居民人均可支配收入可能被农作物产量、牲畜存栏、农作物播种数量等诸多因素影响。为此&#…

JavaSE基础语法--封装

Java是一门面向对象的语言。面向对象的三大特性&#xff1a;封装&#xff0c;继承&#xff0c;多态。封装到底是什么含义呢&#xff1f;通俗来讲就是屏蔽掉类的实现细节&#xff0c;对外提供接口让你调用。举个现实生活中的例子&#xff1a; 刚好618刚过&#xff0c;我因为需求…

chatgpt赋能python:Python算和的重要性及优势

Python算和的重要性及优势 在现代科技时代&#xff0c;计算机的应用范围越来越广泛&#xff0c;Python算和作为一种高效而强大的计算工具&#xff0c;已经成为了无数科学家和工程师的必备技能。Python算和不仅仅在各类科学实验中有着重要的应用&#xff0c;也在企业开发、数据…

chatgpt赋能python:Python程序一直运行怎么停止?

Python程序一直运行怎么停止&#xff1f; 在开发软件时&#xff0c;有时候我们会遇到Python程序一直运行不停止的情况&#xff0c;这时候我们该如何解决呢&#xff1f;本文将介绍一些常见的方法帮助您停止Python程序。 常见的停止Python程序的方法 1. KeyboardInterrupt&…

【硬件5】vr电源芯片驱动

文章目录 1.读MPS5023芯片&#xff1a;0x03ff即将前6位屏蔽2.读PXE1410CDM电压和电流&#xff1a;一个数&0x7ff&#xff0c;将这个数前5位全变为0&#xff0c;其余位不变2.1 1ine11&#xff1a;先看第15和10位&#xff0c;e9b6是上面读出的值2.2 1ine16&#xff1a;PMBUS协…

chatgpt赋能python:Python空循环:提高代码效率的神器

Python空循环&#xff1a;提高代码效率的神器 Python作为一门高效、易学的编程语言&#xff0c;广泛应用于各行各业。在编写Python代码时&#xff0c;循环结构是经常使用的。但是&#xff0c;有时候我们需要使用循环结构&#xff0c;但并不需要执行任何操作。这时候&#xff0…

chatgpt赋能python:Python中的空格:一种重要的编程元素

Python中的空格&#xff1a;一种重要的编程元素 在Python编程中&#xff0c;空格是被广泛使用的重要元素之一。本文将介绍Python中空格的重要性&#xff0c;并探讨空格在编程中的不同应用。 为什么空格在Python编程中如此重要&#xff1f; Python对空格敏感&#xff0c;意味…

CVPR2023 多目标跟踪(MOT)汇总

一、《OVTrack: Open-Vocabulary Multiple Object Tracking》 作者:Siyuan Li* Tobias Fischer* Lei Ke Henghui Ding Martin Danelljan Fisher Yu Computer Vision Lab, ETH Zurich 论文链接 &#xff1a;https://openaccess.thecvf.com/content/CVPR2023/papers/Li_OVTrack…