JUC“阻塞队列”水很深,你把握不住!

news2024/11/19 5:55:48
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

提到阻塞队列,大家脑海中就会冒出:

  • BlockingQueue
  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

但JDK阻塞队列本身是非常简单的,难的是阻塞队列内部的AQS。

如果你之前对阻塞队列一无所知又恰好想要学习,希望能耐心看完下面的内容。还是那句话,学习阻塞队列的重点不是阻塞队列本身...我自己都可以手写阻塞队列。

为了打破大家对阻塞队列“难”、“晦涩”、“神秘”的印象,我会从新的角度切入,重构大家对阻塞队列的认识。

主要内容:

  • 什么是线程间通信
  • 实现线程间通信
    • 轮询
    • 等待唤醒机制:wait/notify
    • 等待唤醒机制:condition
  • 山寨版BlockingQueue
  • JDK BlockingQueue简介
  • 展望AQS

什么是线程间通信

定义:

针对同一个资源的操作有 不同种类的线程。

说人话就是:共享资源+多线程,最典型的例子就是锁和生产者消费者(本文以生产者-消费者为例子讲解)。

以现实生活为例。消费者和生产者就像两个线程,原本做着各自的事情,厂家管自己生产,消费者管自己买,一般情况下彼此互不影响。

但当物资到达某个临界点时,就需要根据供需关系适当作出调整。

当厂家做了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费

当消费者发现某个热销商品售罄,应该提醒厂家尽快生产

在上面的案例中,生产者和消费者是不同种类的线程,一个负责存入,另一个负责取出,且它们操作的是同一个资源。但最难的部分在于:

  • 资源到达上限时,生产者等待,消费者消费
  • 资源达到下限时,生产者生产,消费者等待

你会发现,原本互不打扰的两个线程之间开始“沟通”了:

  • 生产者:喂,我这边做的太多了,先休息会儿,你赶紧消费
  • 消费者:喂,货快没了,我休息会儿,你赶紧生产

这种线程间的相互调度,也就是线程间通信。

看到这,你心里暗暗想道:我擦,我只会new Thread().start(),怎么让A线程去喊B线程工作呢?

实现线程间通信

还是以上面的生产者-消费者为例,有很多种方式可以实现线程间通信。

轮询

设计理念:生产者和消费者线程各自使用while循环,每隔片刻就去判断Queue的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,否则一律sleep等待。

/**
 * 轮询版本
 */
public class WhileQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();

    public void put(T resource) throws InterruptedException {
        while (queue.size() >= 1) {
            // 队列满了,不能再塞东西了,轮询等待消费者取出数据
            System.out.println("生产者:队列已满,无法插入...");
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
    }

    public void take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 队列空了,不能再取东西,轮询等待生产者插入数据
            System.out.println("消费者:队列为空,无法取出...");
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        TimeUnit.MILLISECONDS.sleep(5000);
    }

}

测试

public class Test {
    public static void main(String[] args) {
        // 队列
        WhileQueue<String> queue = new WhileQueue<>();

        // 生产者
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("消息" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        // 消费者
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

由于设定了队列最多只能存1个消息,所以只有当队列为空时,生产者才能插入数据。这是最简单的线程间通信:

多个线程不断轮询共享资源,通过共享资源的状态判断自己下一步该做什么。

看到这,你发现自己被骗了:哦,原来要实现线程间通信,并非真的需要A线程直接去叫B线程干什么,只要能按实际情况完成线程切换即可!

但上面的实现方式存在一些缺点:

  • 轮询的方式太耗费CPU资源,如果线程过多,比如几百上千个线程同时在那轮询,会给CPU带来较大负担
  • 无法保证原子性(代码里没有演示,但理论上确实如此,如果生产者的操作非原子性,消费者极可能获取到脏数据)

等待唤醒机制:wait/notify

相对而言,等待唤醒机制则要优雅得多,底层通过维护线程队列的方式,避免了过多线程同时自旋造成的CPU资源浪费,颇有点“用空间换时间”的味道。当一个生产者线程无法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放CPU资源,等到消费者抢到CPU执行权并取出数据后,再由消费者唤醒生产者继续生产。

举个例子,原本生产者和消费者都要时不时去店里看一下:

  • 生产者:货卖完了没有,卖完了我要继续生产(每分钟来店里看一下)
  • 消费者:补货了没,补货了我就可以买了(每分钟来店里看一下)

而现在,生产者去店里看了下,发现还有货,就管自己去后厨睡觉了,等店里货都卖完了,自然会有消费者过来喊他补货,不需要付出额外的精力在店里盯着。

Java有多种方式可以实现等待唤醒机制,最经典的就是wait和notify。

/**
 * wait/notify版本
 */
public class WaitNotifyQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= 1) {
            // 队列满了,不能再塞东西了,轮询等待消费者取出数据
            System.out.println("生产者:队列已满,无法插入...");
            this.wait();
        }
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
        this.notify();
    }

