从线程间通信聊到阻塞队列

news2025/1/16 19:11:17
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

很多Java新手都对ReentrantLock、阻塞队列以及等待唤醒机制十分感兴趣,于是一头扎进源码,想要吃透个中细节,却最终迷失方向什么都没学到。其实,从某个角度而言,ReentrantLock、阻塞队列、等待唤醒、AQS都是同一个东西,或者说它们被用来解决同一个问题:线程间通信。所以,本文打算从线程间通信讲起,由浅入深地介绍阻塞队列。后面如果有时间,还可以聊聊AQS及ReentrantLock。

ThreadPoolExecutor = 线程池 + 阻塞队列,本文是为了下一篇线程池做铺垫。

什么是线程间通信

定义:

针对同一个资源的操作有不同种类的线程。

说人话就是:共享资源+多线程,最典型的例子就是锁和生产者消费者(关于锁,后面有专门的章节介绍,这里以生产者-消费者为例子讲解)。

以现实生活为例。消费者和生产者就像两个线程,原本做着各自的事情,厂家管自己生产,消费者管自己买,一般情况下彼此互不影响。

但当物资到达某个临界点时,就需要根据供需关系适当作出调整。

  • 当厂家做了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费

  • 当消费者发现某个热销商品售罄,应该提醒厂家尽快生产

在上面的案例中,生产者和消费者是不同种类的线程,一个负责存入,另一个负责取出,且它们操作的是同一个资源。但最难的部分在于:

  • 资源到达上限时,生产者等待,消费者消费
  • 资源达到下限时,生产者生产,消费者等待

你会发现,原本互不打扰的两个线程之间开始“沟通”了:

  • 生产者:喂,我这边做的太多了,先休息会儿,你赶紧消费
  • 消费者:喂,货快没了,我休息会儿,你赶紧生产

这种线程间的相互调度,也就是线程间通信。

看到这,你心里暗暗想道:我擦,我只会new Thread().start(),怎么让A线程去喊B线程工作呢?

实现线程间通信

还是以上面的生产者-消费者为例,有很多种方式可以实现线程间通信。

轮询

设计理念:生产者和消费者线程各自使用while循环,每隔片刻就去判断Queue的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,否则一律sleep等待。

/**
 * 轮询版本
 */
public class WhileQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();

    public void put(T resource) throws InterruptedException {
        while (queue.size() >= 1) {
            // 队列满了,不能再塞东西了,轮询等待消费者取出数据
            System.out.println("生产者:队列已满,无法插入...");
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
    }

    public void take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 队列空了,不能再取东西,轮询等待生产者插入数据
            System.out.println("消费者:队列为空,无法取出...");
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        TimeUnit.MILLISECONDS.sleep(5000);
    }

}

测试:

public class Test {
    public static void main(String[] args) {
        // 队列
        WhileQueue<String> queue = new WhileQueue<>();

        // 生产者
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("消息" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        // 消费者
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

由于设定了队列最多只能存1个消息,所以只有当队列为空时,生产者才能插入数据。这是最简单的线程间通信:

多个线程不断轮询共享资源,通过共享资源的状态判断自己下一步该做什么。

看到这,你发现自己被骗了:哦,原来要实现线程间通信,并非真的需要A线程直接去叫B线程干什么,不同线程通过共享变量即可完成通信!

但上面的实现方式存在一些缺点:

  • 轮询的方式太耗费CPU资源,如果线程过多,比如几百上千个线程同时在那轮询,会给CPU带来较大负担
  • 无法保证原子性(代码里没有演示,但理论上确实如此,如果生产者的操作非原子性,消费者极可能获取到脏数据)

等待唤醒机制:wait/notify

相对而言,等待唤醒机制则要优雅得多,底层通过维护线程队列的方式,避免了过多线程同时自旋造成的CPU资源浪费,颇有点“用空间换时间”的味道。当一个生产者线程无法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放CPU资源,等到消费者抢到CPU执行权并取出数据后,再由消费者唤醒生产者继续生产。

举个例子,原本生产者和消费者都要时不时去店里看一下:

  • 生产者:货卖完了没有,卖完了我要继续生产(每分钟来店里看一下)
  • 消费者:补货了没,补货了我就可以买了(每分钟来店里看一下)

而现在,生产者去店里看了下,发现还有货,就管自己去后厨睡觉了,等店里货都卖完了,自然会有消费者过来喊他补货,不需要付出额外的精力在店里盯着。

Java有多种方式可以实现等待唤醒机制,最经典的就是wait和notify。

/**
 * wait/notify版本
 */
public class WaitNotifyQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= 1) {
            // 队列满了,不能再塞东西了,轮询等待消费者取出数据
            System.out.println("生产者:队列已满,无法插入...");
            this.wait();
        }
        System.out.println("生产者:插入" + resource + "!!!");
        queue.addFirst(resource);
        this.notify();
    }

