BlockingQueue(阻塞队列)基本使用指南

news2024/12/28 2:59:28

概述

BlockingQueue 是 java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类。

BlockingQueue 的特性是在任意时刻只有一个线程可以进行 take 或者 put 操作,并且 BlockingQueue 提供了超时 return null 的机制,在许多生产场景里都可以看到这个工具的身影。

BlockingQueue是一个接口,它的实现类有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它们的区别主要体现在存储结构上或对元素 操作上的不同,但是对于 take 与 put 操作的原理,却是类似的。


队列类型

  • 无限队列(unbounded queue ) - 几乎可以无限增长
  • 有限队列(bounded queue ) - 定义了最大容量

队列数据结构

队列实质就是一种存储数据的结构

  • 通常用链表或者数组实现
  • 一般而言队列具备先进先出的特性,当然也有双端队列(Deque)优先级队列
  • 主要操作:入队(EnQueue)与出队(Dequeue)

在这里插入图片描述


BlockingQueue API

参考:阻塞队列BlockingQueue(JDK8)

继承关系:BlockingQueue extends Queue extends Collection

BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

  • 添加元素 的常用方法:

    // 将指定的元素插入到队列的尾部,如果队列满了,那么会阻塞,直到队列中有元素被取出。注:不允许添加null元素
    void put(E e)
    // 如果插入成功则返回 true,若队列已满则抛出 IllegalStateException 异常  
    boolean add(E e)
    // 如果插入成功则返回 true,若队列已满则返回 false  
    boolean offer(E e)
    // 如果插入成功则返回 true,若队列已满则阻塞指定时间直到队列中有元素被取出,超时则返回 false
    boolean offer(E e, long timeout, TimeUnit unit)
    
  • 检索元素 的常用方法:

    // 获取队列的头部元素并将其删除,在队列为空时会阻塞,直到队列中有元素可取  
    E take()
    // 检索并删除队列的头部,如果没有元素,则等待指定时间以使元素可用,如果超时,则返回 null
    E poll(long timeout, TimeUnit unit)
    
    // 一次性从 BlockingQueue 获取所有可用的数据对象(可以指定获取数据的个数)
    // 注:可以提升获取数据效率;不需要多次分批加锁或释放锁
    int drainTo(Collection<? super E> c)
    int drainTo(Collection<? super E> c, int maxElements)
    
  • 还可以使用 Collection(集合)接口中的方法,例如:

    // 清空队列
    void clear()
    // 队列大小
    int size()
    // 可用大小
    int remainingCapacity()
    

常见的阻塞队列

  • ArrayBlockingQueue :由数组支持的有界队列
  • LinkedBlockingQueue :由链接节点支持的可选有界队列
  • PriorityBlockingQueue :由优先级堆支持的无界优先级队列
  • DelayQueue :由优先级堆支持的、基于时间的调度队列

ArrayBlockingQueue

介绍

队列基于数组实现,容量大小在创建 ArrayBlockingQueue 对象时指定。底层数组一旦创建了,容量就不能改变了,因此 ArrayBlockingQueue 是一个容量限制的阻塞队列。因此在队列满的时候执行入队会阻塞,在队列为空时出队也会阻塞。

数据结构如下图:

在这里插入图片描述

  • 队列创建:BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

  • **应用场景:**在线程池中有比较多的应用,生产者消费者场景

  • **工作原理:**基于 ReentrantLock 保证线程安全,根据 Condition 实现队列满时的阻塞

  • 主要变量:

    // 元素数组,其长度在构造方法中指定
    final Object[] items;
    
    // 队列中实际的元素数量
    int count;
    
    // 保护所有通道的主锁
    final ReentrantLock lock;
    
    // 等待take条件的队列
    private final Condition notEmpty;
    
    // 等待put条件的队列
    private final Condition notFull;
    

