如何理解 Java 中的阻塞队列:从基础到高级的深度解析

news2024/11/15 19:38:47

提到阻塞队列,许多人脑海中会浮现出 BlockingQueueArrayBlockingQueueLinkedBlockingQueueSynchronousQueue。尽管这些实现看起来复杂,实际上阻塞队列本身的概念相对简单,真正挑战在于内部的 AQS(Abstract Queuing Synchronizer)。如果你对阻塞队列感到陌生,希望下面的内容能帮助你从全新角度理解它。


文章目录

      • 1、线程间通信
        • 2、线程间通信的实现
        • 2.1、轮询
        • 2.2、等待唤醒机制(wait/notify)
        • 2.3、等待唤醒机制(Condition)
      • 3、自定义阻塞队列
        • 4、Java 中的 BlockingQueue


1、线程间通信

线程间通信是指多个线程对共享资源的操作和协调。在生产者-消费者模型中,生产者和消费者是不同种类的线程,他们对同一个资源(如队列)进行操作。生产者负责向队列中插入数据,消费者负责从队列中取出数据。

主要挑战在于如何在资源达到上限时让生产者等待,而在资源达到下限时让消费者等待。线程间的这种相互调度,就是线程间通信。

以现实生活为例。消费者和生产者就像两个线程,原本做着各自的事情,厂家管自己生产,消费者管自己买,一般情况下彼此互不影响。900 240

image-20240808015130401

但当物资到达某个临界点时,就需要根据供需关系适当作出调整。比如,当厂家做了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费。

image-20240808015015146

同理,当消费者发现某个热销商品售罄,应该提醒厂家尽快生产。

image-20240808015356312

在上面的案例中,生产者和消费者是不同种类的线程,一个负责存入,另一个负责取出,且它们操作的是同一个资源。但最难的部分在于:资源到达上限时,生产者等待,消费者消费;资源达到下限时,生产者生产,消费者等待。

我们可以发现,原本互不打扰的两个线程之间开始了 “沟通”:

  • 生产者:做的商品太多了,应该扩大宣传,让大家来买。
  • 消费者:都卖完啦,应当提醒商家尽快补货。

这种线程间的相互调度,也就是线程间通信。


2、线程间通信的实现

实现线程间通信的方式有多种:

  • 轮询:生产者和消费者线程通过循环不断检查队列的状态。这种方法简单,但会消耗大量 CPU 资源,且无法保证原子性。
  • 等待唤醒机制(wait/notify):通过 waitnotify 机制,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,但 notify 可能导致线程竞争不均。
  • 等待唤醒机制(Condition):使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。
2.1、轮询

设计理念:生产者和消费者线程通过循环不断检查队列的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,否则一律 sleep 等待。

image-20240808015823555

代码实现:

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

/**
 * 自定义阻塞队列实现:轮询版本
 * 
 * @param <T> 队列中存储的元素类型
 */
public class WhileQueue<T> {
    // 用来存储元素的容器
    private final LinkedList<T> queue = new LinkedList<>();
    // 队列的最大容量
    private final int MAX_SIZE = 1;

    /**
     * 将元素添加到队列中
     * 
     * @param resource 要插入的元素
     * @throws InterruptedException 如果当前线程被中断
     */
    public void put(T resource) throws InterruptedException {
        // 如果队列满了,生产者线程将进入轮询等待状态
        while (queue.size() >= MAX_SIZE) {
            System.out.println("生产者:队列已满,无法插入...");
            TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试
        }
        // 插入元素到队列的前面
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
    }

    /**
     * 从队列中取出元素
     * 
     * @throws InterruptedException 如果当前线程被中断
     */
    public void take() throws InterruptedException {
        // 如果队列为空,消费者线程将进入轮询等待状态
        while (queue.size() <= 0) {
            System.out.println("消费者:队列为空,无法取出...");
            TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试
        }
        // 从队列的末尾取出元素
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        TimeUnit.MILLISECONDS.sleep(5000); // 模拟消费操作需要时间
    }
}

测试:

/**
 * 测试类:创建生产者和消费者线程来测试WhileQueue的功能
 */
public class Test {
    public static void main(String[] args) {
        // 创建一个WhileQueue实例
        WhileQueue<String> queue = new WhileQueue<>();

        // 创建并启动生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("消息" + i); // 插入消息到队列
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // 捕获并打印中断异常
                    }
                }
            }
        }).start();

        // 创建并启动消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take(); // 从队列中取出消息
                    } catch (InterruptedException e) {
                        e.printStackTrace(); // 捕获并打印中断异常
                    }
                }
            }
        }).start();
    }
}

由于设定了队列最多只能存1个消息,所以只有当队列为空时,生产者才能插入数据。这是最简单的线程间通信:多个线程不断轮询共享资源,通过共享资源的状态判断自己下一步该做什么。

