【JUC进阶】11. BlockingQueue

news2024/11/15 13:52:31

目录

1、前言

2、BlockingQueue

2.1、ArrayBlockingQueue

2.1.1、take()

2.1.2、put()

2.2、LinkedBlockingQueue

2.3、PriorityBlockingQueue

2.4、SynchronousQueue

3、简单使用

3.1、创建ArrayBlockingQueue

3.2、Demo


1、前言

对于并发程序而言,高性能自然是一个我们需要追求的目标,但多线程的开发模式还会引入一个问题,那就是如何进行多个线程间的数据交换和共享呢?而JUC库中提供了多种并发队列和环形缓冲区的实现,为我们提供了高性能和线程安全的数据结构。

2、BlockingQueue

BlockingQueue是Java从JDK5开始在并发包(JUC)内引入的。他之所以适合作为数据交换共享的通道,关键在于他的Blocking上。Blocking是阻塞的意思。当服务线程(服务线程指不断获取队列中的消息,进行处理的线程)处理完成队列中所有的消息后,它如何知道下一条消息何时到来呢?

有两种做法:

  1. 不断轮询监控该队列;
  2. 监控队列空时,进行等待;当有消息进入队列时,自动唤醒该线程;

很明显第一种方案造成了不必要的资源浪费(线程不停的循环和监控队列)。BlockingQueue则很好的解决了该问题。它会让服务线程在队列为空时进行等待,当有新的消息进入队列后,自动将线程唤醒。

BlockingQueue实际上是个接口。提供了最基本的队列元素操作API,如add(), offer(),put(),take(),poll(),remove()等。

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E var1);

    boolean offer(E var1);

    void put(E var1) throws InterruptedException;

    boolean offer(E var1, long var2, TimeUnit var4) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long var1, TimeUnit var3) throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object var1);

    boolean contains(Object var1);

    int drainTo(Collection<? super E> var1);

    int drainTo(Collection<? super E> var1, int var2);
}

有4个主要的实现类:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue。

2.1、ArrayBlockingQueue

ArrayBlockingQueue是基于数组实现的有界阻塞队列,内部使用一个可重入锁来保证线程安全。我们主要介绍一下该类,下面其他的都基本类似,以此类来讲一下他是如何实现上面说到的数据共享的。

从官方文档可以看出,他是一个有界队列,他会尝试put成满的队列的元件将导致在操作阻挡;尝试take从空队列的元件将类似地阻塞。

用过队列的小伙伴应该都知道,向队列中压入元素可以使用 offer()方法和 put()方法。对于 offer()方法,如果当前队列已经满了,它就会立即返回 false。如果没有满,则执行正常的入队操作。所以,我们不讨论这个方法。现在,我们需要关注的是 put()方法。put()方法也是将元素压入队列末尾。但如果队列满了,它会一直等待,直到队列中有空闲的位置。

从队列中弹出元素可以使用 poll()方法和 take()方法。它们都从队列的头部获得一个元素。不同之处在于:如果队列为空,那么 poll()方法会直接返回 null,而 take()方法会等待,直到队列内有可用元素。

ArrayBlockingQueue类的内部元素都放置在一个对象数组中:

/** The queued items */
final Object[] items;

因此,put()方法和 take()方法才是体现 Blocking 的关键。为了做好等待和通知两件事,在ArrayBlockingQueue 类内部定义了以下一些字段。

2.1.1、take()

从源码可以看到take():

/** Condition for waiting takes */
private final Condition notEmpty;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

当执行 take()操作时,如果队列为空,则让当前线程在 notEmpty 上等待。新元素入以时,则进行一次 notEmpty 上的通知。

notEmpty实际上是个Condition并发类。在前面《【JUC基础】06. 生产者和消费者问题》中有提到过,可以找到该篇文章再熟悉一下。

当代码进行到take()执行到notEmpty.await();时,当前线程会进行等待,当队列中新插入新的元素时,线程便会得到一个通知,自动唤醒。

/**
 * 新增一个元素
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

当新元素进入队列后,调用notEmpty.signal();唤醒线程,继续工作。

2.1.2、put()

与take() 类似,put()的操作也是一样的。当队列满的时候,需要让压入的线程等待。

/** Condition for waiting puts */
private final Condition notFull;



/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

当有元素从队列中被取走,队列中出现空位置时,自然也需要通知等待入队的线程。

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

从实现上来说,ArrayBlockingQueue在物理上是一个数字,但在逻辑上来说是个环形结构。由于其数组的特性,其容量在初始化时就已指定,并且无法动态调整。

当有元素加入或离开队列时,总是使用takeIndex和putIndex两个变量分别表示队列头部和尾部元素在数组中的位置。每一次入队和出队操作都会调整这两个重要的索引位置。

