JavaEE 初阶(11)——多线程9之“阻塞队列”

news2025/1/23 12:16:01

目录

一. 什么是“阻塞队列”

二. 生产者消费者模型

2.1 概念

2.2 组件

 2.3 实际应用

2.4 优点 

a. 实现“解耦合” 

b. 流量控制——“削峰填谷”

2.5 代价

a. 更多的机器

b. 通信时间延长

三. 阻塞队列的实现 

3.1 简述 

 3.2 ArrayBlockingQueue的使用

3.3 实现MyArrayBlockingQueue 


一. 什么是“阻塞队列”

“阻塞队列” 是在普通队列(先进先出)的基础上,做出了一些扩充。以下是阻塞队列的一些主要特点:

  1. 阻塞插入:当队列为满时,如果再往队列里插入元素,队列会阻塞插入操作的线程(wait);一直阻塞到其他线程从队列取走元素为止(被notify唤醒)
  2. 阻塞移除:当队列为空时,如果再从队列里移除元素,队列会阻塞移除操作的线程(wait);一直阻塞到其他线程往队列里添加元素为止(被notify唤醒)
  3. 线程安全:阻塞队列内部实现了线程同步,因此可以在多线程环境中安全地使用。(标准库中原有的队列Queue和其子类,默认都是线程不安全的)

基于阻塞队列,最大的应用场景,就是实现 “生产者消费者模型” ——日常开发中,常见的编程手法。

二. 生产者消费者模型

2.1 概念

生产者消费者模型 是一个经典的并发编程模型,它描述了一组生产者线程和一组消费者线程如何通过共享的数据缓冲区进行交互。这个模型的核心思想是解耦生产数据的线程(生产者)与消费数据的线程(消费者),使得它们可以并发执行,提高系统的整体效率。

2.2 组件

1.生产者

  • 负责生成数据并将其放入缓冲区。
  • 生产者在缓冲区满时会等待,直到有空间可用。

2.消费者

  • 负责从缓冲区取出数据并处理
  • 消费者在缓冲区为空时会等待,直到有数据可以消费

3.缓冲区

  • 是一个共享的数据结构,用于存储生产者产生的数据,等待消费者来消费。
  • 缓冲区的大小通常是有限的,因此需要适当的同步机制来控制生产者和消费者的访问。
 2.3 实际应用

   上述生产者消费者模型,在后端开发中,经常会涉及到~~当下后端开发常见的结构——“分布式系统”,不是一台服务器解决所有问题,而是分成了多个服务器,服务器之间相互调用,进行通信。

下面是一个搜狗浏览器的例子:

 

2.4 优点 

   在上述服务器之间的通信过程中,使用生产者消费者模型,是非常常见的做法。使用生产者消费者模型,主要有两方面的好处~~

a. 实现“解耦合” 

生产者和消费者不需要知道对方的实现细节,只需要知道如何与缓冲区交互,从而降低了模块之间的 关联/影响程度。

   如果是“直接调用” 关系,编写 A代码中,就会出现很多B服务器相关代码;编写B代码中,也会出现很多A服务器相关的代码。并且,如果B服务器出现故障,A服务器也会受到影响。

    并且,如果后续想增加一个 C服务器,此时对A代码的改动就会很大。

   但是,当引入了生产者消费者模型,结构就成了下列样子~~ 

   A只和队列通信,B也只和队列通信。 A不知道B的存在,代码中更没有B的影子了;B也不知道A的存在,代码中也没有A的影子。这样就实现了A与B之间的“解耦合”~~

通常谈到的“阻塞队列”是代码中的一个数据结构,但是由于这个东西太好用了,以致于会把这样的数据结构,单独封装成一个服务器程序,并且在单独的服务器机器上进行部署~~

此时,这样的阻塞队列,有了一个新的名字,“消息队列”(Message Queue,MQ)


消息队列:一种进程间通信或分布式系统中常用的中间件技术,用于在消息的发送者和接收者之间传递消息。它通常被用于异步处理、系统解耦、流量削峰和消息的可靠传递。

