AQS应用之BlockingQueue详解

news2025/1/12 21:43:39

概要

AQS全称是 AbstractQueuedSynchronizer,中文译为抽象队列式同步器。BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影。

队列类型

  1. 无限队列 (unbounded queue ) - 几乎可以无限增长
  2. 有限队列 ( bounded queue ) - 定义了最大容量

队列数据结构

队列实质就是一种存储数据的结构

  • 通常用链表或者数组实现
  • 一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
  • 主要操作:入队(EnQueue)与出队(Dequeue)

常见的4种阻塞队列

  • ArrayBlockingQueue 由数组支持的有界队列
  • LinkedBlockingQueue 由链接节点支持的可选有界队列
  • PriorityBlockingQueue 由优先级堆支持的无界优先级队列
  • DelayQueue 由优先级堆支持的、基于时间的调度队列

ArrayBlockingQueue

队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好

数据结构如下图:

队列创建:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

应用场景

在线程池中有比较多的应用,生产者消费者场景

工作原理

基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞

LinkedBlockingQueue

是一个基于链表的无界队列(理论上有界)

BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。

向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。

使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。

DelayQueue

由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。

队列创建:

BlockingQueue<String> blockingQueue = new DelayQueue();

要求

入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口

应用场景

电影票

工作原理:

队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

BlockingQueue API

BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

添加元素

方法

说明

add()

如果插入成功则返回 true,否则抛出 IllegalStateException 异常

put()

将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入

offer()

如果插入成功则返回 true,否则返回 false

offer(E e, long timeout, TimeUnit unit)

尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入

检索元素

方法

说明

take()

获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用

poll(long timeout, TimeUnit unit)

检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

在构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

多线程生产者-消费者示例

接下来我们用“迟早药丸”的场景创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer ) 。

生产者将生成一个 0 到 100 的随机数(十全大补丸的编号),并将该数字放在 BlockingQueue 中。我们将创建 16 个线程(潘金莲)用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。

需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。

从生产者(潘金莲)向消费者(武大郎)发出信号的好方法是,不需要处理消息,而是发送称为毒 ( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison ) 丸 ( pill ) ,因为我们有消费者(武大郎)。然后当消费者从队列中获取特殊的毒 ( poison ) 丸 ( pill )消息时,它将优雅地完成执行。

以下生产者的代码:

@Slf4j
public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("潘金莲-{}号,给武大郎的泡药!",Thread.currentThread().getId());
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!",Thread.currentThread().getId(),j+1);
        }
    }
}

我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。它还需要有毒 ( poison ) 丸 ( pill ) (潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。

每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金莲有没有下毒) ,如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。

@Slf4j
public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("武大郎-{}号,喝药-编号:{}",Thread.currentThread().getId(),number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。

既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。

我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:

public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
        int mod = N_CONSUMERS % N_PRODUCERS;

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
        //潘金莲给武大郎熬药
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }
        //武大郎开始喝药
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }
        //潘金莲开始投毒,武大郎喝完毒药GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }

}

BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1371461.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Cesium笔记 viewer控件隐藏

Cesium初始化后&#xff0c;场景中会有时间轴&#xff0c;动画&#xff0c;home等控件显示&#xff0c;需要将这些控件隐藏&#xff0c;如下&#xff1a; init() {let viewer new Cesium.Viewer("cesiumContainer", {fullscreenButton: false, // 隐藏界面右下角全…

【日志】Alertmanager+Loki实现Teams告警

目录 简介一、配置Webhook二、安装msteams三、配置Alertmanager简介 在前面的文章我们实现了Loki日志告警,接下来新增一种告警媒介:微软Teams 一、配置Webhook 这块我们主要借助incoming Webhook应用来发送消息到团队,把它添加到团队里,参考:https://www.cnblogs.com/a…

【Proteus仿真】【Arduino单片机】智能感应温控风扇

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使用LCD1602液晶显示模块、DS18B20温度、按键、声光报警、L293D电机驱动等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD1602显示传感器检…

网络音频对讲广播模块-定时广播 ip网络广播音频模块SV-2401

1.模块介绍 SV-2400V网络音频模块是一款高性能的10/100M网络音频模块&#xff0c;采用高性能处理器及专业Codec&#xff0c;能接收网络音频数据流&#xff0c;转换成音频模拟信号输出。亦能采样本地的mic输入或linein输入&#xff0c;发送到网络上&#xff0c;供其他网络音频模…

Lumerical Examples------Ports

Lumerical Examples------Ports 引言正文示例引言 Ports 在 FDTD 工程中是一个很特别的组件,这里我们对它进行介绍 正文 虽然 Ports 不是严格意义上的 Monitor(监视器), 但是它也可以进行 S 参数提取。Ports 扮演着一个 frequency domain power monitor 的和 mode expan…

Mjdioureny练习二

&#xff08;一&#xff09;一个老人强烈的阳光照射 我跟他说&#xff1a;a old man strong sunlight 这四张照片都是很不错的 每一张都十分清晰而且十分真实逼真&#xff0c;细节把控的也十分完美。 &#xff08;二&#xff09;一个中国50岁的大叔&#xff0c;走在清晨的深林…

使用QPushButton实现计算机

