J.U.C Review - 阻塞队列原理/源码分析

news2024/9/20 9:50:22

文章目录

  • 阻塞队列的由来
  • BlockingQueue的操作方法
  • BlockingQueue的实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • DelayQueue
    • PriorityBlockingQueue
    • SynchronousQueue
  • 阻塞队列原理深入分析
    • 1. 构造器和监视器初始化
    • 2. put操作的实现
    • 3. take操作的实现
    • 4. 注意事项
    • 小结
  • 线程池中的阻塞队列
  • BlockingQueue 的使用场景
    • 生产者-消费者模型

在这里插入图片描述


阻塞队列的由来

假设一种典型场景:有一个生产者不断地生产资源,消费者不断地消费资源,所有的资源被存储在一个共享的缓冲池中。生产者将资源放入缓冲池,消费者从缓冲池取出资源进行消费。这种设计模式被称为生产者-消费者模式

生产者-消费者模式的优势在于:

  • 消除了生产者类与消费者类之间的代码依赖性。
  • 将数据生产与数据使用的过程解耦,有助于简化开发过程,并提升系统的负载能力。

但在实现这一模式时,多个线程对共享变量的操作会引发线程安全问题,如重复消费死锁。尤其是在多个生产者和消费者同时存在的情况下,管理共享资源变得更为复杂。当缓冲池为空时,消费者需要等待;当缓冲池满时,生产者需要等待。为了实现这种等待-唤醒机制,需要手动编写复杂的线程同步代码。

幸运的是,Java 的并发包(java.util.concurrent)中已经为我们提供了一个用于简化这类开发的工具——阻塞队列(BlockingQueue)。使用阻塞队列,开发人员不需要再担心在多线程环境下操作共享变量时的线程安全问题。

阻塞队列是 Java 并发包下的重要数据结构。与普通队列不同,它提供了线程安全的访问方式,是并发包中很多高级同步类的基础。

通常,阻塞队列用于生产者-消费者模式。在这种模式中,生产者将数据添加到队列中,消费者从队列中取出数据。阻塞队列就是用来存放这些数据的容器


BlockingQueue的操作方法

阻塞队列提供了四组方法用于插入、移除和检查元素:

操作类型抛出异常返回特殊值一直阻塞超时退出
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()--
  • 抛出异常:当操作无法立即执行时,方法会抛出异常。例如,当阻塞队列已满时,调用add(e)将抛出IllegalStateException("Queue full");当队列为空时,调用remove()将抛出NoSuchElementException
  • 返回特殊值:如果操作无法立即执行,方法返回一个特殊值(如true/false)。
  • 一直阻塞:如果操作无法立即执行,方法会阻塞线程直到操作成功或线程被中断。
  • 超时退出:方法会在给定时间内等待操作成功,如果超时则返回一个特殊值(如true/false)。

注意

  1. 阻塞队列中不允许插入null,否则会抛出NullPointerException
  2. 尽量避免调用remove(o)移除特定对象,这种操作效率较低。

BlockingQueue的实现类

ArrayBlockingQueue

ArrayBlockingQueue 是基于数组结构的有界阻塞队列。它的内部结构为一个定长数组,且一旦初始化,队列的大小不能改变。该类构造函数允许指定是否使用公平锁,默认是非公平锁

public ArrayBlockingQueue(int capacity, boolean fair){
    // 初始化队列大小和公平性设置
    lock = new ReentrantLock(fair);
    // 初始化其他参数
}

LinkedBlockingQueue

LinkedBlockingQueue 是基于链表结构的有界阻塞队列。默认情况下,队列大小为Integer.MAX_VALUE,可以手动指定队列大小。该队列遵循**先进先出(FIFO)**原则。

DelayQueue

DelayQueue 是一个特殊的阻塞队列,队列中的元素只有在指定的延迟时间到期后,才能从队列中获取到。队列中的元素必须实现java.util.concurrent.Delayed接口。DelayQueue 是一个无界队列,插入操作永远不会被阻塞,但获取操作会在没有到期元素时被阻塞。

PriorityBlockingQueue

PriorityBlockingQueue 是基于优先级的无界阻塞队列。队列元素的优先级由构造函数中传入的Comparator对象决定。内部使用非公平锁进行线程同步。

