【JavaEE初阶系列】——阻塞队列

news2025/1/12 10:38:35

目录

🚩阻塞队列的定义

🚩生产者消费者模型

🎈解耦性

🎈削峰填谷 

🚩阻塞队列的实现

📝基础的环形队列

📝阻塞队列的形成

📝 内存可见性

📝阻塞队列代码


🚩阻塞队列的定义

阻塞队列是一种特殊的队列,也遵循“先进先出”的原则。

阻塞队列能是一种线程安全的数据结构 , 并且具有以下特性 :
  • 1.线程安全
  • 2.带有阻塞特性

    a) 如果队列为空,继续出队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。

    b)如果队列为满,继续入队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。

阻塞队列,最大的意义,就是可以用来实现“生产者消费者模型”——一种常见的,多线程代码编写方式。


🚩生产者消费者模型

🎈解耦性

生产者消费者模式就是通过一个容器来 解决生产者和消费者的强耦合 问题。
耦合:俩个模块,联系越紧密,耦合越高!

就比如就一个擀饺子皮杖,所以我们分配一个人擀饺子,剩下三个人包饺子。擀饺子皮就会不停的产出饺子皮,三个人负责包饺子,那三个人就不停的消耗饺子皮。我生产出来的饺子皮得有地方放,那就是放在板子上,而所谓的板子就是阻塞对列,进行来放入元素和放出元素。

生产者:把生产出来的内容,放到阻塞队列中

消费者:就会从阻塞队列中获取内容

如果我生产的慢,那么三个人就得等,就相当于从空的队列中获取元素就会阻塞

如果我生产的快,我就得等了,就相当于从满的队列中放入元素就会阻塞。


为什么要使用生产者消费者模型呢?给我们带来了什么好处呢?

解耦性:两个模块,联系越紧密,耦合就越高,尤其是分布式系统。

比如,考虑一个简单的分布式系统

如果A和B直接交互(A把请求发给B,B把响应返回到A)

彼此之间的耦合就是比较高的

  • 1)如果B出现问题,很可能就把A也影响到了
  • 2)如果未来再添加一个C,就需要对A这边的代码,做出一定的改动。

所以解决上述问题,使用生产者消费者模型,就可以有效的解决刚才的耦合问题。

 阻塞队列(当把阻塞队列封装成单独的 服务器程序部署到特定的机器上,这个时候就把这个队列,称为消息队列),此时的耦合度就会降低,如果B这边出现问题,就不会对A产生直接影响(A只是和对列交互,不知道B的存在)后续增加一个C,此时A不必进行任何修改,只需要C从队列中获取数据即可。


🎈削峰填谷 

短时间内,(削峰)请求量比较多,(填谷)请求量比较少。

这个结构下,一旦客户端这边发起的请求非常多了,每个A收到的请求,都会立即发给B,A这边抗多少访问量B,B和A完全一样。但是在不同的服务器下,上面跑的业务不同,虽然访问量一样,单个访问,消耗的硬件资源不一样,可能A承担这些并发量就会挂了~~。比如B要操作数据库,数据库本身就是一个分布式系统,相对脆弱的环节。

引入生产者消费者模型,上述问题也会得到很大的改善。

A这边收到较大的请求量,A会把对应的请求写入队列中,B仍然可以按照之前的节奏,来处理请求。 比如,正常情况下,A和B每秒处理1k请求,极端情况下,A这边每秒处理3K请求,如果让B也处理3k次,就要挂了。队列B承担了压力,B仍然可以按照1K次的节奏,处理请求。但是像上述的情况下不会一直持续的存在,只会短时间出现,过了峰值之后,A的请求就恢复正常了,B就可以逐渐的把积压的数据都给处理掉了。这就保证了整个系统在突发情况下,都能更好的控制。

就比如大坝中,如果上游水量增加,大坝关闸蓄水,把上游的压力分担很多,往下游去放水的时候,就有节奏的放,如果上游水量减少了,大坝开闸放水。


🚩阻塞队列的实现

在java标准库里,已经提供了线程的阻塞队列,让咱们直接使用。