private int incCursor(int index) {
    // assert lock.getHoldCount() == 1;
    if (++index == items.length)
        index = 0;
    if (index == putIndex)
        index = NONE;
    return index;
}
/**
 * Circularly decrement i.
 */
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}

可以看出,这两个函数将数组的头尾相接,实现了环形数组。

2.2、LinkedBlockingQueue

与ArrayBlockingQueue类似,LinkedBlockingQueue基于链表实现的可选有界或无界阻塞队列,内部使用两个可重入锁来保证线程安全。这里就不详细展开了。

2.3、PriorityBlockingQueue

PriorityBlockingQueue则是基于优先级堆实现的无界阻塞队列,元素根据优先级进行排序。

2.4、SynchronousQueue

SynchronousQueue则是一个没有容量的阻塞队列,每个插入操作都必须等待另一个线程的移除操作,适用于直接传递任务的场景。

3、简单使用

ArrayBlockingQueue提供了接口中所有方法的实现BlockingQueue。这些方法用于插入、访问和删除数组阻塞队列中的元素。前面说的put和take是阻塞操作的方法,其他的可以参看API自己尝试。

3.1、创建ArrayBlockingQueue

为了创建数组阻塞队列,我们​​必须导入该java.util.concurrent.ArrayBlockingQueue包。导入包后,我们可以使用以下方法在 Java 中创建数组阻塞队列:

/**
 * capacity: 数组阻塞队列的大小
 */
ArrayBlockingQueue<Type> animal = new ArrayBlockingQueue<>(int capacity);

3.2、Demo

import java.util.concurrent.ArrayBlockingQueue;

class Main {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> animals = new ArrayBlockingQueue<>(5);

       try {
           //Add elements to animals
           animals.put("Dog");
           animals.put("Cat");
           System.out.println("ArrayBlockingQueue: " + animals);

           // Remove an element
           String element = animals.take();
           System.out.println("Removed Element: " + element);
        }
        catch(Exception e) {
            System.out.println(e);
        }
    }
}

输出:

ArrayBlockingQueue:[Dog,Cat]
Removed Element: Dog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739578.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python: FileHelper

# encoding: utf-8 # 版权所有 2023 涂聚文有限公司 # 许可信息查看&#xff1a; # 描述&#xff1a; # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2023.1 python 311 # Datetime : 2023/7/9 19:12 # User : geovindu # Product : PyCharm # Proj…

QT事件处理

设计一个闹钟&#xff0c;定时播报内容。 #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include <QTimerEvent> #include <QDateTime> #include <QMessageBox> #include <QTextToSpeech> #include <QDebug> namespa…

校园闲置物品交易平台的设计与实现(论文+源码)_kaic

摘 要 伴随大数据时代的到来&#xff0c;计算机已成为人们步入个数化生活的必须品。由于计算机技术的成熟&#xff0c;互联网的强大功能也正在被人们以最大限度的开发。通过网络&#xff0c;人们能够足不出户完成校园闲置物品查阅&#xff0c;这在方便学生的同时也解决了在传统…

用于FPGA远程更新的QuickBoot方法

用于FPGA远程更新的QuickBoot方法 用于FPGA远程更新的QuickBoot方法 用于FPGA远程更新的QuickBoot方法1. 远程更新简介2 QuickBoot方案2.1 QuickBoot配置方法2.2 QuickBoot Flash 编程方法 3.QuickBoot实现3.1 Critical Switch World (key point)3.2 QuickBoot存储映射3.3 Bits…

Django ORM中QuerySet常用接口汇总记录

存在模型数据 学生表 课程表 支持链式操作的接口 all接口&#xff1a;用于查询所有数据&#xff0c;相当于&#xff1a;select * from xxx filter接口&#xff1a;根据条件过滤数据 values接口&#xff1a;指定返回的字段&#xff0c;结果是包含 dict 的 QuerySet 对象 valu…

zabbix----代理服务器,高可用集群

文章目录 一、部署 zabbix 代理服务器1.1 设置 zabbix 的下载源&#xff0c;安装 zabbix-proxy1.2 部署数据库&#xff0c;要求 MySQL 5.7 或 Mariadb 10.5 及以上版本1.3修改 zabbix-proxy 配置文件1.4 启动 zabbix-proxy1.5 在所有主机上配置 hosts 解析1.6 在 Web 页面配置 …

Oracle批量生成供datax调用的json文件及可执行sh脚本

Oracle+DataX+存储过程实现异构库之间的数据同步资源-CSDN文库 背景: 项目需要做数据迁移(hive2oceanbase),两边的库有几百张表,人工生成json文件,工作量巨大,想来想去还是用Oracle存储过程的形式,批量生成json文件和shell脚本,及实现跑批的功能。 本次测试是Oracl…

免费插画网站