    public synchronized void take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 队列空了,不能再取东西,轮询等待生产者插入数据
            System.out.println("消费者:队列为空,无法取出...");
            this.wait();
        }
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        this.notify();
    }
}

对比WhileQueue做了哪些改进:

  • 用synchronized保证原子性
  • wait和notify实现等待唤醒

但一般推荐使用notifyAll(为什么?)。我们给测试程序再加一个生产者线程就知道了:

开始不久后,整个程序所有线程都阻塞了

原因是:在synchronized机制下,所有等待的线程都在同一个队列里,而notify又恰巧是随机唤醒线程(也就是说,有可能生产者唤醒生产者)。

最终结果是:所有线程都睡觉了...表现在程序上,就是卡住了。

解决办法是改用notifyAll,把所有线程都唤醒,然后大家一起参与执行权的竞争。你是否有疑问:如果和上面一样,生产者1还是唤醒生产者2呢?

其实这个假设不成立...使用notifyAll以后就不再是随机唤醒某一个线程了,而是唤醒所有线程并重新抢夺执行权。也就是说,每一个线程在进入阻塞之前,都会叫醒其他所有线程!

等待唤醒机制:condition

wait/notify版本的缺点是随机唤醒容易出现“己方唤醒己方,最终导致全部线程阻塞”的乌龙事件,虽然wait/notifyAll能解决这个问题,但唤醒全部线程又不够精确,会造成无谓的线程竞争(实际只需要唤醒敌方线程即可)。

作为改进版,可以使用ReentrantLock的Condition替代synchronized的wait/notify:

/**
 * Condition版本
 */
public class ConditionQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();

    // 显式锁(相对地,synchronized锁被称为隐式锁)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(T resource) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= 1) {
                // 队列满了,不能再塞东西了,轮询等待消费者取出数据
                System.out.println("生产者:队列已满,无法插入...");
                // 生产者阻塞
                producerCondition.await();
            }
            System.out.println("生产者:插入" + resource + "!!!");
            queue.addFirst(resource);
            // 生产完毕,唤醒消费者
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= 0) {
                // 队列空了,不能再取东西,轮询等待生产者插入数据
                System.out.println("消费者:队列为空,无法取出...");
                // 消费者阻塞
                consumerCondition.await();
            }
            System.out.println("消费者:取出消息!!!");
            queue.removeLast();
            // 消费完毕,唤醒生产者
            producerCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}

如何理解Condition呢?你可以认为lock.newCondition()创建了一个队列,调用producerCondition.await()会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal()时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工作。

也就是说,ReentrantLock的Condition通过拆分线程等待队列,让线程的等待唤醒更加精确了,想唤醒哪一方就唤醒哪一方。

山寨版BlockingQueue

至此,大家应该对线程间通信有了大致了解。如果你仔细观察,会发现上面其实都采用了阻塞队列实现。我们都是先构造一个Queue,然后生产者和消费者直接操作Queue,至于是否阻塞,由Queue内部判断。这样封装的好处是,将生产者和消费者解耦的同时,不暴露过多细节,使用起来更简单。

大家应该都听过JDK的阻塞队列吧?基于上面的案例,我们改进一下,抽取出一个自定义的阻塞队列(使用wait/nofityAll实现):

public class BlockingQueue<T> {

    // 模拟队列
    private final LinkedList<T> queue = new LinkedList<>();

