【多线程-从零开始-捌】阻塞队列,消费者生产者模型

news2025/1/14 1:18:30

什么是阻塞队列

阻塞队里是在普通的队列(先进先出队列)基础上,做出了扩充

  1. 线程安全
    • 标准库中原有的队列 Queue 和其子类,默认都是线程不安全的
  2. 具有阻塞特性
    • 如果队列为空,进行队列操作,此时就会出现阻塞。一直阻塞到其他线程往队列里添加元素为止
    • 如果队列满了,进行队列操作,此时就会出现阻塞。一直阻塞到其他线程从队列里取走元素为止

基于阻塞队列,最大的应用场景,就是实现“生产者消费者模型”(日常开发中,常见的编程手法)

生产者消费者模型

比如:
小猪佩奇一家准备包饺子,成员有佩奇,猪爸爸和猪妈妈,外加一个桌子

  • 佩奇负责擀面皮
  • 猪爸爸和猪妈妈负责包饺子
  • 桌子用来放你擀好的面皮
    每次佩奇擀好一个面皮后,就放在桌子上,猪爸爸和猪妈妈就用这个面皮包出一个饺子

此时:

  • 佩奇就是面皮的生产者——生产者
  • 猪爸爸和猪妈妈就是面皮的消费者——消费者
  • 桌子就是阻塞队列——阻塞队列

为什么是是阻塞队列而不是普通队列?


因为阻塞队列可以很好的协调生产者和消费者

  • 若佩奇擀面皮很快,不一会桌子上就满了
    • 阻塞队列:佩奇就休息一下,等面皮被消耗一些之后继续再擀
    • 普通队列:不会停,放不下了也一直擀
  • 若猪爸爸和猪妈妈包的很快,不一会桌子上就空了
    • 阻塞队列:猪爸爸和猪妈妈休息一下,等到面皮擀出来之后再包
    • 普通队列:不会停,没面皮了也一直包

好处

上述生产者消费者模型在后端开发中,经常会涉及到
当下后端开发,常见的结构——“分布式系统”,不是一台服务器解决所有问题,而是分成了多个服务器,服务器之间相互调用

主要有两方面的好处

1. 服务器之间解耦合

我们希望见到“低耦合”

  • 模块之间的关联程度/影响程度

通常谈到的“阻塞队列”是代码中的一个数据结构
但是由于这个东西太好用了,以至于会把这样的数据结构单独封装成一个服务器程序,并且在单独的服务器机器上进行部署
此时,这样的饿阻塞队列有了一个新的名字,“消息队列”(Message Queue,MQ)

如果是直接调用image.png|354

  • 编写 A 和 B 代码中,会出现很多对方服务器相关的代码
  • 并且,此时如果 B 服务器挂了,A 可能也会直接受到影响
  • 再并且,如果后续想加入一个 C 服务器,此时对 A 的改动就很大

如果是通过阻塞队列
image.png|526

  • A 之和队列通信
  • B 也只和队列通信
  • A 和 B 互相不知道对方的存在,代码中就更没有对方的影子
    看起来,A 和 B 之间是解耦合了,但是 A 和队列,B 和队列之间,不是引入了新的耦合吗?
  • 耦合的代码,在后续的变更工程中,比较复杂,容易产生 bug
  • 但消息队列是成熟稳定的产品,代码是稳定的,不会频繁更改。A、B 和队列之间的耦合,对我们的影响微乎其微
  • 再增加 C 服务器也很方便,也不会影响到原有的 A 和 B 服务器
2. “削峰填谷”的效果

通过中间的阻塞队列,可以起到削峰填谷的效果,在遇到请求量激增突发的情况下,可以有效保护下游服务器,不会被请求冲垮

阻塞队列的作用就相当与三峡大坝在三峡的防汛作用

image.png

  • A 向队列中写入数据变快了,但是 B 仍然可以按照原有的速度来消费数据
  • 阻塞队列扛下了这样的压力,就像三峡大坝抗住上游的大量水量的压力
  • 如果是直接调用,A 收到多少请求,B 也收到多少,那很可能直接就把 B 给搞挂了
  • 当 A 不再写入数据的时候,但队列中还存有数据,可以继续工给 B