public PriorityBlockingQueue(int initialCapacity,
                                  Comparator<? super E> comparator) {
         this.lock = new ReentrantLock(); // 默认使用非公平锁
         // 初始化其他参数
     }

SynchronousQueue

SynchronousQueue 是一个没有内部容量的特殊队列。每个插入操作必须等待一个相应的移除操作,反之亦然。

  • iterator() 永远返回空。
  • peek() 永远返回null
  • put() 阻塞,直到有消费者取走元素。
  • offer() 在插入成功后立即返回true,否则返回false
  • take() 阻塞,直到有元素可取。
  • poll() 在取不到元素时立即返回null
  • isEmpty() 永远返回true

阻塞队列原理深入分析

阻塞队列的原理主要基于和**条件变量(Condition)**来控制线程的等待与唤醒机制,确保多线程环境下的线程安全和资源同步。

通过分析ArrayBlockingQueue的源码,我们可以深入理解阻塞队列的工作原理。

1. 构造器和监视器初始化

public ArrayBlockingQueue(int capacity, boolean fair) {
    // 初始化队列大小和是否公平锁
    lock = new ReentrantLock(fair);
    // 初始化消费者和生产者监视器
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
  • 队列数组(items): 用于存储队列元素。
  • 锁(lock): 控制多线程对队列的并发访问,支持公平锁和非公平锁。
  • 条件变量: notEmpty用于标记和唤醒消费者线程,notFull用于标记和唤醒生产者线程。

2. put操作的实现

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  // 自旋获取锁
    try {
        while (count == items.length) {
            // 如果队列已满,阻塞当前线程,标记为生产者线程
            notFull.await();
        }
        // 插入元素到队列
        enqueue(e);
    } finally {
        lock.unlock();  // 释放锁
    }
}

put操作分为以下几步:

  1. 获取锁: 线程尝试获取锁。如果没有获取到,线程将自旋等待。
  2. 判断队列是否已满: 如果队列已满,线程调用notFull.await()方法阻塞自己,等待被消费者线程唤醒。此时线程释放锁。
  3. 插入元素: 当队列未满或被唤醒后再次获取到锁,线程将元素插入队列。
  4. 唤醒消费者线程: 插入元素后,线程调用notEmpty.signal()唤醒一个等待的消费者线程。

3. take操作的实现

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  // 自旋获取锁
    try {
        while (count == 0) {
            // 如果队列为空,阻塞当前线程,标记为消费者线程
            notEmpty.await();
        }
        // 从队列中取出元素
        return dequeue();
    } finally {
        lock.unlock();  // 释放锁
    }
}

take操作的流程与put操作类似:

  1. 获取锁: 线程尝试获取锁,失败则自旋等待。
  2. 判断队列是否为空: 如果队列为空,线程调用notEmpty.await()阻塞自己,等待被生产者线程唤醒,线程释放锁。
  3. 取出元素: 当队列非空或线程被唤醒并获取到锁后,线程取出队列中的元素。
  4. 唤醒生产者线程: 取出元素后,线程调用notFull.signal()唤醒一个等待的生产者线程。

4. 注意事项

  1. 锁的竞争: puttake操作必须首先获取到锁,才能进行后续操作。没有获取到锁的线程会自旋等待,避免竞争引发的线程安全问题。
  2. 条件等待与唤醒: 如果队列满或空,线程将被阻塞并释放锁,等待相应的生产者或消费者线程唤醒。
  3. 循环判断(while而非if): 使用while循环判断队列状态,确保被唤醒的线程在条件改变后,仍能正确执行后续操作。即使被唤醒,线程还需重新检查队列状态,防止并发条件下的误操作。

小结

阻塞队列通过ReentrantLockCondition机制,确保了在多线程环境下的安全操作。puttake操作通过相应的条件变量,阻塞等待与唤醒操作,使生产者和消费者线程能够有效地协作,从而实现线程间的同步与资源共享。


线程池中的阻塞队列

Java 线程池(ThreadPoolExecutor)中的任务队列就是使用阻塞队列实现的。

public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

使用阻塞队列可以有效地管理任务的执行,实现线程池的负载均衡和任务调度。


BlockingQueue 的使用场景

生产者-消费者模式线程池是阻塞队列的两个经典应用场景。

生产者-消费者模型

以下是一个简单的生产者-消费者模型示例:

public class ProducerConsumerExample {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
     