    private int MAX_SIZE = 1;
    private int remainCount = 0;

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("size最小为1");
        }
        this.MAX_SIZE = capacity;
    }

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= MAX_SIZE) {
            // 队列满了,不能再塞东西了,阻塞生产者
            System.out.println("插入阻塞...");
            this.wait();
        }
        queue.addFirst(resource);
        remainCount++;
        printMsg(resource, "被插入");
        this.notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 队列空了,不能再取东西了,阻塞消费者
            System.out.println("取出阻塞...");
            this.wait();
        }
        T resource = queue.removeLast();
        remainCount--;
        printMsg(resource, "被取出");
        this.notifyAll();
        return resource;
    }

    private void printMsg(T resource, String operation) throws InterruptedException {
        System.out.println(resource + operation);
        System.out.println("队列容量:" + remainCount);
    }
}

JDK BlockingQueue简介

虽然很多人开口闭口“阻塞队列”,但“阻塞队列”在他脑中只是个很模糊的概念。连“阻塞队列”的来龙去脉都不甚清楚,又怎么能说了解呢?

实际上,和List、Set一样,“阻塞队列”也有自己的一脉。在JDK的util包下有一个Queue接口:

如果你继续往下扒,就会发现Queue和List其实很像,也是集合的一个分支罢了:

为什么很多人会觉得阻塞队列(比如ArrayBlockingQueue)高大上,听起来比ArrayList牛逼呢?主要在于“阻塞”二字!因为大家不了解阻塞,自己也不知道怎么实现阻塞,所以会觉得阻塞队列很神秘,很牛逼。但仔细观察上面的继承关系你会发现,如果ArrayBlockingQueue没有实现BlockingQueue接口,那么它本应该是个普普通通的队列,而不是阻塞队列,也就没有那么惊艳了。

那么BlockingQueue做了啥呢?其实啥也没做,毕竟BlockingQueue只是个接口,而接口只能定义方法...就好比一栋摩天大厦建成了,楼顶有个空中泳池,你觉得很牛逼。那么,你觉得是当初说“我要楼顶有个大花园”的老板牛逼还是把这个方案实现的设计师牛逼呢?

扯远了,其实BlockingQueue继承Queue接口后,就定义了几个方法:

BlockingQueue金口一开,后面的小弟只能满足,所以几个阻塞队列的实现类都有上面的几个方法。

那么阻塞队列的“阻塞”是怎么实现的呢?以ArrayBlockingQueue为例,通过上面的继承关系分析,Queue和BlockingQueue是接口,里面只有方法定义没有具体实现,有可能实现“阻塞”功能的要么在AbstractQueue,要么就是ArrayBlockingQueue自身。我们查看AbstractQueue发现这家伙几乎啥都没写...

也就是说,当初老板发话“我希望这个队列能阻塞”,经理微笑着满口答应,结果转手就交给3个小弟自己整了。好在3个小弟争气,还真给他们搞出来了...

常用的3个阻塞队列:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

还是以ArrayBlockingQueue为例,它是怎么实现阻塞的呢?

好家伙...竟然用了ReentrantLock,这和我们上面案例中写的ConditionQueue好像啊!

但是ArrayBlockingQueue只有notFull.await(),没看到signal(),不合理。仔细找找,唯一的可能是ArrayBlockingQueue把signal()藏在enqueue(e)方法里了:

其他两个阻塞队列LinkedBlockingQueue和SynchronousQueue同理,也是用ReentrantLock实现阻塞的。

展望AQS

看到这里,相信阻塞队列在大家心中已经不再那么神圣了,有什么了不起啊,我们自己也能写啊,还用了好几种方式实现呢!但是扪心自问,阻塞队列总共也就:

  • 阻塞
  • 队列

而我们所谓的手写阻塞队列,其实是这样的:队列直接用了LinkedList,阻塞也是借用wait/notify和ReentrantLock实现的。也就是说,我们其实只是做了组装工作,拿现成的队列+阻塞功能拼出了一个阻塞队列

世间路千万条,总有人不走寻常路。按理说现成的List+wait/notifyAll已经可以造出阻塞队列了,但就是有大佬不满足。

Doug Lea老爷子震惊的说:What?! Why you don't say earlly ya! I have already finished the AQS le...