但上面的实现方式存在一些缺点:

  • 轮询的方式太耗费 CPU 资源,如果线程过多,比如几百上千个线程同时在那轮询,会给 CPU 带来较大负担
  • 无法保证原子性(代码里没有演示,但理论上确实如此,如果生产者的操作非原子性,消费者极可能获取到脏数据)
2.2、等待唤醒机制(wait/notify)

相对而言,等待唤醒机制则要优雅得多,底层维护线程队列,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,同时避免了过多线程同时自旋造成的 CPU 资源浪费,颇有点用空间换时间的味道。

当一个生产者线程无法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放 CPU 资源,等到消费者抢到 CPU 执行权并取出数据后,再由消费者唤醒生产者继续生产。

Java 有多种方式可以实现等待唤醒机制,最经典的就是通过 waitnotify 的方式:

import java.util.LinkedList;

/**
 * 自定义阻塞队列实现:使用 wait/notify
 * 
 * @param <T> 队列中存储的元素类型
 */
public class WaitNotifyQueue<T> {
    // 用来存储元素的容器
    private final LinkedList<T> queue = new LinkedList<>();
    // 队列的最大容量
    private final int MAX_SIZE = 1;

    /**
     * 将元素添加到队列中
     * 
     * @param resource 要插入的元素
     * @throws InterruptedException 如果当前线程被中断
     */
    public synchronized void put(T resource) throws InterruptedException {
        // 当队列满时,生产者线程进入等待状态
        while (queue.size() >= MAX_SIZE) {
            System.out.println("生产者:队列已满,无法插入...");
            this.wait(); // 释放锁,并进入等待状态
        }
        // 插入元素到队列的前面
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
        this.notify(); // 唤醒等待的消费者线程
    }

    /**
     * 从队列中取出元素
     * 
     * @throws InterruptedException 如果当前线程被中断
     */
    public synchronized void take() throws InterruptedException {
        // 当队列为空时,消费者线程进入等待状态
        while (queue.size() <= 0) {
            System.out.println("消费者:队列为空,无法取出...");
            this.wait(); // 释放锁,并进入等待状态
        }
        // 从队列的末尾取出元素
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        this.notify(); // 唤醒等待的生产者线程
    }
}

基于 waitnotify 的阻塞队列。其原理是通过同步机制和线程通信来处理生产者-消费者问题。在 put 方法中,生产者线程检查队列是否已满,如果已满,则调用 wait 使自己进入等待状态,释放锁,直到队列有空位。生产者在插入元素后调用 notify 唤醒可能等待的消费者线程。在 take 方法中,消费者线程检查队列是否为空,如果为空,则调用 wait 使自己进入等待状态,释放锁,直到队列有新元素。消费者在取出元素后调用 notify 唤醒可能等待的生产者线程。这种机制避免了忙等待,通过有效的线程通信提高了资源利用效率。

Ps:使用 notifyAll 在某些情况下可能更合适,尤其是当有多个生产者和消费者线程时。notifyAll 会唤醒所有等待的线程,而不仅仅是一个线程,这样可以保证系统中的所有线程都有机会被唤醒,避免了因线程唤醒不充分导致的潜在问题。

2.3、等待唤醒机制(Condition)

等待唤醒机制(wait/notify)版本的缺点是随机唤醒容易出现"己方唤醒己方",最终导致全部线程阻塞的乌龙事件,虽然 wait/notifyAll 能解决这个问题,但唤醒全部线程又不够精确,会造成无谓的线程竞争(实际只需要唤醒敌方线程即可)。

因此使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。

作为改进版,可以使用 ReentrantLockCondition 替代 synchronizedwait/notify

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();
    private final int CAPACITY = 10; // 队列容量

    // 显式锁(相对地,synchronized锁被称为隐式锁)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(T resource) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= CAPACITY) {
                // 队列满了,不能再塞东西了,等待消费者取出数据
                System.out.println("生产者:队列已满,无法插入...");
                // 生产者阻塞
                producerCondition.await();
            }
            System.out.println("生产者:插入" + resource + "!!!");
            queue.addFirst(resource);
            // 生产完毕,唤醒消费者
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= 0) {
                // 队列空了,不能再取东西,等待生产者插入数据
                System.out.println("消费者:队列为空,无法取出...");
                // 消费者阻塞
                consumerCondition.await();
            }
            System.out.println("消费者:取出消息!!!");
            queue.removeLast();
            // 消费完毕,唤醒生产者
            producerCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}

如何理解 Condition 呢?可以认为 lock.newCondition() 创建了一个队列,调用 producerCondition.await() 会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal() 时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工作。

也就是说,ReentrantLockCondition 通过拆分线程等待队列,让线程的等待唤醒更加精确了,想唤醒哪一方就唤醒哪一方。