标准库中,针对BlockingQueue提供了俩种重要的实现方式

  • 1.基于数组
  • 2.基于链表

 了解标准库的阻塞队列怎么用,固然是一个环节,更重要的,是我们能够自己实现一个阻塞队列

阻塞队列包含三部分

  • 基于一个普通的队列
  • 线程安全——加锁(保证原子性)
  • 阻塞 (队列空出元素,队列满入元素都会造成阻塞)
  • 内存可见性 volatile关键字

普通队列,可以基于数组,也可以基于链表,基于数组其实是环形队列。

head指向的是队列的头部位置,tail是每插入一个元素tail都往后移一位,[head,tail)构成了一个区间,这个区间里的内容就是当前队列中的有效元素~,入队列,把新的元素,放到tail位置上,同时tail++,出队列,把head指向的元素给删除掉,head++;

但是初始情况下,队列为空的时候 head和tail重合,队列满了情况下,head和tail又重合了。

解决方案:

1.浪费一个格子,让tail指向head的前一个位置,就算满了。

2.专门搞一个变量size,来表示元素的个数,size为0是空,为数组最大值,就是满。(本次基于这种方法来写)

BlockingQueue阻塞队列中有俩个方法:

  • put阻塞式的入队列
  • tale阻塞式的出队列

📝基础的环形队列

首先定义三个变量,对头,队尾,有效元素的个数。
  • 入队列的时候,如果队列满了,普通队列就直接返回了,不满就插入元素,如果tail等于数组的最大长度了,那么就让tail设置成0.
  • 如果队列为空,普通队列就直接返回了,不为空,就删除元素,如果head等于数组的最大长度,那么就让head=0即可
class MyBlockQueue{
   //此处的最大长度,也可以指定构造方法,由构造方法来设定
    private String[] data=new String[1000];
    //队列的起始位置
    private int head=0;
    //队列的结束位置的下一个位置
    private int tail=0;
    //队列中有效元素的个数
    private int size=0;

    //核心放大,入队列和出队列
    public  void put(String elem){
        if(size==data.length){
            //队列满了
            //如果普通队列就直接return了
            return;
        }
        //队列没满,真正的往里面添加元素
        data[tail]=elem;
        tail++;
        //如果tail自增之后,到达了数组的末尾,这个时候就需要让它回到开头(环形队列)
        if(tail== data.length){
            tail=0;
        }
        size++;
    }

    public String take(){
        if(size==0){
            return null;
        }
        //队列不为空,就队首元素就返回去,并且删除掉
        String ret=data[head];
        head++;
        if(head==data.length){
            head=0;
        }
        size--;
        return ret;
    }
}

 这很明显是线程不安全的情况,如果在多线程的情况,对一个变量修改,那是非常的不安全的。


📝阻塞队列的形成

所以我们首先需要加锁。

我们直接将方法内部所有的代码段都加锁,因为里面涉及到多个变量修改,所以就要加锁来保证原子性和线程安全。

如何进行阻塞呢?

  • a.如果队列为空,继续出队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。
  • b.如果队列为满,继续入队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。

所以这里需要wait和notify机制

一个队列,要么是空,要么是满。

take和put只有一边能阻塞。

  • 如果put阻塞了,其他线程继续调用put也都会阻塞,只有靠take唤醒
  • 如果take阻塞了,其他线程继续调用take也还是会阻塞,只有靠put唤醒。

当put方法,因为队列满了,进入wait之后,此时,wait返回(被唤醒的时候)队列一定是不满的嘛?wait除了notify之外,是否还有其他的方式唤醒呢?

interrupt是可以中断wait状态的,但是在用try catch捕捉异常的时候,如果没有throw抛出异常的话,代码会往下一直执行,那么下面的tail指向的元素给覆盖掉了,实际上此处队列是满着的,此时tail指向的元素,并非是无效元素(把一个有效元素给覆盖住了)