    public synchronized void take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 队列空了,不能再取东西,轮询等待生产者插入数据
            System.out.println("消费者:队列为空,无法取出...");
            this.wait();
        }
        System.out.println("消费者:取出消息!!!");
        queue.removeLast();
        this.notify();
    }
}

对比WhileQueue做了哪些改进:

  • 用synchronized保证原子性
  • wait和notify实现等待唤醒

但一般推荐使用notifyAll(为什么?)。我们给测试程序再加一个生产者线程就知道了:

我们发现,整个程序所有线程都阻塞了。

原因是:在synchronized机制下,所有等待的线程都在同一个队列里(生产者和消费者共用一个队列),而notify又恰巧是随机唤醒线程。也就是说,有可能当前醒着的唯一线程是生产者,而他干完活以后唤醒的又是生产者。

最终结果是:所有线程都睡觉了...表现在程序上,就是卡住了。

解决办法是改用notifyAll,把所有线程都唤醒,然后大家一起参与执行权的竞争。你是否有疑问:如果和上面一样,生产者1还是唤醒生产者2呢?

其实这个假设不成立...使用notifyAll以后就不再是随机唤醒某一个线程了,而是唤醒所有线程并重新抢夺执行权。也就是说,每一个线程在进入阻塞之前,都会叫醒其他所有线程!

等待唤醒机制:condition

wait/notify版本的缺点是随机唤醒容易出现“己方唤醒己方,最终导致全部线程阻塞”的乌龙事件,虽然wait/notifyAll能解决这个问题,但唤醒全部线程又不够精确,会造成无谓的线程竞争(实际只需要唤醒敌方线程即可)。

作为改进版,可以使用ReentrantLock的Condition替代synchronized的wait/notify:

/**
 * Condition版本
 */
public class ConditionQueue<T> {
    // 容器,用来装东西
    private final LinkedList<T> queue = new LinkedList<>();

    // 显式锁(相对地,synchronized锁被称为隐式锁)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

    public void put(T resource) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= 1) {
                // 队列满了,不能再塞东西了,轮询等待消费者取出数据
                System.out.println("生产者:队列已满,无法插入...");
                // 生产者阻塞
                producerCondition.await();
            }
            System.out.println("生产者:插入" + resource + "!!!");
            queue.addFirst(resource);
            // 生产完毕,唤醒消费者
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= 0) {
                // 队列空了,不能再取东西,轮询等待生产者插入数据
                System.out.println("消费者:队列为空,无法取出...");
                // 消费者阻塞
                consumerCondition.await();
            }
            System.out.println("消费者:取出消息!!!");
            queue.removeLast();
            // 消费完毕,唤醒生产者
            producerCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}

如何理解Condition呢?你可以认为lock.newCondition()创建了一个队列,调用producerCondition.await()会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal()时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工作。

后续condition.signal()就是从等待队列唤醒线程

也就是说,ReentrantLock的每个Condition都会创建一个等待队列的方式,可以分别存储需要等待的生产者线程和消费者线程,从而实现“精准唤醒”。

山寨版BlockingQueue

至此,大家应该对线程间通信有了大致了解。如果你仔细观察,会发现上面其实都采用了阻塞队列实现。我们都是先构造一个Queue,然后生产者和消费者直接操作Queue,至于是否阻塞,由Queue内部判断。这样封装的好处是,将生产者和消费者解耦的同时,不暴露过多细节,使用起来更简单(让生产者、消费者自己去判断是否需要阻塞会很繁琐)。

大家应该都听过JDK的阻塞队列吧?基于上面的案例,我们改进一下,抽取出一个自定义的阻塞队列(使用wait/nofityAll实现):

public class BlockingQueue<T> {

    // 模拟队列
    private final LinkedList<T> queue = new LinkedList<>();

