【多线程-从零开始-捌】代码案例2—阻塞队列

news2025/1/17 19:23:10

什么是阻塞队列

阻塞队里是在普通的队列(先进先出队列)基础上,做出了扩充

  1. 线程安全
    • 标准库中原有的队列 Queue 和其子类,默认都是线程不安全的
  2. 具有阻塞特性
    • 如果队列为空,进行队列操作,此时就会出现阻塞。一直阻塞到其他线程往队列里添加元素为止
    • 如果队列满了,进行队列操作,此时就会出现阻塞。一直阻塞到其他线程从队列里取走元素为止

基于阻塞队列,最大的应用场景,就是实现“生产者消费者模型”(日常开发中,常见的编程手法)

生产者消费者模型

比如:
小猪佩奇一家准备包饺子,成员有佩奇,猪爸爸和猪妈妈,外加一个桌子

  • 佩奇负责擀面皮
  • 猪爸爸和猪妈妈负责包饺子
  • 桌子用来放你擀好的面皮
    每次佩奇擀好一个面皮后,就放在桌子上,猪爸爸和猪妈妈就用这个面皮包出一个饺子

此时:

  • 佩奇就是面皮的生产者——生产者
  • 猪爸爸和猪妈妈就是面皮的消费者——消费者
  • 桌子就是阻塞队列——阻塞队列

为什么是是阻塞队列而不是普通队列?


因为阻塞队列可以很好的协调生产者和消费者

  • 若佩奇擀面皮很快,不一会桌子上就满了
    • 阻塞队列:佩奇就休息一下,等面皮被消耗一些之后继续再擀
    • 普通队列:不会停,放不下了也一直擀
  • 若猪爸爸和猪妈妈包的很快,不一会桌子上就空了
    • 阻塞队列:猪爸爸和猪妈妈休息一下,等到面皮擀出来之后再包
    • 普通队列:不会停,没面皮了也一直包

好处

上述生产者消费者模型在后端开发中,经常会涉及到
当下后端开发,常见的结构——“分布式系统”,不是一台服务器解决所有问题,而是分成了多个服务器,服务器之间相互调用

主要有两方面的好处

1. 服务器之间解耦合

我们希望见到“低耦合”

  • 模块之间的关联程度/影响程度

通常谈到的“阻塞队列”是代码中的一个数据结构
但是由于这个东西太好用了,以至于会把这样的数据结构单独封装成一个服务器程序,并且在单独的服务器机器上进行部署
此时,这样的饿阻塞队列有了一个新的名字,“消息队列”(Message Queue,MQ)

如果是直接调用image.png|354

  • 编写 A 和 B 代码中,会出现很多对方服务器相关的代码
  • 并且,此时如果 B 服务器挂了,A 可能也会直接受到影响
  • 再并且,如果后续想加入一个 C 服务器,此时对 A 的改动就很大

如果是通过阻塞队列
image.png|526

  • A 之和队列通信
  • B 也只和队列通信
  • A 和 B 互相不知道对方的存在,代码中就更没有对方的影子
    看起来,A 和 B 之间是解耦合了,但是 A 和队列,B 和队列之间,不是引入了新的耦合吗?
  • 耦合的代码,在后续的变更工程中,比较复杂,容易产生 bug
  • 但消息队列是成熟稳定的产品,代码是稳定的,不会频繁更改。A、B 和队列之间的耦合,对我们的影响微乎其微
  • 再增加 C 服务器也很方便,也不会影响到原有的 A 和 B 服务器
2. “削峰填谷”的效果

通过中间的阻塞队列,可以起到削峰填谷的效果,在遇到请求量激增突发的情况下,可以有效保护下游服务器,不会被请求冲垮

阻塞队列的作用就相当与三峡大坝在三峡的防汛作用

image.png

  • A 向队列中写入数据变快了,但是 B 仍然可以按照原有的速度来消费数据
  • 阻塞队列扛下了这样的压力,就像三峡大坝抗住上游的大量水量的压力
  • 如果是直接调用,A 收到多少请求,B 也收到多少,那很可能直接就把 B 给搞挂了
  • 当 A 不再写入数据的时候,但队列中还存有数据,可以继续工给 B