    public static void main(String[] args)  {
        ProducerConsumerExample example = new ProducerConsumerExample();
        Producer producer = example.new Producer();
        Consumer consumer = example.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while (true) {
                try {
                    queue.take();
                    System.out.println("取走一个元素,队列剩余:" + queue.size() + " 个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while (true) {
                try {
                    queue.put(1);
                    System.out.println("插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

输出可能如下:

取走一个元素,队列剩余0个元素
取走一个元素,队列剩余0个元素
插入一个元素,队列剩余空间:9
插入一个元素,队列剩余空间:9

注意:由于put()System.out.println()没有加锁,可能会出现日志输出不一致的情况。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2103300.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

泰克THDP0100(Tektronix)thdp0100高压差分探头详情资料

泰克 THDP0100 高压差分探头具有较大的差分动态范围功能&#xff0c;为用户提供了安全的高压测量探头解决方案。每个探头都配有两种尺寸的钩尖&#xff0c;并具有超范围视觉和声音指示器&#xff0c;当用户超出探头的线性范围时会发出警告。泰克 THDP0100 探头配备 TEkVPI 接口…

【vue css】css字体设置渐变色

实现的效果&#xff1a; 添加的代码&#xff1a; h2 {background-image: -webkit-linear-gradient(bottom, #1bffff, #ffffff);background-clip: text;//背景被裁剪成文字的前景色。-webkit-text-fill-color: transparent;//指定了文本字符的填充颜色。若未设置此属性&#xf…

【Linux操作系统】:Linux生产者消费者模型

目录 生产者消费者模型的概念 生产者消费者模型的特点 生产者消费者模型优点 基于BlockingQueue的生产者消费者模型 基于 BlockingQueue 的生产者消费者模型的概念 模拟实现基于阻塞队列的生产消费模型 生产者消费者模型的概念 生产者消费者模式就是通过一个容器来解决生…

MySQL Email验证流程详解:从注册到激活!

MySQL Email通知系统搭建教程&#xff01;如何从MySQL发送邮件&#xff1f; MySQL Email验证是一个至关重要的环节&#xff0c;它确保了用户注册过程的安全性和有效性。AokSend将详细介绍从用户注册到MySQL Email激活的完整流程&#xff0c;帮助开发者更好地理解和实现这一功能…

东风汽车将出席第五届中国新能源汽车热管理创新国际峰会

2024第五届中国新能源汽车热管理创新国际峰会将于11月14-15日在上海召开。峰会将汇聚来自全球的行业专家、学者、企业领袖及技术精英&#xff0c;共同探讨新能源汽车热管理领域的最新技术成果和发展趋势。 本次峰会将涵盖整车热管理系统构建、新能源商用车热管理、智能热管理系…

Python OpenCV 影像处理:傅立叶转换

►前言 上篇介绍基于计算影像的梯度&#xff0c;通过在影像中找到梯度值的变化来识别边缘。 本篇将介绍傅立叶变换的基本原理&#xff0c;了解傅立叶变换是如何将影像从空间域转换到频率域的&#xff0c;以及为什么这种转换在影像处理过程中是有用的。以及傅立叶变换的实际应…

9.3 k8s介绍

⼀、编排分类 单机容器编排: docker-compose 容器集群编排: docker swarm、mesosmarathon、kubernetes 应⽤编排: ansible(模块&#xff0c;剧本&#xff0c;⻆⾊) ⼆、系统管理进化史 1. 传统部署时代 早期&#xff0c;各个组织是在物理服务器上运⾏应⽤程序。 由于⽆法限…

getLocation:fail, the permission value is offline verifying

getLocation:fail, the permission value is offline verifying 后端会根据appid和secret生成 签名&#xff0c;前端wx配置时一定用appid来验证签名的正确 本次错误为配置初始化失败&#xff1a;前端与后端的appId不一致&#xff0c;我的失误也

TikTok直播为什么要用独立IP

TikTok直播作为一种受欢迎的社交媒体形式&#xff0c;吸引了越来越多的用户和内容创作者。在进行TikTok直播时&#xff0c;选择使用独立IP地址是一种被广泛推荐的做法。本文将探讨为什么在TikTok直播中更推荐使用独立IP&#xff0c;并解释其优势和应用。 独立IP是指一个唯一的互…

探索Linux项目自动化构建:make/Makefile的使用方法

&#x1f331;博客主页&#xff1a;青竹雾色间 &#x1f331;系列专栏&#xff1a;Linux &#x1f618;博客制作不易欢迎各位&#x1f44d;点赞 ⭐收藏 ➕关注 标题&#xff1a; 使用 Makefile 实现项目自动化构建 - 从零开始学习 Makefile 摘要&#xff1a; Makefile 是一个用…

如何在 OpenCloudOS 上安装 OpenTenBase 数据库

OpenTenBase 是由开放原子开源基金会孵化及运营的开源项目&#xff0c;是一款企业级的分布式 HTAP 数据库&#xff0c;具备高扩展性、商业数据库语法兼容、分布式 HTAP 引擎、多级容灾和多维度资源隔离等能力&#xff0c;目前已经成功应用于金融、医疗、航天等诸多行业的核心业…

Github Coplit和Poe不再订阅,改用Token和LobeChat

优化AI使用方式 1.取消Poe和Github Coplit的年度订阅 今天把200$ 的Poe和100$的Github Coplit的年度订阅取消了&#xff0c;确保到期不会续定&#xff0c;包年用AI的时代&#xff0c;在这里结束了。 2.改用Token购买模式 使用的AI质量必须不变&#xff0c;改用Token的方式&…

【Prometheus】Prometheus安装部署流程详解,配置参数webUI使用方法解析说明

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

二十、Java8新特性

文章目录 引入一、Lambda表达式1.1 快速入门1.2 Lambda语法 二、函数式(Functional)接口2.1 函数式(Functional)接口介绍2.2 Java内置函数式接口 三、方法引用与构造器引用3.1 方法引用3.2 构造器引用 四、强大的Stream API4.1 创建 Stream 的4种方式4.2 Stream 的中间操作4.2.…

PHP图书馆在指尖图书借阅小程序助力全民阅读系统小程序源码

​图书馆在指尖 —— 图书借阅小程序助力全民阅读 &#x1f4da;【开篇&#xff1a;指尖上的知识海洋】&#x1f4da; 在这个快节奏的时代&#xff0c;你是否曾渴望随时随地都能沉浸在书海中&#xff1f;现在&#xff0c;有了图书借阅小程序&#xff0c;图书馆就真正来到了你…

【MySQL08】【死锁】

文章目录 一、前言二、查看事务加锁情况1. 使用 information_schema 数据库中表获取锁信息1.1 INNODB_TRX1.2 INNODB_LOCKS1.3 INNODB_LOCK_WAITS 2. 使用 SHOW ENGIN INNODB STATUS 获取锁信息 三、死锁四、参考内容 一、前言 最近在读《MySQL 是怎样运行的》、《MySQL技术内…

GPU版pytorch安装(win/linux)

参考&#xff1a; Pytorch环境配置——cuda、、cudnn、torch、torchvision对应版本&#xff08;最全&#xff09;及安装方法-CSDN博客 Previous PyTorch Versions | PyTorch 法1&#xff1a;命令安装 如&#xff1a; conda install pytorch2.1.0 torchvision0.16.0 torchau…

Leetcode面试经典150题-63.不同路径II

解法都在代码里&#xff0c;不懂就留言或者私信 class Solution {/**本题是典型的动态规划&#xff0c;但是需要注意的是这个网格中是有障碍的&#xff0c;障碍不能走所以其实还是一样的&#xff0c;计算所有点到(m-1,n-1)有多少种方式&#xff0c;返回(0,0)位置的解就行了 */…

Python | Leetcode Python题解之第393题UTF-8编码验证

题目&#xff1a; 题解&#xff1a; class Solution:def validUtf8(self, data: List[int]) -> bool:MASK1, MASK2 1 << 7, (1 << 7) | (1 << 6)def getBytes(num: int) -> int:if (num & MASK1) 0:return 1n, mask 0, MASK1while num & m…

Python文件自动分类

假如这样的步骤全部手动做下来耗时是6秒&#xff0c;在文件数量不多的情况下&#xff0c;比如10个文件&#xff0c;总共耗时一分钟其实是能够接受的。 但当文件数量特别多时&#xff0c;或者这个操作特别频繁每天都要做十几二十次时&#xff0c;手动操作就会变得耗时又繁琐…