    private int MAX_SIZE = 1;
    private int remainCount = 0;

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("size最小为1");
        }
        this.MAX_SIZE = capacity;
    }

    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size() >= MAX_SIZE) {
            // 队列满了,不能再塞东西了,阻塞生产者
            System.out.println("插入阻塞...");
            this.wait();
        }
        queue.addFirst(resource);
        remainCount++;
        printMsg(resource, "被插入");
        this.notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (queue.size() <= 0) {
            // 队列空了,不能再取东西了,阻塞消费者
            System.out.println("取出阻塞...");
            this.wait();
        }
        T resource = queue.removeLast();
        remainCount--;
        printMsg(resource, "被取出");
        this.notifyAll();
        return resource;
    }

    private void printMsg(T resource, String operation) throws InterruptedException {
        System.out.println(resource + operation);
        System.out.println("队列容量:" + remainCount);
    }
}

BlockingQueue简介

虽然很多人开口闭口“阻塞队列”,但“阻塞队列”在他脑中只是个很模糊的概念。连“阻塞队列”的来龙去脉都不甚清楚,又怎么能说了解呢?

实际上,和List、Set一样,“阻塞队列”也有自己的一脉。在JDK的util包下有一个Queue接口:

如果你继续往下扒,就会发现Queue和List其实很像,也是集合的一个分支罢了:

为什么很多人会觉得阻塞队列(比如ArrayBlockingQueue)高大上,听起来比ArrayList牛逼呢?主要在于“阻塞”二字!因为大家不了解阻塞,自己也不知道怎么实现阻塞,所以会觉得阻塞队列很神秘,很牛逼。但仔细观察上面的继承关系你会发现,如果ArrayBlockingQueue没有实现BlockingQueue接口,那么它本应该是个普普通通的队列,而不是阻塞队列,也就没有那么惊艳了。

那么BlockingQueue做了啥呢?其实啥也没做,毕竟BlockingQueue只是个接口,而接口只能定义方法...就好比一栋摩天大厦建成了,楼顶有个空中泳池,你觉得很牛逼。那么,你觉得是当初说“我要楼顶有个大花园”的老板牛逼还是把这个方案实现的设计师牛逼呢?

扯远了,其实BlockingQueue继承Queue接口后,就定义了几个方法:

失败抛异常

失败返回特殊值

阻塞

阻塞(指定超时时间)

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

删除

remove()

poll()

take()

poll(time, unit)

查询

element()

peek()

BlockingQueue金口一开,后面的小弟只能满足,所以几个阻塞队列的实现类都有上面的几个方法。

那么阻塞队列的“阻塞”是怎么实现的呢?以ArrayBlockingQueue为例,通过上面的继承关系分析,Queue和BlockingQueue是接口,里面只有方法定义没有具体实现,有可能实现“阻塞”功能的要么在AbstractQueue,要么就是ArrayBlockingQueue自身。我们查看AbstractQueue发现这家伙几乎啥都没写...

也就是说,当初老板发话“我希望这个队列能阻塞”,经理微笑着满口答应,结果转手就交给3个小弟自己整了。好在3个小弟争气,还真给他们搞出来了...

常用的3个阻塞队列:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

还是以ArrayBlockingQueue为例,它是怎么实现阻塞的呢?put()方法内部判断,队列满了就notFull.await(),否则enqueue()。也就是说,队列满了就阻塞,没满就入队。

好家伙...竟然用了ReentrantLock,这和我们上面案例中写的ConditionQueue好像啊!

但是ArrayBlockingQueue只有notFull.await(),没看到signal(),不合理。仔细找找,唯一的可能是ArrayBlockingQueue把signal()藏在enqueue(e)方法里了:

也就是说,生产者每次成功插入都尝试唤醒消费者组,让它们来消费。虽然极端情况下可能被唤醒的消费者仍然抢不到锁,但没关系,put()内部会判断队列是否满了,一旦队列满了,无论来多少个生产者,统统给你阻塞住,极端情况下生产者全部阻塞,总会轮到消费者的。消费者则是每次成功消费都尝试唤醒生产者组。

其他两个阻塞队列LinkedBlockingQueue和SynchronousQueue同理,也是用ReentrantLock实现阻塞的。

展望AQS

看到这里,相信阻塞队列在大家心中已经不再那么神圣了,有什么了不起啊,我们自己也能写啊,还用了好几种方式实现呢!但是扪心自问,阻塞队列总共也就:

  • 阻塞
  • 队列

而我们所谓的手写阻塞队列,其实是这样的:队列直接用了LinkedList,阻塞也是借用wait/notify和ReentrantLock实现的。也就是说,我们其实只是做了组装工作,拿现成的队列+阻塞功能拼出了一个阻塞队列