问题
  1. 为啥一个服务器,收到的请求变多,就容易挂?
  • 一台服务器,就是一台“电脑”,上面就提供了一些硬件资源(包括但不限于 CPU,内存,硬盘,网络带宽…)
  • 就算你这个及其配置再好,硬件资源也是有限的
  • 服务器每次收到一个请求,处理这个请求的过程,就都需要执行一系列的代码,在执行这些代码的过程中,就需要消耗一定的硬件资源(CPU,内存,硬盘,网络带宽…)
  • 这些请求小号的总的硬件资源的量,超过了及其能提供的上限,那么此时机器就会出现(卡死,程序直接崩溃等…)
  1. 在请求激增的时候,A 为啥不会挂?队列为啥不会挂?反而是 B 更容易挂呢?
  • A 的角色是一个“网关服务器”,收到客户端的请求,再把请求转发给其他的服务器
    • 这样的服务器里的代码,做的工作比较简单(单纯的数据转发),消耗的硬件资源通常更少
    • 处理一个请求,消耗的资源更少,同样的配置下,就能支持更多的请求处理
  • 同理,队列其实也是比较简单的程序,单位请求消耗的硬件资源,也是比较少见的
  • B 这个服务器,是真正干活的服务器,要真正完成一系列的业务逻辑
    • 这一系列的工作,代码量非常庞大,消耗的时间很多,消耗的系统硬件资源,也是更多的

类似的,像 MySQL 这样的数据库,处理每个请求的时候,做的工作就是比较多的,消耗的硬件资源也是比较多的,因此 MySQL 也是后端系统中,容易挂的部分
对应的,像 Redis 这种内存数据库,处理请求,做的工作远远少于 MySQL,消耗的资源更少,Redis 就比 MySQL 硬朗很多,不容易挂

代价

  1. 需要更多的机器来部署这样的消息队列(小代价)
  2. A 和 B 之间的通信延迟会变长
    • 对于 A 和 B 之间的调用,要求响应时间比较短就不太适合了

每个技术都有优缺点,不能无脑吹,也不能无脑黑

比如:微服务

  • 本质上就是把分布式系统服务拆的更细了,每个服务都很小,只做一项功能
  • 非常适合大公司,部门分的很细
  • 但需要更多的机器,处理请求需要更多的响应时间,更复杂的后端结构,运维成本水涨船高

Java 自带的阻塞队列

阻塞队列在 Java 标准库中也提供了现成的封装——BlockingQueue

image.png|565

  • BlockingQueue 本质上是一个接口,不能直接 new,只能 new 一个类
  • 因为是继承与 Queue,所以 Queue 的一些操作,offerpoll 这些,在 BlockingQueue 中同样可以使用(不过不建议使用,因为都不能阻塞
  • BlockingQueue 提供了另外两个专属方法,都能阻塞
    • put——入列
    • take——出队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

capacity 指的是容量,是一个需要加上的参数

public class Demo10 {  
    public static void main(String[] args) throws InterruptedException {  
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);  
        queue.put("111");  
        System.out.println("put成功");  
        queue.put("111");  
        System.out.println("put成功");  
      	
    }
}
//运行结果
put成功
put成功
put成功
  • 只打印了三个,说明第四次 put 的时候容量不够,阻塞了
public class Demo10 {  
    public static void main(String[] args) throws InterruptedException {  
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);  
        queue.put("111");  
        System.out.println("put 成功");  
        queue.put("111");  
        System.out.println("put 成功");  
        
        queue.take();  
        System.out.println("take 成功");  
        queue.take();  
        System.out.println("take 成功");  
        queue.take();  
        System.out.println("take 成功");  
    }
}
//运行结果
put 成功
put 成功
take 成功
take 成功
  • 由于只有 put 了两次,所以也只有两次 take,随后阻塞住了
public class Demo11 {  
    public static void main(String[] args) {  
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);  
  
        Thread t1 = new Thread(() -> {  
            int i = 1;  
            while(true){  
                try {  
                    queue.put(i);  
                    System.out.println("生产者元素"+i);  
                    i++;  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }            
            }        
        });        
        Thread t2 = new Thread(() -> {  
            while(true) {  
                try {  
                    Integer i = queue.take();  
                    System.out.println("消费者元素"+i);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }            
            }        
        });        
    	t1.start();  
        t2.start();  
    }
}
  • 上述程序中,一个线程生产,一个线程消费
  • 实际开发中,通常可能是多个线程生产,多个线程消费

自己实现一个阻塞队列

普通队列

基于数组的队列
实现一个基础的队列

//此处不考虑泛型参数,只是基于 String 进行存储  
class MyBlockingQueue {  
    private String[] data = null;  
    private int head = 0;  
    private int tail = 0;  
    private int size = 0;  
    
    public MyBlockingQueue(int capacity) {  
        data = new String[capacity];  
    }    
    
    public void put(String s) {  
        if(size == data.length) {  
            //队列满了  
            return;  
        }        
        data[tail] = s;  
        tail++;  
        if(tail >= data.length){  
            tail = 0;  
        }        
        size++;  
    }    
    