问题
  1. 为啥一个服务器,收到的请求变多,就容易挂?
  • 一台服务器,就是一台“电脑”,上面就提供了一些硬件资源(包括但不限于 CPU,内存,硬盘,网络带宽…)
  • 就算你这个及其配置再好,硬件资源也是有限的
  • 服务器每次收到一个请求,处理这个请求的过程,就都需要执行一系列的代码,在执行这些代码的过程中,就需要消耗一定的硬件资源(CPU,内存,硬盘,网络带宽…)
  • 这些请求小号的总的硬件资源的量,超过了及其能提供的上限,那么此时机器就会出现(卡死,程序直接崩溃等…)
  1. 在请求激增的时候,A 为啥不会挂?队列为啥不会挂?反而是 B 更容易挂呢?
  • A 的角色是一个“网关服务器”,收到客户端的请求,再把请求转发给其他的服务器
    • 这样的服务器里的代码,做的工作比较简单(单纯的数据转发),消耗的硬件资源通常更少
    • 处理一个请求,消耗的资源更少,同样的配置下,就能支持更多的请求处理
  • 同理,队列其实也是比较简单的程序,单位请求消耗的硬件资源,也是比较少见的
  • B 这个服务器,是真正干活的服务器,要真正完成一系列的业务逻辑
    • 这一系列的工作,代码量非常庞大,消耗的时间很多,消耗的系统硬件资源,也是更多的

类似的,像 MySQL 这样的数据库,处理每个请求的时候,做的工作就是比较多的,消耗的硬件资源也是比较多的,因此 MySQL 也是后端系统中,容易挂的部分
对应的,像 Redis 这种内存数据库,处理请求,做的工作远远少于 MySQL,消耗的资源更少,Redis 就比 MySQL 硬朗很多,不容易挂

代价

  1. 需要更多的机器来部署这样的消息队列(小代价)
  2. A 和 B 之间的通信延迟会变长
    • 对于 A 和 B 之间的调用,要求响应时间比较短就不太适合了

每个技术都有优缺点,不能无脑吹,也不能无脑黑

比如:微服务

  • 本质上就是把分布式系统服务拆的更细了,每个服务都很小,只做一项功能
  • 非常适合大公司,部门分的很细
  • 但需要更多的机器,处理请求需要更多的响应时间,更复杂的后端结构,运维成本水涨船高

Java 自带的阻塞队列

阻塞队列在 Java 标准库中也提供了现成的封装——BlockingQueue

image.png|565

  • BlockingQueue 本质上是一个接口,不能直接 new,只能 new 一个类
  • 因为是继承与 Queue,所以 Queue 的一些操作,offerpoll 这些,在 BlockingQueue 中同样可以使用(不过不建议使用,因为都不能阻塞
  • BlockingQueue 提供了另外两个专属方法,都能阻塞
    • put——入列
    • take——出队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

capacity 指的是容量,是一个需要加上的参数

public class Demo10 {  
    public static void main(String[] args) throws InterruptedException {  
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);  
        queue.put("111");  
        System.out.println("put成功");  
        queue.put("111");  
        System.out.println("put成功");  
      	
    }
}
//运行结果
put成功
put成功
put成功
  • 只打印了三个,说明第四次 put 的时候容量不够,阻塞了
public class Demo10 {  
    public static void main(String[] args) throws InterruptedException {  
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);  
        queue.put("111");  
        System.out.println("put 成功");  
        queue.put("111");  
        System.out.println("put 成功");  
        
        queue.take();  
        System.out.println("take 成功");  
        queue.take();  
        System.out.println("take 成功");  
        queue.take();  
        System.out.println("take 成功");  
    }
}
//运行结果
put 成功
put 成功
take 成功
take 成功
  • 由于只有 put 了两次,所以也只有两次 take,随后阻塞住了
public class Demo11 {  
    public static void main(String[] args) {  
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);  
  
        Thread t1 = new Thread(() -> {  
            int i = 1;  
            while(true){  
                try {  
                    queue.put(i);  
                    System.out.println("生产者元素"+i);  
                    i++;  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }            
            }        
        });        
        Thread t2 = new Thread(() -> {  
            while(true) {  
                try {  
                    Integer i = queue.take();  
                    System.out.println("消费者元素"+i);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }            
            }        
        });        
    	t1.start();  
        t2.start();  
    }
}
  • 上述程序中,一个线程生产,一个线程消费
  • 实际开发中,通常可能是多个线程生产,多个线程消费

自己实现一个阻塞队列

普通队列

基于数组的队列
实现一个基础的队列

//此处不考虑泛型参数,只是基于 String 进行存储  
class MyBlockingQueue {  
    private String[] data = null;  
    private int head = 0;  
    private int tail = 0;  
    private int size = 0;  
    
    public MyBlockingQueue(int capacity) {  
        data = new String[capacity];  
    }    
    
    public void put(String s) {  
        if(size == data.length) {  
            //队列满了  
            return;  
        }        
        data[tail] = s;  
        tail++;  
        if(tail >= data.length){  
            tail = 0;  
        }        
        size++;  
    }    
    
    public String take() {  
        if(size == 0) {  
            //队列为空  
            return null;  
        }        
        String ret = data[head];  
        head++;  
        if(head >= data.length){  
            head = 0;  
        }        
        size--;  
        return ret;  
    }
}