3、自定义阻塞队列

基于以上机制,我们可以自定义实现一个简单的阻塞队列。以下代码示例展示了一个基于 wait/notifyAll 实现的阻塞队列:

public class BlockingQueue<T> {
    private final LinkedList<T> queue = new LinkedList<>();
    private int MAX_SIZE = 1;
    private int remainCount = 0;

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("size最小为1");
        }
        this.MAX_SIZE = capacity;
    }

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= MAX_SIZE) {
            this.wait();
        }
        queue.addFirst(resource);
        remainCount++;
        this.notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.size() <= 0) {
            this.wait();
        }
        T resource = queue.removeLast();
        remainCount--;
        this.notifyAll();
        return resource;
    }
}

4、Java 中的 BlockingQueue

BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,继承自 Queue 接口。它提供了额外的阻塞操作,例如在队列为空时等待元素变得可用,或在队列已满时等待空间变得可用。

BlockingQueue 阻塞队列在 Java 中的主要实现有三个:

  1. ArrayBlockingQueue: 基于数组实现的有界阻塞队列,必须指定固定容量,支持可选的公平性策略。
  2. LinkedBlockingQueue: 基于链表实现的阻塞队列,默认无界或指定容量,有较高的插入和删除性能。
  3. SynchronousQueue: 一个没有内部容量的队列,每个插入操作必须等待一个对应的删除操作,反之亦然,适用于直接交换数据的场景。

更多实现可以参考:Java 并发集合:阻塞队列集合介绍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1992411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

javaweb_04:SpringBoot

一、SpringBoot快速入门 官网&#xff1a;https://spring.io/ spring提供了若干个子项目&#xff0c;每个项目用于完成特定的任务。 1、创建springboot工程&#xff0c;并勾选web开发相关依赖。 注意这里type要选成maven: 2、定义helloController类&#xff0c;添加方法h…

QT多语言工具实现支持生成ts文件,ts文件和xlsx文件互转

一. 工具介绍 1.如果你是Qt项目,为多语言发愁的话,看到这篇文件,恭喜你有福啦!工具截图如下:​ 2.在项目开发的过程中,尽量将所有需要翻译的文本放在一个文件中,qml翻译用一个文件,cpp用一个,如下: test.h #pragma once /******************************************…

Python:jupyter 模型可视化(VS)

step1:打开vs安装扩展 安装后重新启动vs 建立可视化模型 import pandas as pd from sklearn.tree import DecisionTreeClassifier from sklearn import treemusic_data pd.read_csv(music.csv)Xmusic_data.drop(columns[genre]) Ymusic_data[genre]modelDecisionTreeClassifie…

吴恩达机器学习 笔记四十 寻找相关特征 协同过滤的限制

寻找相关特征&#xff1a; 要找到其他和 x(i) 相关的项&#xff0c;即找到一个 item k&#xff0c; x(k) 与 x(i) 相似。x 是一个向量&#xff0c;判断相似用的是下图中的式子 &#xff0c;即 x(k) 和 x(i) 之间的平方距离&#xff0c;有时也写成下面那种形式。 协同过滤的缺点…

openfoam中为什么一个单元用27个点表示,代表什么图形(由27个节点组成的三维立方体单元,在有限元方法(FEM)中被称为“三次立方体单元”)

问题: 近期在做openfoam项目的时候,发现openfoam中固体的点为什么一个单元用27个点表示,想着代表什么图形呢?如果以顶点表示的话好像图形就复杂了,然后查询一下资料,结果如下 解答: 在OpenFOAM中,使用27个点来表示一个单元通常指的是一种高阶单元。这种单元类型在有…

PSINS工具箱|天文导航cns和卫星导航gps的对比|MATLAB源代码

文章目录 介绍运行结果CNS观测的姿态曲线滤波后的状态曲线轨迹曲线对比三轴位置曲线误差CDF(累计概率密度函数)图像函数源码介绍 天文导航(cns)+ins组合导航和gps+ins导航的结果对比,MATLAB的源代码,基于psins工具箱。 工具箱介绍:PSINS工具箱是一个开源的惯性导航系统…

odoo17 搜索栏升级的真是太方便了

odoo&#xff11;&#xff17; 搜索栏升级的真是太方便了 几行代码&#xff0c;惊人效果 代码&#xff1a; <!-- 搜索--><record model"ir.ui.view" id"bzglsp.jiancexm_search"><field name"name">搜索</field><…

文件销毁,硬盘销毁,数据销毁,巴黎奥运会:一场GDPR大考,硬盘文件数据销毁

巴黎奥运会在使用智能设备和系统的情况下&#xff0c;如何满足欧盟严格的数据保护要求&#xff1f; 2024年夏季&#xff0c;巴黎迎来备受瞩目的奥运盛会&#xff0c;预计将吸引上百万游客到访。为保障这一全球性体育盛会的顺利进行&#xff0c;法国政府启用了一系列智能系统和…