应用场景:

  • 应用解耦降低系统间的耦合度,使系统更加模块化。
  • 异步处理:处理耗时任务,如邮件发送,文件上传等。
  • 消息广播:将消息广播给多个订阅者。
  • 分布式系统跨网络和跨服务的数据传递。
  • 高并发处理:在高并发情况下,消息队列可以起到削峰填谷的作用

  * 看起来,A和B之间是解耦合了,但是 A和队列,B和队列,难道不是引入了新的耦合吗?

 首先,我们需要明确为什么怕“耦合”?—— 因为耦合的代码在后续变更过程中,比较复杂,容易产生bug!

“消息队列”是成熟稳定的产品,代码不会频繁修改;并且A和队列、B和队列之间的交互,逻辑基本写一次就固定下来

b. 流量控制——“削峰填谷”

缓冲区的大小可以用来控制生产者和消费者的工作节奏,避免生产过快或消费过慢导致的问题 。

下面,解释一下什么是“削峰填谷”

 长江上游的三峡水库:


   服务器端,如果A的请求量突然激增,A往队列中写入数据变快了,但是由于阻塞队列的原因,B仍然可以按照原有的速度来消费数据。

    如果是直接调用,没有阻塞队列,那么A收到多少请求,B也会收到多少,很可能就直接把B给搞挂了.....

1)为啥一个服务器,收到的请求激增,就可能会崩溃??? 

     一台服务器,就是一台“电脑”,上面就提供了一些硬件资源(包括不限于CPU,内存,硬盘,网络带宽....)就算你这个机器 配置再好,硬件资源也是有限的~~

    服务器每次收到一个请求,处理这个请求的过程,都需要执行一系列的代码。在执行这些代码过程中,就会需要消耗一定的硬件资源(CPU,内存,硬盘,网络带宽....)

    这些请求消耗的总的硬件资源的量,超过了机器能提供的上限,那么此时机器就会出现问题(卡死,程序直接崩溃.....)

2)在请求激增的时候,A 为啥不会挂?队列为啥不会挂?反而是B更容易挂呢??

    A的角色是一个“网关服务器”,收到客户端的请求,再把请求转发给其他的服务器。这样,服务器里面的代码,做的工作比较简单(单纯的数据转发),消耗的硬件资源通常更少。处理一个请求,消耗的资源更少,同样的配置下,就能支持更多的请求处理~~

    同理,队列 其实也是比较简单的程序单位请求消耗的硬件资源也是比较少的~~

    B这个服务器,才是真正干活的服务器,要真正完成一系列的业务逻辑~~(这一系列的工作,代码量非常庞大,消耗的时间很多,消耗的系统硬件资源也是更多的)

类似的,MySQL这样的数据库,处理请求做的工作就是比较多的,消耗的硬件资源也是比较多的,因此MySQL也是后端系统中,容易挂的部分~~

对应的,像Redis这种内存数据库,处理请求做的工作,远远少于MySQL做的工作,消耗的资源更少。因此,Redis就比MySQL皮实很多,不容易挂~~

2.5 代价
a. 更多的机器
  •  需要更多的机器,来部署这样的消息队列 
b. 通信时间延长
  • A和B直接通信的延时会变长(对于 A和B 之间的调用,如果要求时间比较短,就不合适了)  

 比如,现在很火的“微服务”

“微服务”本质上就是把分布式系统的服务拆的更细了,每个服务都很小,只做一项功能(比较适合大公司,部门分的很细)。但是这样做的代价就是需要更多的机器,处理请求的响应时间更长,更复杂的后端结构运维成本变高.....

三. 阻塞队列的实现 

3.1 简述 

在Java的标准库中,提供了现成的封装——BlockingQueue

BlockingQueue继承了Queue,因此Queue中的一些操作——offer,poll(不能阻塞)等在BlockingQueue中同样也能使用。

BlockingQueue接口的实现类:

数据结构堆:本质上是一个 “完全二叉树”(即树中的每一层都是满的,除了可能最后一层,最后一层的节点从左向右连续排列),要求父节点和子节点之间的值,存在大小关系 (不是左右子树的大小关系)

堆是实现优先队列的一种高效方式,可以快速地插入元素和删除具有最高(或最低)优先级的元素。

   BlockingQueue 提供了另外两个专属方法——put:入队列    take:出队列 (能阻塞)。但是阻塞队列没有提供“阻塞版本”的获取队首元素的操作

   由于 put 和 take 可能产生阻塞,因此,这样的阻塞又会被 interrupt 唤醒。


 3.2 ArrayBlockingQueue的使用
import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue1 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.put("111");
        System.out.println("put 成功");
        queue.put("111");
        System.out.println("put 成功");
        //第三次put已经超过了队列的容量
        queue.put("111");
        System.out.println("put 成功");
    }
}

运行结果:

如果队列为满,put操作后,不会抛出异常,而是处于WAITING等待状态。


import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockingQueue2 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.put("111");
        System.out.println("put 成功");
        queue.take();
        System.out.println("take 成功");
        //队列中已经没有元素,take后会阻塞
        queue.take();
        System.out.println("take 成功");

    }
}

 运行结果:

 

 如果队列为空,take操作后,不会抛出异常,而是处于WAITING等待状态。