    public String take() {  
        if(size == 0) {  
            //队列为空  
            return null;  
        }        
        String ret = data[head];  
        head++;  
        if(head >= data.length){  
            head = 0;  
        }        
        size--;  
        return ret;  
    }
}

阻塞队列

  • 队列为空,take 就要阻塞,在其他线程 put 的时候唤醒
  • 队列未满,put 就要阻塞,在其他线程 take 的时候唤醒
//此处不考虑泛型参数,只是基于 String 进行存储  
class MyBlockingQueue {  
    private String[] data = null;  
    private int head = 0;  
    private int tail = 0;  
    private int size = 0;  
    private Object locker = new Object();  
  
    public MyBlockingQueue(int capacity) {  
        data = new String[capacity];  
    }  
    
    public void put(String s) throws InterruptedException {  
        //加锁的对象,可以单独定义一个,也可以直接就地使用this  
        synchronized (locker) {  
            if (size == data.length) {  
                //队列满了,需要阻塞  
                //return;  
                locker.wait();  
            }            
            data[tail] = s;  
            tail++;  
            if (tail >= data.length) {  
                tail = 0;  
            }            
            size++;  
            //唤醒 take 的阻塞  
            locker.notify();  
        }    
    }  
    
    public String take() throws InterruptedException {  
        String ret = "";  
        synchronized (locker) {  
            if (size == 0) {  
                //队列为空,需要阻塞  
                //return null;  
                locker.wait();  
            }            
            ret = data[head];  
            head++;  
            if (head >= data.length) {  
                head = 0;  
            }            
            size--;  
            //唤醒 put 的阻塞  
            locker.notify();  
        }        
        return ret;  
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1993207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java代码生成器EasyCode

Java代码生成器EasyCode 一、安装插件二、连接数据库后右键Generator生成代码 一、安装插件 在 IntelliJ IDEA 的插件市场中搜索 EasyCode&#xff0c;然后安装该插件 二、连接数据库后右键Generator生成代码 勇敢面对挑战&#xff0c;成功从不会远离坚持者。坚持不懈的努力…

八股之Java集合

Java 集合&#xff0c;也叫作容器&#xff0c;主要是由两大接口派生而来&#xff1a;一个是 Collection接口&#xff0c;主要用于存放单一元素&#xff1b;另一个是 Map 接口&#xff0c;主要用于存放键值对。对于Collection 接口&#xff0c;下面又有三个主要的子接口&#xf…

MongoDB学习笔记(三)

使用Python操作MongoDB: 使用管理员用户&#xff1a;

Python —— 基础

目录 变量与引用 数据类型 赋值、深浅拷贝 控制流结构 逻辑操作符 is 与 dir() 关键字&#xff08;Python 3.11 &#xff09; https://www.cnblogs.com/qianfanwaer/p/14783204.html 变量与引用 变量是原来存储数据的一个标识符&#xff0c;可被看作是内存的一个位置&…

【学习笔记】Day 7

一、进度概述 1、DL-FWI基础入门培训笔记 2、inversionnet_train 试运行——未成功 二、详情 1、InversionNet: 深度学习实现的反演 InversionNet构建了一个具有编码器-解码器结构的卷积神经网络&#xff0c;以模拟地震数据与地下速度结构的对应关系。 &#xff08;一…

Python,我来啦!!!融合多个框架语言。。

基于上次发布CSDN的自己一些想法 上次&#xff0c;从一个道友手中购买了一份轮子代码&#xff0c;主要用到的技术就是pythonmysql或sqliteflask框架&#xff0c;这里我做了二次开发。 新的改变 这里&#xff0c;我对该代码进行了一些功能拓展与语法支持&#xff0c;除了原有…

【OpenCV C++20 学习笔记】仿射变换-warpAffine, getRotationMatrix2D

仿射变换 原理概述得到仿射变换的方法 APIgetAffineTransform()函数warpAffine()函数getRotationMatrix2D()函数 示例 原理 概述 仿射变换是矩阵乘法&#xff08;线性变换&#xff09;和向量加法的结合。它包含了&#xff1a; 旋转&#xff08;线性变换&#xff09;转换&…

【递归 + 记忆化搜索优化】力扣494. 目标和

给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 ‘’ 或 ‘-’ &#xff0c;然后串联起所有整数&#xff0c;可以构造一个 表达式 &#xff1a; 例如&#xff0c;nums [2, 1] &#xff0c;可以在 2 之前添加 ‘’ &#xff0c;在 1 之前添加 ‘-…

立体连接模式下的传播与沟通:AI智能名片小程序的创新应用与深度剖析

摘要&#xff1a;在数字化浪潮的推动下&#xff0c;信息传播与沟通方式正经历着前所未有的变革。立体连接模式&#xff0c;作为这一变革的重要产物&#xff0c;通过整合物理空间、虚拟网络空间与社群心理空间的三维联动&#xff0c;实现了信息的深度传播与高效互动。AI智能名片…

Java新手指南:从菜鸟到编程大师的趣味之路-类和对象

这里是Themberfue 本章主要介绍的是Java最重要的面向对象&#xff08;OOP&#xff09;的基础 面向对象 Java是一门纯面向对象的语言&#xff0c;在面向对象的世界里&#xff0c;一切皆为对象。面向对象是解决问题的一种思想&#xff0c;主要依靠对象之间的交互完成一件事情。用…

MySQL --- 复合查询

目录 一、多表查询 二、自连接 三、子查询 1、单行子查询 2、多行子查询 3、多列子查询 4、在from后面使用子查询 四、合并查询 1、union 2、union all 五、内连接 六、外连接 1、左外连接 2、右外连接 一、多表查询 我们需要的数据往往会来自不同的表&#xf…

redis面试(十)锁释放

自动释放 首先锁的释放分为两种&#xff0c;一种是自动释放&#xff0c;加入说加锁的线程宕机了不在了&#xff0c;我们之前说过这个。 那这个线程中的对redis这个锁不断刷新过期时间的看门狗逻辑就没有了&#xff0c;所以这个锁最多等待30s的时间就会自动过期删除&#xff0c…

文件I/O第一天作业 2024.8.8

使用系统I/O实现Linux的cat命令 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <fcntl.h> #include <errno.h>int main(int argc, const char *argv[]) {if(argc < 2){printf("请输…

【笔试题面试题】IO类2 知识点汇总(笔试面试题)

书接上文&#xff0c;配上前文一起实用更加&#xff0c;持续更新&#xff0c;督促自己学习 目录 1、详细描述一下什么是IO以及标准IO和文件IO的区别&#xff08;补充&#xff09; 2、什么是死锁&#xff0c;如何避免死锁&#xff08;补充&#xff09; 3、为什么引入同步互斥…

NVIDIA Triton系列09-为服务器添加模型

NVIDIA Triton系列09-为服务器添加模型 B站&#xff1a;肆十二-的个人空间-肆十二-个人主页-哔哩哔哩视频 (bilibili.com) 博客&#xff1a;肆十二-CSDN博客 问答&#xff1a;(10 封私信 / 72 条消息) 肆十二 - 知乎 (zhihu.com) 前面已经用 https://github.com/triton-inferen…

基于python的百度迁徙迁入、迁出数据分析(七)

参考&#xff1a;【Python】基于Python的百度迁徙2——迁徙规模指数&#xff08;附代码&#xff09;-CSDN博客 记录于2024年8月&#xff0c;这篇是获取百度迁徙指数&#xff0c;之前我们都在讨论不同城市的迁徙比例关系&#xff0c;这篇我们来获取百度迁徙指数这个数据&#x…

大学新生入门编程的最佳路径:嵌入式领域的深入探索

对于大学新生来说&#xff0c;编程已成为一项必不可少的技能。面对众多编程语言和学习资源&#xff0c;新生们常常感到迷茫。如何选择适合自己的编程语言&#xff1f;如何制定有效的学习计划&#xff1f;如何避免常见的学习陷阱&#xff1f;本文将为你提供嵌入式领域的编程入门…

测试工具之JMeter

JMeter Apache JMeter应用程序是开源软件,是一个100%纯Java应用程序,旨在负载测试功能行为和衡量性能。它最初是为测试Web应用程序而设计的,但后来扩展到其他测试功能。 JMeter是一个免费、开源、跨平台的性能测试工具,于20世纪90年代后期面世。这是一个成熟、健全且具有…

开源一套金融大模型插件(ChatGPT)

shares vscode 插件A 股量化交易系统自研金融大模型&#xff0c;复利Chat 源码地址&#xff1a; https://github.com/xxjwxc/shares

NPDP考前怎么复习?NPDP200问PDF版来啦~

距离NPDP下半年考试还有4个月的时间&#xff0c;现在正是备考的黄金期。 以下复习建议~ 01.制定详细计划 首先&#xff0c;根据考试大纲&#xff0c;可以将内容划分为几个模块&#xff0c;如新产品开发流程、市场研究、产品规划等&#xff0c;并为每个模块设定学习目标和时间…