是的,又是这个男人,他整出了一个AQS,再把AQS塞到ReentrantLock中,最后用ReentrantLock+数组、ReentrantLock+链表、ReentrantLock+Transfer搞出了ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue...阻塞队列只能算顺便的,他的初衷其实是利用AQS统一并简化锁的实现,屏蔽同步状态管理、阻塞线程的排队和通知、唤醒机制等,让后续的二次开发更简便。

换句话说:

如果你纠结于阻塞队列怎么实现,那你的格局就太小了...JDK的阻塞队列依赖于ReentrantLock,而ReentrantLock只是对AQS的浅封装,真正需要我们花功夫学习的其实有且只有AQS。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1210662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java入门,从CK导一部分数据到mysql

一、需求 需要从生产环境ck数据库导数据到mysql&#xff0c;数据量大约100w条记录。 二、处理步骤 1、这里的关键词是生产库&#xff0c;第二就是100w条记录。所以处理数据的时候就要遵守一定的规范。首先将原数据库表进行备份&#xff0c;或者将需要导出的数据建一张新的表了…

界面控件Kendo UI for jQuery R3 2023 - 发布全新金字塔图表类型

Telerik & Kendo UI R3 2023版本带来了30多个新的UI组件&#xff0c;丰富的设计系统文档、多种自定义选项、支持Linux的现代化报表体验等。借助R3 2023&#xff0c;开发人员能够在现代框架上快速构建强大的数字体验功能&#xff0c;满足不断变化的业务需求等。今天将为大家…

vcenter server (部署较大服务器)

作用 VMware vCenter是集中管理控制台&#xff0c;管理所有安装了VMware ESXI的主机 使用vCenter Server可以对虚拟机进行实时的监控&#xff0c;包括服务器硬件、网络和共享的存储&#xff0c;并可以进行故障诊断。 可以查看实时的统计和图表&#xff0c;监控虚拟主机和资源…

【NodeJS】Nodejs安装及环境配置

下载安装包 网址&#xff1a;https://nodejs.org/en 安装程序 1.下载完成后&#xff0c;双击安装包&#xff0c;进行安装&#xff0c;一路默认配置 nxet 即可&#xff0c;安装路劲给默认在C盘&#xff0c;或者选择其他位置&#xff0c;当前教程默认C盘 2.下图根据本身的…

创造者设计模式