import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue3 {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        //生产者
        Thread t1 = new Thread(()->{
            int i = 1;
            while(true){
                try {
                    queue.put(i);
                    System.out.println("产生元素"+i);
                    i++;
                    //给生产者加上sleep操作,生产慢点,消费快点
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费者
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    int i = queue.take();
                    System.out.println("消费元素"+i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        t2.start();
    }
}

 运行结果:

 

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue4 {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        //生产者
        Thread t1 = new Thread(()->{
            int i = 1;
            while(true){
                try {
                    queue.put(i);
                    System.out.print("生产元素"+i+" ");
                    i++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费者
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    int i = queue.take();
                    System.out.println("消费元素"+i+" ");
                    //给消费者加上sleep,消费慢点,生产快点
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        t2.start();
    }
}

运行结果: 


3.3 实现MyArrayBlockingQueue 

public class MyBlockingQueue {
    private String[] array = null;
    public MyBlockingQueue(int capacity){
        array = new String[capacity];
    }
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    private final Object locker = new Object();
    public void put(String i) throws InterruptedException {
        //整个逻辑都要加锁,防止线程切换更改共享变量
        synchronized (locker){
            if(size == array.length){
                //队列满了,阻塞等待
                locker.wait();
            }
            array[tail] = i;
            tail++;
            if(tail >= array.length){
                tail = 0;
            }
            size++;
            //唤醒队列为空时候的等待
            locker.notify();
        }

    }
    public String take() throws InterruptedException{
        String ret = "";
        synchronized (locker){
            if(size == 0){
                //队列空了,阻塞等待
                locker.wait();
            }
            ret = array[head];
            head++;
            if(head >= array.length){
                head = 0;
            }
            size--;
            //唤醒队列满了时候的等待
            locker.notify();
        }
        return ret;
    }
}

  

但是不推荐第二种%余数的方法:1)代码可读性比较低   2)执行效率比较低(%对计算机来说是除法,比较慢)                        


Java官方文档建议 wait 使用的时候,要结合 while,而不是 if。 

                            

此处即使不改成while,代码问题也是不大~~因为一旦出现提前唤醒的情况(interrupt),代码会直接结束(throws异常)。


如果 wait 是直接包裹在 try-catch 中,此时使用 if 还是 while 的差别就很大了.....

    wait中断异常被抓住后——如果是 if 判断,wait 被唤醒后,代码会继续向后执行,此时size值可能依旧为0,那么接下来的数据就会出现问题如果是 while 循环,wait 被唤醒后,代码会再次确认条件,看是否能继续执行。

    因此,最好使用 while,可以确保结果的正确性,更加稳妥!

   由此告诫我们,日常开发中,有些bug,比如,报错/抛出异常,这种问题都好办,程序员能第一时间发现。但是有些bug,没有任何提示,但是会得到一个错误的结果(很可能看起来和正确的值没啥差别,但是后续造成的影响,可能是非常严重的)。因此,要选择最为稳妥的方法解决问题。

试想一下,假设使用这个阻塞队列,实现一个“充值”逻辑~~
某个线程在阻塞等待队列里玩家的充值数据。一旦玩家的充值数据到账,就把对应的道具发放给玩家。
玩家可能正要充值,还没冲呢,却不小心interrupt了,导致队列里读取出一个“错误值”......

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1974956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据建模标准-基于事实建模

前情提要 数据模型定义 DAMA数据治理体系中将数据模型定义为一种文档形式&#xff0c;数据模型是用来将数据需求从业务传递到IT,以及在IT内部从分析师、建模师和架构师到数据库设计人员和开发人员的主要媒介&#xff1b; 作用 记录数据需求和建模过程中产生的数据定义&…

dctcp 比 reno,cubic 好在哪

dctcp 相比标准 aimd 如 reno&#xff0c;cubic 到底好在哪&#xff0c;理论上讲 dctcp 本质上也是 aimd 算法&#xff0c;但它的 cwnd 根据 mark rate 来实时缩放&#xff0c;而标准 reno/cubic 则一致缩放 β 0.5(reno) or β 0.3(cubic)&#xff0c;直观上看 dctcp 是连续…

PostgreSQL数据库内核(一):增加系统表pg_test_catalog

目录 编译环境准备 gdb调试 CLion配置 增加系统表pg_test_catalog 编译环境准备 使用PostgreSQL14.5源码版本编译&#xff0c;操作系统CentOS&#xff0c;本地windos系统CLion代码工具&#xff0c;首先下载pg源码&#xff0c;上传CentOS系统&#xff1a; more /etc/os-rel…

要 set 还是 map? 我全要

引子&#xff1a; 时隔多日&#xff0c;我又回来啦&#xff0c;接上回&#xff0c;我们讲到set的一小部分&#xff0c;我们今天来讲详细讲set与map&#xff0c;满满干货启动&#xff01;根据应用场景的不同&#xff0c;STL总共实现了两种不同结构的管理式容器&#xff1a;树型…

[240803] Prompt Fuzzer 新版本发布 | Windows 会在更新时进行时间调整以减少碳排放

目录 Prompt Fuzzer 新版本发布&#xff1a;更强大、更灵活的 GenAI 应用安全评估工具Windows 会在更新时进行时间调整以减少碳排放 Prompt Fuzzer 新版本发布&#xff1a;更强大、更灵活的 GenAI 应用安全评估工具 Prompt Security 发布了新版 Prompt Fuzzer&#xff0c;这是…

CSS+js:顶部导航栏背景滚动渐变、顶部背景滚动渐变

一、效果图 图1 图2 图3 二、gradual.html代码 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>顶部导航栏渐变和顶部背景渐变</title></head><body><div class"content-root" id&quo…

Linux PSCI框架

Linux PSCI框架 概述 参考链接&#xff1a; 简单讲解Linux PSCI框架-Linxu内核栈 概述 PSCI &#xff08;Power State Coordination Interface&#xff09; 是ARM定义的电源管理接口规范&#xff0c;由firm来实现。Linux系统通过smc/hvc指令&#xff08;设备树可查看是那种&a…

5.7软件质量和软件度量

软件质量和软件度量 软件质量软件质量特性ISO/EC9126软件质量模型练习题Mc Call质量模型 软件质量保证软件评审软件容错技术结构冗余信息冗余时间元余冗余附加技术 软件度量练习题 软件质量 软件质量&#xff1a;是指反映软件系统或软件产品满足规定或隐含需求的能力的特征和特…

代码随想录算法训练营day32 | 509. 斐波那契数 、70. 爬楼梯 、746. 使用最小花费爬楼梯

碎碎念&#xff1a;开始动态规划了&#xff01;加油&#xff01; 参考&#xff1a;代码随想录 动态规划理论基础 动态规划常见类型&#xff1a; 动规基础类题目背包问题打家劫舍股票问题子序列问题 解决动态规划问题应该要思考清楚的&#xff1a; 动态规划五部曲&#xff1…

使用 continue 自定义 AI 编程环境

一直在使用github 的 copilot 来编程&#xff0c;确实好用&#xff0c;对编码效率有很大提升。 但是站在公司角度&#xff0c;因为它只能对接公网&#xff08;有代码安全问题&#xff09;。另外&#xff0c;它的扩展能力也不强&#xff0c;无法适配公司特定领域的知识库&#x…

c# winform 创建日志登录界面

一.创建一个用于登录的Login的复合控件 1.右击项目文件&#xff0c;点击添加用户控件&#xff0c;设置为控件名为Login。 2.拉动两个lable控件&#xff0c;两个textBox控件&#xff0c;一个button,一个CheckBox控件。 3.将控件的权限&#xff08;Modifiers&#xff09;设置为Pu…

Unity2D在处理精灵表过程中出现不清晰的解决方法

问题阐述 在我们拿到一张精灵表的时候&#xff0c;我们通常要进行切割。但这样往往导致切割的效果不是很好&#xff0c;这里举一个简单的例子。 这是举例子用到的精灵表 我们先对他进行切割处理。 将single改为Multiope 进入精灵编辑器后&#xff0c;我们选择切割方式 此时我…

【数据结构】链表篇

1.链表的概念以及结构 概念&#xff1a;链表是一种物理储存结构上的非连续、非顺序的储存结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 链式结构在逻辑上是连续的&#xff0c;但是在物理上不一定连续现实中的节点一般都是从堆上申请出来的从堆上申…

中度自闭症儿童上普校还是特校好呢

当家中有中度自闭症儿童时&#xff0c;家长们常常面临一个艰难的抉择&#xff1a;是让孩子进入普通学校&#xff08;普校&#xff09;接受融合教育&#xff0c;还是选择特殊教育学校&#xff08;特校&#xff09;接受更具针对性的教育&#xff1f;这是一个没有标准答案的问题&a…

Python基于逻辑回归的L1正则化(Lasso Logistic Regression)进行分类数据的特征选择项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 可以使用Lasso回归进行特征选择&#xff0c;尽管它本质上是一个用于回归问题的技术&#xff0c;但通过…

Python基于Prophet实现时间序列数据趋势周期特征提取项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 Prophet是Facebook开源的一个用于时间序列预测的库&#xff0c;它主要用于处理具有趋势、季节性和假期…

Springboot功能模块之文件上传(minio)

一、概述 1.1什么是MinIO&#xff1f; MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用&#xff0c;它兼容亚马逊 S3 云存储服务接口&#xff0c;非常适合于存储大容量非结构化的数据&#xff0c;例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。 官网…

基础第二关:8G 显存玩转书生大模型 Demo

基础任务 复现过程 结果截图 进阶任务 任务一 复现过程 结果截图 任务二 复现过程 结果截图

OpenFoam waves2foam 虚拟机 镜像 下载 Ubuntu

编译完成截图及安装版本信息&#xff1a; 下载地址(资源整理不易&#xff0c;下载使用需付费&#xff0c;且文件较大&#xff0c;不能接受请勿浪费时间下载): 链接&#xff1a;https://pan.baidu.com/s/1j0-MYpaG2rTYuizSWPFcxg?pwdmoxv 提取码&#xff1a;moxv

【String的介绍及使用】

String的介绍及使用 ## 小杨 为何学习string以及string的简单介绍 学习string类的原因 C语言中&#xff0c;字符串是以’\0’结尾的一些字符的集合&#xff0c;为了操作方便&#xff0c;C标准库中提供了一些str系列的库函数&#xff0c; 但是这些库函数与字符串是分离开的&am…