1. 按钮类&#xff1a;QPushButton 1.1 信号 void clicked(bool checked false)//被点击触发void pressed()//当按下按钮时发出此信号void released()//当松开按钮时发出此信号void toggled(bool checked)//每当可检查按钮改变其状态时&#xff0c;都会发出此信号。1.2 实现按…

携程testab算法分析

声明 本文以教学为基准、本文提供的可操作性不得用于任何商业用途和违法违规场景。 本人对任何原因在使用本人中提供的代码和策略时可能对用户自己或他人造成的任何形式的损失和伤害不承担责任。 如有侵权,请联系我进行删除。 这里只是我分析过程,以及一些重要点的记录,没有…

UE5 C++(十五)— TimerHandle(定时器)的使用

文章目录 设置定时器声明FTimerHandle定义执行函数设置定时器 清除定时器 定时器&#xff08;Timer&#xff09; 可用于执行延迟类型的操作&#xff0c;或让某些操作在一段时间内重复执行。 设置定时器 定时器的设置只需三步即可完成&#xff1a;声明定时器句柄 FTimerHandle…

固态硬盘只显示一半容量怎么办?

有时候我们会发现固态硬盘只显示一半容量&#xff0c;那么为什么会出现这个问题呢&#xff1f;下面我们就一起来了解一下。 为什么硬盘没有显示满容量&#xff1f; 在Windows 11/10/8/7中硬盘容量显示错误&#xff0c;有很多原因会导致这种情况发生。以下总结了一些常见的原因…

PHP开发日志 ━━ 不同方法判断某个数组中是否存在指定的键名,测试哪种方法效率高

我们可以用isset($arr[a]) 或者 array_key_exists(a, $arr) 来判断a键名是否存在与$arr数组。 那么这两种方式哪个运行速度快呢&#xff1f; 不多废话了&#xff0c;现在我们写一段代码来测试一下&#xff1a; $array [a > 1, b > 2, c > 3];$start microtime(tru…

2023年全国职业院校技能大赛(高职组)“云计算应用”赛项赛卷③

2023年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算应用”赛项赛卷3 目录 需要竞赛软件包环境以及备赛资源可私信博主&#xff01;&#xff01;&#xff01; 2023年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算应用”赛项赛卷3 模块一 …

《Shader开发实战》-笔记

一、初识游戏图形 1、什么是渲染&#xff1f; 渲染实际上就是创建图像的过程&#xff0c;在渲染过程中创建的图像被称为渲染或者帧&#xff0c;该图像&#xff08;帧&#xff09;以每秒多次在计算机屏幕上进行呈现&#xff0c;即帧率。 负责渲染图像&#xff08;帧&#xff09…

美国证券交易委员会 X 账户被黑,引发比特币市场震荡

Bleeping Computer 网站消息&#xff0c;威胁攻击者成功“占领”了美国证券交易委员会的 X 账户&#xff0c;并发布一条关于批准比特币 ETF 在证券交易所上市的虚假公告。 帖子原文&#xff1a;今天&#xff0c;美国证券交易委员会批准比特币 ETF 在注册的国家证券交易所上市&a…

华为认证 | HCIP-Storage V5.5 认证正式发布!

华为认证存储高级工程师HCIP-Storage V5.5&#xff08;中文版&#xff09;自2023年12月28日起正式在中国区发布。 01 发布概述 基于“平台生态”战略&#xff0c;围绕“云-管-端”协同的新ICT技术架构&#xff0c;华为公司打造了覆盖ICT领域的认证体系&#xff0c;包含ICT基础…

自动化生产线-采用工业机器人比人工有哪些优势?

工业机器人相对于人工具有一些显著的优势&#xff0c;这些优势使它们在制造和生产领域得到广泛应用。以下是工业机器人相对于人工的一些主要优势&#xff1a; 1、精度和一致性&#xff1a; 机器人可以执行高精度的操作&#xff0c;确保产品的质量和规格一致&#xff0c;而且不容…

如何加密U盘数据?U盘数据加密软件怎么选?

U盘作为最常用的移动储存设备&#xff0c;是很多人储存数据的重要工具。而普通的U盘不具备保护数据的功能&#xff0c;很容易导致数据泄露。因此&#xff0c;我们需要使用专业的U盘加密软件来加密保护U盘数据。那么&#xff0c;U盘数据加密软件该怎么选择呢&#xff1f;下面我们…

golang中的循环依赖

作为 Golang 开发人员&#xff0c;您可能遇到过导入周期。Golang 不允许导入循环。如果 Go 检测到代码中的导入循环&#xff0c;则会抛出编译时错误。在这篇文章中&#xff0c;让我们了解导入周期是如何发生的以及如何处理它们。 导入周期 假设我们有两个包&#xff0c;p1并且…

windows10+ubuntu20.04双系统中,ubuntu系统显示home空间不足的扩容方法

实际上网上有两种扩容方法&#xff0c;除了本文的方法外&#xff0c;另一种是在使用启动U盘打开试用ubuntu&#xff0c;应该涉及到nvidia显卡驱动问题故未采用。另一种即本文。 最开始安装双系统时内存分配没有分配好&#xff0c;给ubuntu系统分配的空间较小,导致了后来的的问…