humaaans undraw iradesign fresh-folk delesign

MYSQL的体系结构

mysql体系结构可以分为四个层级&#xff1a;连接层&#xff0c;SQL层&#xff0c;插件存储引擎&#xff0c;物理文件层 补充&#xff1a; SQL层中&#xff1a; 管理服务和工具组件&#xff1a;从备份和恢复的安全性、复制、集群、管理、配置、迁移和元数据等方面管理数据库。…

开发环境可运行,发包后报错(nginx代理出现了问题)

场景&#xff1a; vue项目首次发包... 后端服务发包完毕&#xff0c;apifox测试接口没问题、前端开发环境连服务也没问题... 前端项目打包,提前配置nginx&#xff0c;前端发包... 打开网页&#xff0c;登录接口正常&#xff0c;登录后其他接口报错... 查看报错信息&#xff1a;…

Unity 编辑器-查找所有未被使用的Prefab

需求 接到一个需求&#xff0c;将Res里所有特效相关的prefab检查一下&#xff0c;没有使用的移除。 分析 先拆解一下需求&#xff0c;如下 #mermaid-svg-YiTzyE1BvQ0ZTgLj {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#merm…

docke安装elasticsearch(ES)

docke安装elasticsearch&#xff08;ES&#xff09; 1.安装一个不带数据卷映射的ES docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.typesingle-node" elasticsearch:7.6.22.创建es数据卷映射目录 mkdir -p data/elasticsearch3.将…

瑞芯微 RK356x 基于Android11移植usb接口rtl8723du wifi和蓝牙一体化

开发环境 平台: 瑞芯微RK356x 操作系统&#xff1a;Android11 WiFi、蓝牙芯片:RTL8723DU 通讯类型&#xff1a;USB协议 RTL8723du介绍 Realtek RTL8723DU是一个高度集成的单片机802.11b/g/n 1T1R WLAN&#xff0c;和一个集成的蓝牙2.1/4.2单片机&#xff0c;USB 2.0多功能。…

谈谈电机的FOC控制算法的特点以及应用场景

电机的FOC&#xff08;Field-Oriented Control&#xff09;控制算法是一种常用的电机控制策略。它的特点是将电机的控制分为两个部分&#xff1a;电流控制和转速控制。 首先&#xff0c;电流控制是FOC算法的关键部分。它通过控制电机的电流来实现对电机的力矩控制。具体来说&am…

maven引入jar包报红

maven引入jar包报红 1、检查自己的maven配置有无问题 2、检查是否没有子项目使用到当前引入的jar包&#xff0c;在根目录下引入的jar包如果没有子项目使用会报红&#xff0c;原因是在根目录下只是声明式引用&#xff0c;并没有实际引用到。 解决办法&#xff1a;找到要使用的子…

Leetcode-每日一题【1721.交换链表中的节点】

题目 给你链表的头节点 head 和一个整数 k 。 交换 链表正数第 k 个节点和倒数第 k 个节点的值后&#xff0c;返回链表的头节点&#xff08;链表 从 1 开始索引&#xff09;。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], k 2输出&#xff1a;[1,4,3,2,5] 示例 …

816. 数组翻转

链接&#xff1a; 链接 题目&#xff1a; 给定一个长度为 nn 的数组 aa 和一个整数 sizesize&#xff0c;请你编写一个函数&#xff0c;void reverse(int a[], int size)&#xff0c;实现将数组 aa 中的前 sizesize 个数翻转。 输出翻转后的数组 aa。 输入格式 第一行包含两个整…

高德地图通过图层layer实现对海量点的可视化渲染

一、可视化海量点应用场景 在正文开始之前我先说说我为啥会使用这个技术来实现数据的可视化。 事情是这样的&#xff0c;我接手了一个项目&#xff0c;里面有个需求是在地图上标记出他们公司的产品的使用分布。我接手的时候呢&#xff0c;我前面的那位大哥是使用marker点覆盖物…

Spark(19):SparkSQL中数据的加载和保存

目录 0. 相关文章链接 1. 通用的加载和保存方式 1.1. 加载数据 1.2. 保存数据 2. Parquet 2.1. 加载数据 2.2. 保存数据 3. JSON 4. CSV 5. MySQL 5.1. 导入依赖 5.2. 读取数据 5.3. 写入数据 6. Hive 6.1. SparkSQL连接Hive 6.2. 内嵌的 HIVE 6.3. 外部的 HI…

FastDFS文件系统

FastDFS文件系统 环境代码实现 一.FastDFS 1.什么是FastDFS FastDFS:Fast Distributed File System,快速的分布式文件系统,是一款用c语言开发的开源的分布式文件系统. FastDFS 是用 c 语言编写的一款开源的分布式文件系统。FastDFS为互联网量身定制&#xff0c;充分考虑了…