世间路千万条,总有人不走寻常路。按理说现成的List+wait/notifyAll已经可以造出阻塞队列了,但就是有大佬不满足。

Doug Lea老爷子震惊的说:What?! Why you don't say earlly ya! I have already finished the AQS le...

是的,又是这个男人,他整出了一个AQS,再把AQS塞到ReentrantLock中,最后用ReentrantLock+数组、ReentrantLock+链表、ReentrantLock+Transfer搞出了ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue...阻塞队列只能算顺便的,他的初衷其实是利用AQS统一并简化锁的实现,屏蔽同步状态管理、阻塞线程的排队和通知、唤醒机制等,让后续的二次开发更简便。

换句话说:

如果你纠结于阻塞队列怎么实现,那你的格局就太小了...JDK的阻塞队列依赖于ReentrantLock,而ReentrantLock只是对AQS的浅封装,真正需要我们花功夫学习的其实有且只有AQS。

学完本篇,我鼓励你现在、马上、立刻打开IDEA搜索ArrayBlockingQueue,把源码看一遍。相信我,你已经可以驯服它!

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

进群,大家一起学习,一起进步,一起对抗互联网寒冬

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294073.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C51--IIC协议

IIC协议初识&#xff1a; 1、概述 IIC全称Inter-Integrated Circuit (集成电路总线) 是由PHILIPS公司在80年代开发的两线式串行总线&#xff0c;用于连接微控制器及其外围设备。 IIC属于半双工同步通信方式 SCL——时钟信号 SDA——数据信号 2、特点&#xff1a; 简单性和…

如何将 MySQL 数据库转换为 SQL Server

本文解释了为什么组织希望将其 MySQL 数据库转换为 Microsoft SQL 数据库。本文接着详细介绍了尝试转换之前需要记住的事项以及所涉及的方法。专业的数据库转换器工具将帮助您快速将 MySQL 数据库记录转换为 MS SQL Server。 在继续之前&#xff0c;我们先讨论一下 MySQL 到 M…

Linux查看openSSL版本

命令&#xff1a;openssl version

【python、opencv】opencv仿射变换原理及代码实现

opencv仿射变换原理 仿射变换是opencv的基本知识点&#xff0c;主要目的是将原始图片经过仿射变换矩阵&#xff0c;平移、缩放、旋转成目标图像。用数学公式表示就是坐标转换。 其中x&#xff0c;y是原始图像坐标&#xff0c;u&#xff0c;v是变换后的图像坐标。将公式转换为…

Socket介绍及使用Java实现socket通信前后端示例代码