阻塞队列

  • 队列为空,take 就要阻塞,在其他线程 put 的时候唤醒
  • 队列未满,put 就要阻塞,在其他线程 take 的时候唤醒
//此处不考虑泛型参数,只是基于 String 进行存储  
class MyBlockingQueue {  
    private String[] data = null;  
    private int head = 0;  
    private int tail = 0;  
    private int size = 0;  
    private Object locker = new Object();  
  
    public MyBlockingQueue(int capacity) {  
        data = new String[capacity];  
    }  
    
    public void put(String s) throws InterruptedException {  
        //加锁的对象,可以单独定义一个,也可以直接就地使用this  
        synchronized (locker) {  
            if (size == data.length) {  
                //队列满了,需要阻塞  
                //return;  
                locker.wait();  
            }            
            data[tail] = s;  
            tail++;  
            if (tail >= data.length) {  
                tail = 0;  
            }            
            size++;  
            //唤醒 take 的阻塞  
            locker.notify();  
        }    
    }  
    
    public String take() throws InterruptedException {  
        String ret = "";  
        synchronized (locker) {  
            if (size == 0) {  
                //队列为空,需要阻塞  
                //return null;  
                locker.wait();  
            }            
            ret = data[head];  
            head++;  
            if (head >= data.length) {  
                head = 0;  
            }            
            size--;  
            //唤醒 put 的阻塞  
            locker.notify();  
        }        
        return ret;  
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1996582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

114套新闻网站源码+采集规则+安装使用教程-网络公司建站资源

运行环境 PHP5.6MYSQL5.6 – 系统版本支持WindowsLinux 源码介绍 1.版权问题 本114套新闻源码均由EYOUCMS系统二次开发而成&#xff0c;因为系统不涉及会员功能&#xff0c;所以没有版权纠纷问题&#xff0c;客户可以一直免费使用。 2.自动采集发布 系统自身集成了网易,新…

odoo17 翻译一个小bug

odoo17 翻译一个小bug 用户界面的没译过来 标红处&#xff0c;但在zh_CN.po中明显已经翻译过来了&#xff0c;采取暴力点的&#xff0c;直接把base下的base.pot删除&#xff0c;再更新一下&#xff0c;可以正常显示了

【区块链+社会公益】腾讯志愿者公益平台 | FISCO BCOS应用案例

由腾讯技术公益团队主导的“公益志愿者平台”&#xff0c;旨在链接公益组织和志愿者。公益组织入驻平台后可以发布公 益活动、征集志愿者&#xff0c;志愿者可以在平台报名参加公益活动、获得公益组织和平台联合颁发的志愿服务证书。 腾讯技术公益采用了微众区块链技术对 “公…

【Linux】网络编程套接字Scoket:UDP网络编程

目录 一、了解UDP协议 二、了解端口和IP地址 三、套接字概述与Socket的概念 四、Socket的类型 五、 Socket的信息数据结构 六、网络字节序与主机字节序的互相转换 七、地址转换函数 八、UDP网络编程流程及相关函数 socket函数 bind函数 recvfrom函数 sendto函数 …

网站开发涉及到的技术内容介绍——后端PHP(2)

网站开发涉及到的技术内容介绍——后端PHP(1)https://blog.csdn.net/xiaochenXIHUA/article/details/141000752?spm=1001.2014.3001.5501 一、PHP的常用函数 1.1、PHP文件夹的常用函数 PHP的目录常用函数 序号目录常用函数说明1$_SERVER[DOCUMENT_ROOT]获取到PHP项目的根目…

C++ -- 负载均衡式在线OJ (一)

一、项目宏观结构 1.项目功能 本项目的功能为一个在线的OJ&#xff0c;实现类似leetcode的题目列表、在线提交、编译、运行等功能。 2.项目结构 该项目一共三个模块&#xff1a; comm : 公共模块compile_server : 编译与运行模块oj_server : 获取题目列表&#xff0c;查看题…

Spring Boot项目缺少配置文件的解决方法:IDEA

本文介绍在IntelliJ IDEA软件中&#xff0c;为Spring Boot项目添加配置文件的操作方法。 最近&#xff0c;在IntelliJ IDEA软件中新创建了一个Spring Boot项目&#xff0c;是通过如下图所示的方法直接新建的。 但是&#xff0c;随后发现这样创建的Spring Boot项目没有配置文件。…

Threejs实现鼠标控制相机+键盘控制模型+点击指定点控制模型移动

1.前言 Threejs实现鼠标控制相机功能,键盘控制模型功能,点击指定点控制模型移动功能 键盘使用WASD控制模型移动效果图: 鼠标移动可控制相机的位置控制模型移动到指定点效果图: 2.功能拆分 根据以上效果图,可以得到以下三个主要实现的功能 鼠标移动可以使相机跟随通过键…

leetcode-121-买卖股票的最佳时机

原理&#xff1a; 核心原理&#xff1a; 如果我们真的在买卖股票&#xff0c;我们肯定会想&#xff1a;如果我是在历史最低点买入就好了&#xff01;该历史最低点是指卖出当天之前的历史最低点而不是全局最低点。 实现步骤&#xff1a; 1、初始化变量preprices[0]表示历史股…

20240809 每日AI必读资讯

乒乓球AI机器人赢了人类&#xff01;正反手灵活转换&#xff0c;擦网球高球都能接 - 谷歌发布首个达到人类竞技水平的机器人Agent&#xff0c;挑战乒乓球赛场。 - 机器人通过学习大量乒乓球状态数据&#xff0c;掌握了正手上旋球、反手瞄准等技能&#xff0c;展现出高速运动…

CTFHUB | web进阶 | PHP | Bypass disable_function | bypass iconv 2

开启题目 查看源码&#xff0c;发现可以蚁剑连接 进入之后无发现&#xff0c;使用插件 iconv 上传脚本 进入之后发现多了一个 .antproxy.php&#xff0c;复制文件名重新拼接连接 进入终端&#xff0c;查看根目录之后发现了有两个 flag 文件&#xff0c;之后发现了本题的 flag

STM32CUBEMX+PWM多一个尖峰的问题

问题描述&#xff1a;使用TIM2的通道3产生PWM波形&#xff0c;产生n个数量的波形后&#xff0c;在停止的时候会有一个尖峰。 怀疑是自动重载值临界的时候有问题&#xff0c;对重载值多减一个值&#xff0c;但还是有这个问题。 解决&#xff1a;电路是默认低电平&#xff0c;我…

skynet 连接redis

文章目录 概述main.luaagent.luaredis.lua 小结 概述 之前写过skynet 入门篇&#xff0c;还有skynet实操篇&#xff1b;这2篇&#xff0c;主要写了skynet如何使用&#xff0c;还有些skynet的调用流程之类。 其实&#xff0c;看过skynet的demo之后&#xff0c;发现skynet中没有…

L1-书生·浦语大模型全链路开源体系介绍

视频观看地址&#xff1a;书生浦语大模型全链路开源开放体系_哔哩哔哩_bilibili 本视频介绍了书生葡语大模型的开源开放体系&#xff0c;包括技术发展、性能提升、模型架构、开源生态等。 要点: - &#x1f31f; 开源开放体系涵盖数据收集、标注、训练、微调、评测、部署等全…

Ubuntu 系统的部署和基础操作(使用)

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言 Ubuntu 是一款基于 Debian 的开源 Linux 操作系统&#xff0c;以其易用性和强大的社区支持而广受欢迎。对于许多初次接触 Linux 的用户来说&#xff0c;Ubuntu 是理想的入门选择。本文将介绍 Ubuntu 系统的基本操作和使用…

cordova打包后请求不到接口(接口请求失败)

原因&#xff1a;CORS跨域问题导致 解决方法&#xff1a; 将根目录下的config.xml打开&#xff0c;添加 preference 即可

10分钟学会docker安装与使用

文章目录 1、docker简介2、docker的基本组成3、docker的安装与配置4、docker的常用命令 1、docker简介 什么是容器&#xff1f; 它是一种虚拟化的方案&#xff0c;是操作系统级别的虚拟化&#xff0c;只能运行相同或相似内核的操作系统&#xff0c;依赖于Linux内核特性&#x…

Qt实现圆形窗口

重新实现paintEvent()函数。 效果如下&#xff1a; 效果为蓝色区域&#xff0c;背景是vs接面&#xff0c;代码直接复制可用&#xff0c;留给有需要的人。 #ifndef CircleWidget_h__ #define CircleWidget_h__#include <QWidget>class CCircleWidget : public QWidget {Q…

MySQL安装以及配置

目录 1. MySQL安装包下载 2. 安装 3. 配置 4. 使用MySQL 5. 配置环境变量 1. MySQL安装包下载 1.1 迅雷下载 分享文件&#xff1a;MySQL安装包.zip 链接&#xff1a;https://pan.xunlei.com/s/VO3llUOt6rFFWl9TdrTrJI-cA1?pwdxere# 1.2 官网下载 MySQL :: Download MyS…

如何从戴尔笔记本电脑硬盘恢复数据

“如何从坏掉的戴尔笔记本电脑硬盘中恢复数据&#xff1f;我无法访问硬盘&#xff0c;但我确实需要从硬盘中检索数据。我有很多重要文件被困在那里。” 人们学习如何从戴尔笔记本电脑硬盘恢复数据的原因有很多&#xff0c;例如有意或无意删除、硬盘格式化、安全警告隔离受病毒…