所以使用wait的时候,一定要注意,考虑当前wait唤醒,是通过notify唤醒,还是通过interrupt唤醒。

  • notify唤醒说明其他线程调用了take,此时队列已经不满了,可以继续添加元素
  • interrupt唤醒,此时队列还是满的,继续添加元素,肯定是会出现问题的。因为interrupt确实是可以让wait唤醒,1>如果try catch的捕获异常了后没有抛出异常的话,就会继续往下执行,2>如果try catch的捕获异常了后并且抛出异常,会终止了整个线程,代码没有什么问题。但是我们不能保证我们有没有抛出异常。

所以基于上述俩个情况,如果用notify唤醒的话,可以不用担心,因为唤醒了wait肯定是因为 调用了take删除元素才会唤醒的,但是如果用 interrupt唤醒之后,我们就要担心一下是否队列依旧是满的,因为interrupt调用之后,如果继续插入元素的话,就会出现问题。

关键要点,当wait返回的时候,需要进一步确认一下,看当前队列是不是满的(本来是因为队列满,进入阻塞,接触阻塞之后,再确定一下队列满不满,如果经过确定之后,队列还是满的,继续进行wait)

使用wait的时候,往往都是使用while作为条件判定的方式,目的就是为了让wait唤醒之后还能再确认一次,是否条件仍然满足。

对于interrupt的情况下,这样用while作为条件判定的方式,就会每次唤醒之后,还会继续判定一次,如果还是等于,就继续,这样就避免了有没有捕捉异常后是否有抛出异常了。

所以针对俩种情况,我们最好的情况下,都是用while来判定一下,防止出现bug现象。

本题主要针对的是wait() notify()情况下,也不排除用interrupt唤醒程序。


📝 内存可见性

内存可见性是指当我们对同一个资源进行多次大量的使用的时候,那么jvm就不会对这个资源的改变加载到主内存中去,而是加载到运行内存中,但是只有对一个值的改变加载到主内存才是真正得对这个资源得修改,内存可见性其实就是为了保证我们每一次对某个资源的修改都能加载到主内存中去,而不是因为这个资源可能在某个时间段内被多次使用,就被jvm选择,暂时不把它加载到主内存中去。 这样就会导致其他线程再读取当前变量值的时候,就会是原来的值。

所以我们对于阻塞队列中,我们如果消费者和生产者每次都生产的很多或者消费的很多的话,那么可能会被jvm直接加载到运行内存中去,那么就会导致bug存在。所以避免内存可见性情况,我们用volatile来修饰变量。


📝阻塞队列代码

package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


class MyBlockQueue {
    //此处的最大长度,也可以指定构造方法,由构造方法来设定
    private String[] data = new String[1000];
    //队列的起始位置
    private volatile int head = 0;
    //队列的结束位置的下一个位置
    private volatile int tail = 0;
    //队列中有效元素的个数
    private volatile int size = 0;