本文介绍一下再Java中Socket的实现。 目录 一、需要掌握 二、程序源码 三、运行演示 一、介绍 Java Socket实现实时接收TCP消息需要客户端和服务端两个部分。 二、JavaSocket源码示例 客户端后台部分代码 public class Client {public static void main(String[] args)…

【mysql】下一行减去上一行数据、自增序列场景应用

背景 想获取if_yc为1连续账期数据 思路 获取所有if_yc为1的账期数据下一行减去上一行账期&#xff0c;如果为1则为连续&#xff0c;不等于1就为断档获取不等于1的最小账期&#xff0c;就是离当前账期最近连续账期 代码 以下为mysql语法&#xff1a; select acct_month f…

HarmonyOS鸿蒙操作系统架构开发

什么是HarmonyOS鸿蒙操作系统&#xff1f; HarmonyOS是华为公司开发的一种全场景分布式操作系统。它可以在各种智能设备&#xff08;如手机、电视、汽车、智能穿戴设备等&#xff09;上运行&#xff0c;具有高效、安全、低延迟等优势。 目录 HarmonyOS 一、HarmonyOS 与其他操…

C语言——2048完整版

2048是一个简单又有趣的小游戏&#xff0c;相信大家都接触并了解过&#xff0c;那如何通过代码来实现他呢&#xff1f;下面就让我们来一起看看。 目录 1、头文件 2、主函数 3、 StarGame 4、GetNum 5、Show 6、Picture 7、GetButton 8、MergeLeft 9、MergeUp 10、MergeR…

常见的校验码

在计算机领域中&#xff0c;校验码是一种用于检测或纠正数据传输或存储中错误的技术。校验码通常通过在数据中添加一些冗余信息来实现。其主要目的是确保数据的完整性和准确性。 奇偶校验码&#xff08;Parity Check&#xff09; 奇校验&#xff1a; 确保数据中二进制位中的1的…

hbuiler中使用npm安装datav

注&#xff1a;datav边框样式目前使用时&#xff1a;适用于网页&#xff0c;不适用于app 1、先安装node 安装、配置Node路径 2、为Node配置环境变量 3、在hbuilder的设置中填写node的路径 配置 4、打开cmd输入npm install jiaminghi/data-view 安装dataV&#xff0c;&…

算法 最小生成树

算法选择 稠密图&#xff1a;朴素版普利姆算法【因为代码短】 稀疏图&#xff1a;克鲁斯卡尔算法【因为思路简单】 普利姆&#xff08;Prim&#xff09; 朴素 Prim 时间复杂度 O(n^2) 适用情况 稠密图 算法流程 集合&#xff1a;当前已经在连通块中的所有点 初始化距…

JNPF低代码平台详解 -- 系统架构

目录 一、技术介绍 技术架构 二、设计原理 三、界面展示 1.代码生成器 2.工作流程 3.门户设计 4.大屏设计 5.报表设计 6.第三方登录 7.多租户实现 8.分布式调度 9.消息中心 四、功能框架 JNPF低代码是一款新奇、实用、高效的企业级软件开发工具&#xff0c;支持企…

在 JavaScript 中导入和导出 Excel XLSX 文件:SpreadJS

在 JavaScript 中导入和导出 Excel XLSX 文件 2023 年 12 月 5 日 使用 MESCIUS 的 SpreadJS 将完整的 JavaScript 电子表格添加到您的企业应用程序中。 SpreadJS 是一个完整的企业 JavaScript 电子表格解决方案&#xff0c;用于创建财务报告和仪表板、预算和预测模型、科学、工…

文章解读与仿真程序复现思路—— 中国电机工程学报EI\CSCD\北大核心《考虑多重不确定性的电–气–交通网络耦合系统数据驱动鲁棒优化调度》

这个标题涉及到一个复杂系统的问题&#xff0c;以下是对标题的解读&#xff1a; 电–气–交通网络耦合系统&#xff1a; 涉及电力系统、气体&#xff08;可能是天然气&#xff09;系统和交通网络之间的相互关系。这种耦合可能表示这些系统之间存在一定的依赖和相互影响。 多重不…

Java面试题(每天10题)-------连载(45)

Dubbo篇 1、Dubbo的服务调用流程 2、Dubbo支持那种协议&#xff0c;每种协议的应用场景&#xff0c;优缺点&#xff1f; dubbo&#xff1a; 单一长连接和 NIO 异步通讯&#xff0c;适合大并发小数据量的服务调用&#xff0c;以及消费者远大于提供者。传输协议 TCP&#xff0c;…

PbootCMS 前台RCE漏洞复现

0x01 产品简介 PbootCMS是全新内核且永久开源免费的PHP企业网站开发建设管理系统,是一套高效、简洁、 强悍的可免费商用的PHP CMS源码,能够满足各类企业网站开发建设的需要 0x02 漏洞概述 PbootCMS v<=3.1.6版本中存在模板注入,攻击者可构造特定的链接利用该漏洞,执行…

微信小程序引入vant-weapp爬出坑

最新的微信小程序的项目结构跟之前的不一样&#xff0c;然后&#xff0c;按照vant-weapp上的官方文档&#xff0c;安装步骤失败&#xff0c;提示了各种错误。如果你的微信小程序结构跟我的一致&#xff0c;可以采用和我一样的方案。 微信小程序引入vant-weapp爬出坑 移动pack…

基于javeweb实现的图书借阅管理系统

一、系统架构 前端&#xff1a;jsp | js | css | jquery 后端&#xff1a;servlet | jdbc 环境&#xff1a;jdk1.7 | mysql | tocmat 二、代码及数据库 三、功能介绍 01. 登录页 02. 首页 03. 图书管理 04. 读者管理 05. 图书分类管理 06. 图书借阅信息 07. 图书归还信…

H桥简单24V直流电机GPIO驱动代码

主控: 雅特力AT32F403A, 主频100Mhz 驱动: GPIO口简单驱动 先展示主要的桥电路 头文件 /**************************************************************************** Copyright notice & Disclaimer* *******************************************…

Mysql dumpling 导入导出sql文件

一&#xff1a;导出命令 mysqldump -u root -p saishi > saishi.sql mysqldump -u root -p saishi > saishi.sql root是用户名 saishi是数据库名 saishi.sql导出文件名 二&#xff1a;选择导入的数据库 cd到安装mysql的文件下&#xff08;找不到可以用&#xff1a;wh…