探索IT服务台自动化的办法

如今&#xff0c;IT 服务管理 (ITSM) 工具已经有了内置智能的自动化功能。人工智能 (AI) 和机器学习 (ML) 可以自动提供更好的服务&#xff0c;比如给出基于上下文的建议、进行异常检测、做根本原因分析等等。而且&#xff0c;AI 还可以和物联网 (IoT)、机器人流程自动化 (RPA)…

阿里云SSL证书 部署Windows服务器

实现将阿里云SSL证书部署到Windows IIS 服务器中&#xff0c;方便https请求 第一步、获取并下载SSL证书 1.购买证书&#xff08;一年20个&#xff09;&#xff0c;如果没有SSL证书就需要去购买个人测试证书&#xff0c;有效期3个月 2.创建证书 3.下载证书 第二步、安装证书 …

大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

万字长文讲透数字化转型

温馨提醒&#xff1a;1.6w字详细拆解&#xff0c;内容篇幅较长&#xff0c;建议先收藏~ 数字化浪潮正在席卷全球&#xff0c;践行数字化转型和提升企业的运营水平与竞争力&#xff0c;已经成为各国企业角力全球市场的重要议题。为此&#xff0c;很多国家政府都推出了鼓励和推动…

开发者们都在讨论Bandizip,你真的不心动吗?

前言 在这个信息爆炸的时代&#xff0c;数据如潮水般涌来&#xff0c;我们的电脑空间似乎永远不够用&#xff1b;每当面对堆积如山的文件&#xff0c;你是否也曾感到头疼不已&#xff1f;别急&#xff0c;小江湖今天就要带你走进一个神奇的世界&#xff0c;那里有一款软件&…

Linux:多线程(二.理解pthread_t、线程互斥与同步、基于阻塞队列的生产消费模型)

上次讲解了多线程第一部分&#xff1a;Linux&#xff1a;多线程&#xff08;一.Linux线程概念、线程控制——创建、等待、退出、分离&#xff0c;封装一下线程&#xff09; 文章目录 1.理解Linux下线程——理解tid2. Linux线程互斥2.1相关概念2.2引入问题分析问题解决思路 2.3L…

Sqli-labs靶场65关详解(一)

前言:目的是通过构造sql语句来窃取数据库数据 一.sqli-labs靶场(1~4) 1~4使用了union联合查询字符型注入,要点在于闭合单双引号括号 要点:union联合查询 UNION 操作符用于合并两个或多个 SELECT 语句的结果集UNION 内部的 SELECT语句必须拥有相同数量的列列也必须拥有相似的…

YARN 的介绍

YARN 的介绍 一、YARN 产生背景1.1 MapReduce 1.0 系统架构 1.2 MapReduce 1.0架构缺陷二、YARN 是什么三、YARN 作用四、YARN 架构五、工作原理六、MapReduce ON YARN 工作流程七、YARN 的容错性八、YARN 的高可用八、YARN 调度器8.1 先进先出调度器8.2 容量调度器8.3 公平调度…

PyTorch深度学习实战(1)——PyTorch安装与配置

本章共有两节&#xff0c;2.1节介绍如何安装PyTorch&#xff0c;以及如何配置学习环境&#xff1b;2.2节带领读者快速浏览PyTorch中的主要内容&#xff0c;帮助读者初步了解PyTorch。 PyTorch是一款以C语言为主导开发的轻量级深度学习框架&#xff0c;它提供了丰富的Python接口…

Redis 大Key排查与优化

Redis 大Key排查与优化 什么是BigKey bigkey简单来说就是存储本身的key值空间太大&#xff0c;或者hash&#xff0c;list&#xff0c;set等存储中value值过多。没有具体的衡量标准。 参考的大小范围&#xff1a; String 类型值大于10KB。Hash、List、Set、Zset类型元素个数…

【Mybatis Plus】Mybatis Plus_快速上手

文章目录 1.Mybatis Plus 简介2.与SpringBoot集成2.1在maven中引入MP依赖2.3 在application.yml中&#xff0c;配置好自己的数据库文件 3. 快速上手 创建实体类entity/User.java3.1创建通用Mapper3.2 使用继承了BaseMapper的Mapper开始编写Crud3.2.1 Insert3.2.2 deleted3.2.3 …

5.1树的基本概念

5.11树的定义 树是n>0的有限集.树适用于有层次结构的数据 只有根节点无前驱 只有叶子节点无后继 有后继的节点为分支节点 除根节点外,任何一个节点都有且只有一个前驱 5.12树的基本术语 祖先节点:从k-->R经过的所有节点 子孙节点:从一个节点出发后下面的所有节点 …