Bike package com.jmj.pattern.builder.demo01;public class Bike {private String frame;//车架private String seat;//车座public String getFrame() {return frame;}public void setFrame(String frame) {this.frame frame;}public String getSeat() {return seat;}public…

【京东API】商品详情+搜索商品列表接口

利用电商API获取数据的步骤 1.申请API接口&#xff1a;首先要在相应电商平台上注册账号并申请API接口。 2.获取授权&#xff1a;在账号注册成功后&#xff0c;需要获取相应的授权才能访问电商API。 3.调用API&#xff1a;根据电商API提供的请求格式&#xff0c;通过编程实现…

如何使用Servlet写一个简单的网站

文章目录 前言1. 创建项目2. 引入依赖3. 创建目录4. 编写代码5. 打包程序6. 部署7.验证程序将 tomcat 集成到 idea 中 前言 前面我们学习了Java中常用的 HTTP 服务器 tomcat 的安装和使用&#xff0c;那么今天我们将使用这个 HTTP 服务器为大家写一个简单的网站&#xff0c;这…

Linux之输入输出重定向和管道

一、是什么 linux中有三种标准输入输出&#xff0c;分别是STDIN&#xff0c;STDOUT&#xff0c;STDERR&#xff0c;对应的数字是0、1、2&#xff1a; STDIN 是标准输入&#xff0c;默认从键盘读取信息STDOUT 是标准输出&#xff0c;默认将输出结果输出至终端STDERR 是标准错误…

新版软考高项试题分析精选(三)

请点击↑关注、收藏&#xff0c;本博客免费为你获取精彩知识分享&#xff01;有惊喜哟&#xff01;&#xff01; 1、项目整体管理要综合考虑项目各个相关过程&#xff0c;围绕整体管理特点&#xff0c;以下说法中&#xff0c;&#xff08; &#xff09;是不正确的。 A.项目的…

【2021集创赛】 RISC-V杯三等奖:基于E203 处理器的SM4算法硬件加速

杯赛题目&#xff1a;基于蜂鸟E203 RISC-V处理器内核的SoC设计 参赛要求&#xff1a;研究生组/本科生组 赛题内容&#xff1a; 基于芯来科技的开源蜂鸟E203 Demo SoC进行扩展&#xff0c;在限定的可编程逻辑平台上构建面向专用应用领域&#xff08;譬如人工智能、信息安全、工业…

194. 二叉树的最近公共祖先

题目 题解 递归 def lowestCommonAncestor(root: TreeNode, p: TreeNode, q: TreeNode) -> TreeNode:if not root or root p or root q:return rootleft lowestCommonAncestor(root.left, p, q)right lowestCommonAncestor(root.right, p, q)if not left:return right…

2.4G射频收发芯片XL2400P,收发一体,性能优异

XL2400P 系列芯片是工作在 2.400~2.483GHz 世界通用 ISM 频段的单片无线收发芯片。该芯片集成射频收发机、频率收生器、晶体振荡器、调制解调器等功能模块&#xff0c;并且支持一对多组网和带 ACK 的通信模式。发射输出功率、工作频道以及通信数据率均可配置。芯片已将多颗外围…

全志R128基础组件开发指南——图像采集

图像采集 CSI&#xff08;DVP&#xff09; 图像采集 SENSOR -> CSI 通路 CSI &#xff08;CMOS sensor interface&#xff09;接口时序上可支持独立 SYNC 和嵌入 SYNC(CCIR656)。支持接收 YUV422 或 YUV420 数据。 VSYNC 和HSYNC 的有效电平可以是正极性&#xff0c;也可…

常用网络命令(实习报告)

南京信息工程大学 实验&#xff08;实习&#xff09;报告 实验&#xff08;实习&#xff09;名称 常用网络命令 实验&#xff08;实习&#xff09;日期 2017/5/25 得分 指导教师 *** 专业 网络工程 年级 2015 班次 1 姓名 *** …

Mybatis报错找不到参数解决之编译保留参数名称

Hi, I’m Shendi Mybatis报错找不到参数解决之编译保留参数名称 需求场景 在使用 Mybatis 的过程中&#xff0c;对于函数参数&#xff0c;通常会加上 Param 注解来给参数命名&#xff0c;以让 Mybatis 找到参数。 有的时候忘记添加&#xff0c;执行时就会报找不到参数的错误&…

spring cloud之网关

Gateway网关(*) 什么是网关 # 1.说明 - 网关统一服务入口&#xff0c;可方便实现对平台众多服务接口进行管控。 - 网关 路由转发 过滤器路由转发&#xff1a;接收一切外界请求&#xff0c;转发到后端的微服务上去过滤器&#xff1a;在服务网关中可以完成一系列的横切功能&a…

学Diffusion前需要储备的一些知识点

自学Diffusion是非常困难的&#xff0c;尤其是到了VAE和VI这里基本找不到比较好的中文资料&#xff0c;甚至是涉及到一些重参数化&#xff0c;高斯混合之类的问题摸不着来龙去脉。在本文中&#xff0c;基本不会涉及公式&#xff0c;只有intuition和理解&#xff0c;如果要看公式…

Pytorch CUDA CPP简易教程,在Windows上操作

文章目录 前言一、使用的工具二、学习资源分享三、libtorch环境配置1.配置CUDA、nvcc、cudnn2.下载libtorch3.CLion配置libtorch4.CMake Application指定Environment variables5.测试libtorch 四、PyTorch CUDA CPP项目流程1.使用CLion结合torch extension编写可以调用cuda的C代…

推介会如何做好媒体宣传

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 推介会是一种专为企业、社会组织和团体、政府等提供的展示自身特点、产品和政策的活动形式&#xff0c;旨在促进交流活动&#xff0c;形成合作&#xff0c;从而带来共同利益。推介会的本…

硬盘分区后数据还能恢复吗?答案揭晓!

“前两天刚给我的电脑硬盘分了区&#xff0c;但今天在查找数据时却发现某些数据丢失了。硬盘分区导致的数据丢失还有机会找回吗&#xff1f;怎么操作呢&#xff1f;请帮帮我&#xff01;” 在使用电脑时&#xff0c;可能由于电脑需要重装系统&#xff0c;或者出现系统崩溃的情况…