方法解析

  • put(E e) 方法

    public void put(E e) throws InterruptedException {
        // 检查元素是否为null,如果是,抛出NullPointerException
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            // 当队里中的元素数量等于数组长度,则队列已满,阻塞,等待队列成为不满状态
            while (count == items.length)
                notFull.await();
            // 将元素入队
            enqueue(e);
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    

    put 方法总结:

    • ArrayBlockingQueue 不允许添加 null 元素;

    • ArrayBlockingQueue 在队列已满的时候,会调用 notFull.await(),释放锁并处于阻塞状态;

      一旦 ArrayBlockingQueue 在队列不满的时候,就立即入队。

  • E take() 方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            // 当队列中元素数量为0时,则进入阻塞状态
            while (count == 0)
                notEmpty.await();
            // 队列不为空是,调用dequeue()出队
            return dequeue();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    

    take 方法总结:

    • 取元素时,若队列为空,则调用 notEmpty.await(),进入阻塞状态,直至不为空时,调用 dequeue() 方法出队。

LinkedBlockingQueue

介绍

是一个基于链表的无界队列(理论上有界),向无限队列添加元素的所有操作正常情况下不会阻塞,因此它可以增长到非常大的容量。

  • 队列创建:BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

    blockingQueue 如果在初始化时没有指定容量,那么容量默认为 Integer.MAX_VALUE

  • 注意:使用无限 BlockingQueue 设计生产者 - 消费者模型时,消费者需要能够像生产者向队列添加消息一样快地消费消息。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。

  • 底层数据结构

    LinkedBlockingQueue 内部是使用链表实现一个队列的,但是有别于一般的队列,在于该队列至少是有一个节点的,头节点不含有元素。

    如果队列为空时,头节点的 next 参数为null,尾节点的 next 参数也为null

  • 主要变量

    // 容量限制,如果没有指定,则为 Integer.MAX_VALUE
    private final int capacity;
    
    // 当前队列中的元素数量
    // count只能在两个地方变化,一个是入队的方法(进行+1操作),另一个是出队的方法(进行-1操作),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
    private final AtomicInteger count = new AtomicInteger();
    
    // 队列头节点,始终满足head.item == null
    transient Node<E> head;
    
    // 队列的尾节点,始终满足last.next == null
    private transient Node<E> last;
    
    // 由 take、poll 等持有的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    // 当队列为空时,保存执行出队的线程
    private final Condition notEmpty = takeLock.newCondition();
    
    // 由 put、offer 等持有的锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    // 当队列满时,保存执行入队的线程
    private final Condition notFull = putLock.newCondition();
    

方法解析

  • put(E e) 方法

    // 在此队列的尾部插入指定元素,如有必要,等待空间可用。
    public void put(E e) throws InterruptedException {
        // 不允许元素为null
        if (e == null) throw new NullPointerException();
        
        int c = -1;
        // 以当前元素新建一个节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 获得入队的锁
        putLock.lockInterruptibly();
        try {
            // 如果队列已满,那么将该线程加入到Condition的等待队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            // 当队列未满,然后有出队线程取出导致,将节点入队
            enqueue(node);
            // 得到插入之前队列的元素个数。getAndIncrement返回的是 +1 前的值
            c = count.getAndIncrement();
            // 如果还可以插入元素,那么释放等待的入队线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放入队的锁
            putLock.unlock();
        }
        // 如果插入队列之前元素个数为0,插入后就通知出队线程队列非空
        if (c == 0)
            signalNotEmpty();
    }
    

    put 方法总结:

    • LinkedBlockingQueue 不允许插入的元素为 null;
    • 同一时刻只有一个线程可以进行入队操作,putLock 在将元素插入队列尾部时加锁了;
    • 如果队列满了,则会调用 notFull.await(),将该线程加入到 Condition 等待队列中。await 方法会释放线程占有的锁,这将导致之前由于被阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为队列仍旧是满的,也被进入到条件队列中;
    • 一旦有出队线程取走元素,就会通知到入队等待队列释放线程。那么第一个加入到 Condition 队列中的将会被释放,那么该线程将会重新获得 put 锁,继而执行 enqueue() 方法,将节点插入到队列的尾部;
    • 然后得到插入队列前元素的个数,如果插入后队列中还可以继续插入元素,那么就通知 notFull 条件的等待队列中的线程;
    • 如果插入队列前个数为 0,那现在插入后,就为 1 了,那就可以通知因为队列为空而导致阻塞的出队线程去取元素了。
  • E take() 方法

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 获取takeLock锁
        takeLock.lockInterruptibly();
        try {
            // 如果队列中元素数量为0,则将出队线程加入到notEmpty队列中进行等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 得到到队头的元素
            x = dequeue();
            // 得到出队列前元素的个数。getAndDecrement返回的是 -1 前的值
            c = count.getAndDecrement();
            // 如果出队列前的元素数量大于1,那说明还可以继续取,那就释放在notEmpty队列的第一个线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 释放出队锁
            takeLock.unlock();
        }
        // 如果出队前队列是满的,那现在取走一个元素了,队列就不满了,就可以去通知等待中的入队线程了。
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    take方法总结:

    • 同一时刻只有一个线程可以进行出队操作,takeLock 在出队之前加锁了;
    • 如果队列中元素为空,那就进入 notEmpty 队列中进行等待。直到队列不为空时,得到队列中的第一个元素。当发现取完发现还有元素可取时,再通知一下 notEmpty 队列中等待的其他线程。最后判断自己取元素前的是不是满的,如果是满的,那自己取完,就不满了,就可以通知在 notFull 队列中等待插入的线程进行 put 了。
  • remove() 方法

    用于删除队列中一个元素,如果队列中不含有该元素,那么返回 false ;有的话则删除并返回true。

    入队和出队都是只获取一个锁,而 remove()方法需要同时获得两把锁。

    public boolean remove(Object o) {
        // 因为队列不包含null元素,返回false
        if (o == null) return false;
        // 获取两把锁
        fullyLock();
        try {
            // 从头的下一个节点开始遍历
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                // 如果匹配,那么将节点从队列中移除,trail表示需要删除节点的前一节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            // 释放两把锁
            fullyUnlock();
        }
    }
    
    /**
     * 锁定以防止 put 和 take.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    
    /**
     * 解锁以允许 put 和 take.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    

DelayQueue

由优先级堆支持的、基于时间的调度队列,内部基于无界队列 PriorityQueue 实现,而无界队列基于数组的扩容实现。

  • 队列创建:BlockingQueue<String> blockingQueue = new DelayQueue();

  • 要求:入队的对象必须要实现 Delayed 接口,而 Delayed 集成自 Comparable 接口

  • 应用场景:电影票

  • 工作原理:队列内部会根据时间优先级进行排序。延迟类线程池周期执行。


LinkedBlockingQueue 和 ArrayBlockingQueue 的区别

  • 底层实现不同

    LinkedBlockingQueue 底层实现是链表,ArrayBlockingQueue 底层实现是数组

  • 队列容量

    LinkedBlockingQueue 默认的队列长度是 Integer.Max,但是可以指定容量。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue 高很多;

    ArrayBlockingQueue 必须在构造方法中指定队列长度,不可变。在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高。

  • 锁的数量

    LinkedBlockingQueue 有两把锁,可以有两个线程同时进行入队和出队操作,但同时只能有一个线程进行入队或出队操作。

    ArrayBlockingQueue 只有一把锁,同时只能有一个线程进行入队和出队操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/514282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哪些行业适合开发微信小程序?

随着电子商务的进一步发展&#xff0c;很多行业都开始开发自己的系统软件&#xff0c;企图通过线上线下结合的方式来达到更好的宣传效果&#xff0c;拓客引流。微信小程序凭借成本低、使用方便等优点成为很多商家的首选。那么究竟哪些行业适合开发微信小程序呢&#xff1f; …

springboot项目,localhost:port 可以访问,但是外网IP:port不行

springboot 项目启动后&#xff0c;localhost:port 可以访问&#xff0c;但是外网IP:port不行 现象原因及解决方法解决方法&#xff1a;规则1&#xff1a;规则2&#xff1a;规则3&#xff1a; 现象 springboot项目&#xff0c;localhost:port 可以访问&#xff0c;但是外网IP:…

Java多线程基础-9:代码案例之阻塞队列

阻塞队列是一种特殊的队列&#xff0c;带有“阻塞特性”&#xff0c;也遵守队列 “先进先出” 的原则。阻塞队列是一种线程安全的数据结构&#xff0c;并且具有以下特性&#xff1a; 当队列满时&#xff0c;继续入队列就会阻塞&#xff0c;直到有其他线程从队列中取走元素。当…

协程Flow原理

什么是Flow Flow直译过来就是“流”的意思&#xff0c;也就是将我们我们任务如同水流一样一步一步分割做处理。想象一下&#xff0c;现在有一个任务需要从山里取水来用你需要怎么做&#xff1f; 扛上扁担走几十里山路把水挑回来。简单粗暴&#xff0c;但是有可能当你走了几十…

Java项目经验二:二手车系统

1、项目简介 开发环境&#xff1a;IDEA MySQL JDK1.8 Git Maven 使用技术&#xff1a;Spring Cloud Mybatis Plus MySQL RocketMQ Nginx Nacos Redis MongoDB ElasticSearch Shiro 项目描述&#xff1a; XX二手车的服务贯穿二手车交易各个环节&#xff0c;运用成熟…

穿越数据智能“海峡”,企业更需要什么样的数智底座?

导读&#xff1a;更懂业务&#xff0c;是用友iuap数智中台多年打磨的核心能力。 如果将数智化转型比作企业的大航海旅程&#xff0c;数据和智能就像是大航海过程中企业必须穿越的海峡。随着数智化转型的不断深入&#xff0c;很多企业驶进数据智能海峡后发现&#xff0c;要用好数…

外观数列、文本左右对齐----2023/5/11

外观数列----2023/5/11 给定一个正整数 n &#xff0c;输出外观数列的第 n 项。 「外观数列」是一个整数序列&#xff0c;从数字 1 开始&#xff0c;序列中的每一项都是对前一项的描述。 你可以将其视作是由递归公式定义的数字字符串序列&#xff1a; countAndSay(1) “1”…

分享两款好用的软件

软件一&#xff1a;去水印神器——Inpaint Inpaint是一款功能强大的图像处理软件&#xff0c;它的主要功能是去除图片中的水印。除此之外&#xff0c;它还可以帮助用户修复照片中的缺陷&#xff0c;例如划痕、斑点、红眼等&#xff0c;删除照片中的不必要的元素&#xff0c;例…

Meta最新模型LLaMA详解(含部署+论文)

来源&#xff1a;投稿 作者&#xff1a;毛华庆 编辑&#xff1a;学姐 前言 本课程来自深度之眼《大模型——前沿论文带读训练营》公开课&#xff0c;部分截图来自课程视频。 文章标题&#xff1a;LLaMA: Open and Efficient Foundation Language Models 向量空间中词表示的有效…

ios打包ipa的四种实用方法(.app转.ipa)

总结一下&#xff0c;目前.app包转为.ipa包的方法有以下几种&#xff1a; 1、Apple推荐的方式&#xff0c;即实用xcode的archive功能 Xcode菜单栏->Product->Archive->三选一&#xff0c;一般选后两个。 局限性&#xff1a;个人开发一般采用这种方法&#xff0c;但…

linux内核网络子系统初探2---socket层

linux内核网络子系统初探2—socket层 一、内核网络socket层相关 接着上文&#xff0c;从这章开始&#xff0c;将按照五层网络模型的顺序逐层分析内核代码。 linux1.0网络协议栈部分代码如下&#xff1a; [rootlocalhost linux-1.0]# ls net/ ddi.c inet Makefile socket.…

uniapp安装uview-ui踏坑

1. 安装uview-ui npm install uview-ui -S 2. 创建vue.config.js 填写如下代码 module.exports {transpileDependencies: [uview-ui] }3. 配置main.js //uview import uView from "uview-ui" Vue.use(uView)4. App.vue中引入样式 <style lang"scss"…

文案智能改写-AI智能文章改写软件

随着人工智能技术的不断发展&#xff0c;越来越多的智能写作软件相继面世&#xff0c;其中&#xff0c;AI智能改写工具是一款非常有实用价值的工具。本文将从全自动批量改写、没有错别字和标准语法、支持图文模式改写、支持各种语言改写以及严格按照标准格式结构改写几个方面&a…

前端三剑客CSS篇——CSS选择器

初识CSS选择器 文章目录 初识CSS选择器CSS三大特征&#x1f44d;CSS的三种使用方法&#x1f44f;CSS常见选择器&#x1f440;标签选择器类选择器id选择器后代选择器属性选择器复合选择器 CSS代码风格&#x1f4dc; CSS是前端三剑客不可忽略的一部分&#xff0c;CSS对前端来说是…

知了汇智:坚持发展产教融合,做好高校、人才与企业之间的桥梁

6月将正式迎来高校毕业季&#xff0c;大学生就业是聚焦全社会关注的头等大事。5月9日&#xff0c;成都知了汇智科技有限公司&#xff08;以下简称“知了汇智”&#xff09;组织开展“深化产教融合、聚焦人才培养”的主题座谈会议&#xff0c;联动高校与合作企业参加&#xff0c…

天津专业python培训机构精选(犹豫不如学python)

说起python&#xff0c;有好多年轻开发者都学习过Python&#xff0c;而且到现在为止&#xff0c;还有好多人都在追着Python跑&#xff0c;即便其他语言也很优秀&#xff0c;但是对Python的爱真的是只增不减。接下来&#xff0c;小编就给大家浅说一下&#xff0c;为什么python这…

掌汇云创新鞋业会展,汇集专精特新企业,数字化连接上下游

国内&#xff1a;鞋业供需情况多变&#xff0c;对接难度较大。在一个基数庞大&#xff0c;且成长速度惊人的市场&#xff0c;要想快速地找到供应商显然不是一件简单的事&#xff1b; 国际&#xff1a;鞋业对于外贸的依赖程度很高&#xff0c;但是当前国际局势动荡&#xff0c;…

力扣 151. 反转字符串中的单词

一、题目描述 给你一个字符串 s&#xff0c;请你反转字符串中单词的顺序。 单词是由非空格字符组成的字符串。s 中使用至少一个空格将字符串中的单词分隔开。 返回单词顺序颠倒且单词之间用单个空格连接的结果字符串。 注意&#xff1a;输入字符串 s 中可能会存在前导空格、…

内存分段详解

内存分段 1.1 分段机制概述 1.1.1 分段机制产生的原因 对于分段机制&#xff0c;要从Intel的微处理器的8086开始说起&#xff0c;刚开始内存空间比较小&#xff0c;内存寻址采用的是直接访问物理地址的方式。由于技术的发展&#xff0c;计算机做的事情越来越多&#xff0c;程…

Connection closed, EOF detected错误

生产遇到了这个问题 原因&#xff1a;两个http网址&#xff0c;即没有启用SSL 解决方案 在 weblogic的启动文件 bin里面 的 setDomainEnv.sh 这个文件 。 加上这一句 set JAVA_OPTIONS%JAVA_OPTIONS% -DUseSunHttpHandlertrue