    //核心放大,入队列和出队列
    public void put(String elem) throws InterruptedException {
        synchronized (this) {
            while (size == data.length) {
                this.wait();
            }
            //队列没满,真正的往里面添加元素
            data[tail] = elem;
            tail++;
            //如果tail自增之后,到达了数组的末尾,这个时候就需要让它回到开头(环形队列)
            if (tail == data.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            while(size == 0) {
                this.wait();
            }
                //队列不为空,就队首元素就返回去,并且删除掉
                String ret = data[head];
                head++;
                if (head == data.length) {
                    head = 0;
                }
                size--;
                this.notify();
                return ret;
            }
        }
}
public class Test {
    public static void main(String[] args) {
        MyBlockQueue queue=new MyBlockQueue();
        //消费者
        Thread t1=new Thread(()->{
            while (true){
                try {
                    String result=queue.take();
                    System.out.println("消费元素"+result);
                    //Thread.sleep(500);//生产慢 消费快
                } catch (InterruptedException e) {
                   throw new RuntimeException(e);
                }
            }
        });

        //生产者
        Thread t2=new Thread(()->{
            int num=1;
            while (true){
                try {
                    queue.put(num+" ");
                    System.out.println("生产元素"+num);
                    num++;
                   Thread.sleep(500);//生产快,消费慢
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

我设定的是生产的慢,消费的快,每0.5s生产一个,我们可以看到,生产一个消费一个。 

 如果设定的生产的快,消费的慢,那么此时我们设定的数组长度是1000,等一下功夫都生产到1000了,然后等消费1个,然后继续生产,消费一个生产一个这样的速度了。


要努力成为自己想要的样子~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1542189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MQ高级篇---消息可靠性

MQ的一些常见问题 后面内容基于springboot 2.3.9.RELEASE 消息可靠性 生产者确认机制 在publisher微服务中application.yml中添加 spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true每个RabbitTemplate只能配置一个Return…

vscode配置c/c++调试环境

本文记录win平台使用vscode远程连接ubuntu server服务器下,如何配置c/c调试环境。 过程 1. 服务器配置编译环境 这里的前置条件是vscode已经能够连接到服务器,第一步安装编译构建套件(gcc、g、make、链接器等)和调试器&#xf…

Vue2(十):全局事件总线、消息订阅与发布、TodoList的编辑功能、$nextTick、动画

一、全局事件总线!! 任意组件间通信 比如a想收到别的组件的数据,那么就在a里面给x绑定一个demo自定义事件,所以a里面就得有一个回调函数吧,然后我要是想让d组件给a穿数据,那就让d去触发x的自定义事件&…

使用html做一个2048小游戏

下载地址: https://pan.xunlei.com/s/VNtiF13HxmmE4gglflvS1BUhA1?pwdvjrt# 提取码:vjrt”

C#执行命令行

效果图 主要代码方法 private Process p;public List<string> ExecuteCmd(string args){System.Diagnostics.Process p new System.Diagnostics.Process();p.StartInfo.FileName "cmd.exe";p.StartInfo.RedirectStandardInput true;p.StartInfo.RedirectSta…

STM32--RC522学习记录

一&#xff0c;datasheet阅读记录 1.关于通信格式 2.读寄存器 u8 RC522_ReadReg(u8 address) {u8 addr address;u8 data0x00;addr((addr<<1)&0x7e)|0x80;//将最高位置一表示read&#xff0c;最后一位按照手册建议变为0Spi_Start();//选中从机SPI2_ReadWriteByte(ad…

C++实现FFmpeg音视频实时拉流并播放

1.准备工作: 下载rtsp流媒体服务器rtsp-simple-server,安装go开发环境并编译 编译好后启动流媒体服务器 准备一个要推流的mp4视频文件,如db.mp4 使用ffmpeg开始推流 推流命令: ffmpeg -re -stream_loop -1 -i db.mp4 -c copy -rtsp_transport tcp -f rtsp rtsp://192.168.16…

JVM快速入门(2)HotSpot和堆、新生区、永久区、堆内存调优、JProfiler工具分析OOM原因、GC(垃圾回收)、JVM经典面试笔试题整理

5.6 HotSpot和堆 5.6.1 Hotspot 三种JVM&#xff1a; Sun公司&#xff0c;HotspotBEA&#xff0c;JRockitIBM&#xff0c;J9 VM&#xff0c;号称是世界上最快的Java虚拟机 我们一般学习的是&#xff1a;HotSpot 5.6.2 堆 Heap&#xff0c;一个JVM只有一个堆内存&#xff0c…

基于51单片机数控直流电压源proteus仿真LCD显示+程序+设计报告+讲解视频

基于51单片机数控直流电压源proteus仿真LCD显示( proteus仿真程序设计报告讲解视频&#xff09; 仿真图proteus7.8及以上 程序编译器&#xff1a;keil 4/keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;S0072 讲解视频 基于51单片机数控直流电压源proteus仿真程序…

t-rex2开放集目标检测

论文链接&#xff1a;http://arxiv.org/abs/2403.14610v1 项目链接&#xff1a;https://github.com/IDEA-Research/T-Rex 这篇文章的工作是基于t-rex1的工作继续做的&#xff0c;核心亮点&#xff1a; 是支持图片/文本两种模态的prompt进行输入&#xff0c;甚至进一步利用两…

八股文(1)

管道 匿名管道和命名管道 命名管道的使用是什么&#xff1f;在linux系统如何实现 命名管道&#xff08;Named Pipe&#xff09;&#xff0c;也称为FIFO&#xff08;First In First Out&#xff09;&#xff0c;是一种在UNIX和Linux系统中用于进程间通信&#xff08;IPC&…

【PyQt】17.1-日历控件 不同风格的日期和时间、以及高级操作

日历控件puls版本 前言一、日历控件中不同风格的日期和时间1.1 代码1.2 注意事项格式设置m的大小写问题QTime和QDateTime的区别 1.3 运行结果 二、高级操作2.1 成倍调整2.2 下拉出日历2.3 事件函数2.4 获取设置的日期和时间 完整代码 前言 1、不同风格的日期和时间展示 2、高级…

Django下载使用、文件介绍

【一】下载并使用 【1】下载框架 &#xff08;1&#xff09;注意事项 计算机名称不要出现中文python解释器版本不同可能会出现启动报错项目中所有的文件名称不要出现中文多个项目文件尽量不要嵌套,做到一项一夹 &#xff08;2&#xff09;下载 Django属于第三方模块&#…

无人机摄影测量---倾斜摄影测量原理与关键技术

《无人机航空摄影测量精品教程》专栏&#xff1a;无人机航测外业作业流程&#xff08;像控点布设、航线规划、仿地飞行、航拍&#xff09;和内业数据处理软件&#xff08;Pix4d、CC、EPS、PhotoScan、Globalmapper&#xff09;像控点权重调配、空三加密、DOM、DSM、DEM&#xf…

ubuntu18安装opensips3.4,开启ws/wss/http接口模块

、如果是centos 7安装则使用yum 命令。 添加库地址注意系统类型&#xff0c;选择对应的系统类型和版本 curl https://apt.opensips.org/opensips-org.gpg -o /usr/share/keyrings/opensips-org.gpg echo "deb [signed-by/usr/share/keyrings/opensips-org.gpg] https:/…

芯片工程系列(5)2.5D 3D封装

0 英语缩写 硅通孔&#xff08;Through Silicon Via&#xff0c;TSV&#xff09;硅中介层&#xff08;Silicon Interposer&#xff09;物理气象沉淀法&#xff08;Physical Vapor Deposition&#xff0c;PVD&#xff09;DRIE、CVD、PVD、CMP等设备CoWoS&#xff08;Chip on Wa…

政安晨:【深度学习实践】【使用 TensorFlow 和 Keras 为结构化数据构建和训练神经网络】(五)—— Dropout和批归一化

政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras实战演绎 希望政安晨的博客能够对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff01; Dropout和批归一化是深度学习领域中常用的正则化技术…

pytorch如何向tensor结尾添加元素或维度--torch.cat()、torch.unsqueeze()的用法

目录 示例1 矢量后增加元素 示例2 tensor维度增加1 示例3 另一种替代unsqueeze的方法 示例1 矢量后增加元素 使用torch.cat()函数 ptorch.Tensor([1,5,0]) ptorch.cat((p, torch.Tensor([4])), 0) 结果&#xff1a; 这里&#xff0c;cat的第一个输入变量用()包绕&#xf…

中断(NVIC)的使用--EXTI--TIM

目录 中断是什么 轮询 中断 中断调用情况 中断的分类 内部中断&#xff08;TIM、UART等&#xff09; tim.c tim.h 外部中断EXTI exti.c exti.h 中断是什么 在处理事件的时候有两种方式&#xff1a;轮询和中断。 轮询 顾名思义&#xff0c;就是每轮都询问一次。比如…

笔记本和台式机主板内部结构分析

笔记本和态势机主板内存接口以及配件安装位置 笔记本主板 1 以thinkpad L-490为例,使用拆机小工具拆机&#xff0c;打开后面板&#xff0c;内部结构示意图如下 台式机主板 以技嘉-B660M-AORUS-PRO-AX型号主板为例 笔记本电脑和台式机电脑的相同之处 CPU&#xff1a;笔记本…