阻塞队列(超详细易懂)

news2025/1/13 8:03:27

目录

一、阻塞队列

1.阻塞队列概述

2.生产者消费者模型

3.阻塞队列的作用

4.标准库中的阻塞队列类

5.例子:简单生产者消费者模型

二、阻塞队列模拟实现

1.实现循环队列(可跳过)

1.1简述环形队列

1.2代码实现

2.实现阻塞队列

2.1实现思路

2.2代码实现

2.3代码解析

①wait和notify的使用,实现自动阻塞和解阻塞

②while循环判断,线程安全的铜墙铁壁

2.4纯享版代码实现(无注释)


一、阻塞队列

1.阻塞队列概述

阻塞队列是一种特殊的队列,同样遵循“先进先出”的原则,支持入队操作和出队操作。在此基础上,阻塞队列会在队列已满或队列为空时陷入阻塞,使其成为一个线程安全的数据结构,它具有如下特性:

  • 当队列已满时,继续入队列就会阻塞,直到有其他线程从队列中取走元素。
  • 当队列为空时,继续出队列也会阻塞,直到有其他线程向队列中插入元素。

2.生产者消费者模型

生产者消费者模型有两种角色,生产者和消费者,两者之间通过缓冲容器来达到解耦合的效果。类似于厂商和客户与中转仓库之间的关系,如下图:

厂家生产的商品堆积在中转仓库,当中转仓库满时,入仓阻塞,当中转仓库为空时,出仓阻塞。通过上述结构,生产者和消费者摆脱了“产销一体”的运作模式,即解耦合。同时,无论是客户需求暴增,还是厂家产量飙升,都会被中央仓库协调,避免突发情况导致结构崩溃。

同理,根据生产者消费者模型,我们将线程带入到消费者和生产者的角色,阻塞队列带入到缓冲空间的角色,一个类似的模型很容易就搭建起来了。

所以说,阻塞队列对生产者消费者模型是相当重要的。

3.阻塞队列的作用

①解耦合

作为生产者消费者模式的缓冲空间,将线程(其他)之间分隔,通过阻塞队列间接联系起来,起到降低耦合性的作用,这样即使其中一个挂掉,也不会使另一个也跟着挂掉。

②削峰填谷

因为阻塞队列本身的大小是有限的,所以能起到一个限制作用,即在消费者面对突发暴增的入队操作,依然不受影响。

如电商平台在每年双十一时都会出现请求峰值的情况,如下(杜撰):

而假设电商平台对请求的处理流程是这样的:

因为处理请求需要消耗硬件资源,如果没有消息队列,面对双十一这种请求暴增的情况,请求处理服务器很可能就直接挂掉了。

而有了消息队列之后,请求处理服务器不必直接面对大量请求的冲击,仍旧可以按原先的处理速度来处理请求,避免了被冲爆,这就是‘削峰’。

没有被处理的请求也不是不处理了,而是当消息队列有空闲时再继续流程,即高峰请求被填在低谷中,这就是‘填谷’。

经过‘削峰填谷’之后的请求处理曲线就(大致)变成了下图:

4.标准库中的阻塞队列类

类名说明
LinkedBlockingQueue<>基于链表的阻塞队列(常用)
LinkedBlockingDeque<>基于链表的双端阻塞队列
LinkedTransferQueue<>基于链表的无界阻塞队列
ArrayBlockingQueue<>基于顺序表的阻塞队列
PriorityBlockingQueue<>带有优先级功能的阻塞队列
方法    解释
void put(E e)带有阻塞特性的入队操作方法(常用)
E take() 带有阻塞特性的出队操作方法(常用)
boolean offer(E e, long timeout, TimeUnit unit) 带有阻塞特性的入队操作方法,并且可以设置最长等待时间
E poll(long timeout, TimeUnit unit) 带有阻塞特性的出队操作方法,并且可以设置最长等待时间
public boolean contains(Object o)

 判断阻塞队列中是否包含某个元素

5.例子:简单生产者消费者模型

可调整生产时间和消费时间观察效果。

//模拟实现的阻塞队列
class MyBlockingQueue {
    private Object lock = new Object();
    private String[] elems = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    public MyBlockingQueue(int capacity) {
        elems = new String[capacity];
    }

    public void put(String elem) throws InterruptedException {
        synchronized(lock) {
            while(size == elems.length) {
                lock.wait();
            }
            elems[tail] = elem;
            tail++;
            if(tail >= elems.length) {
                tail = 0;
            }
            size++;
            lock.notify();
        }
    }
    public String tack() throws InterruptedException {
        String elem = null;
        synchronized (lock) {
            while(size == 0) {
                lock.wait();
            }
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            lock.notify();
        }
        return elem;
    }
}

public class ThreadDemo1 {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue queue = new MyBlockingQueue(10);
        //生产者线程
        Thread producerModel = new Thread(() -> {
            int number = 0;
            while(true) {
                try {
                    queue.put("" + number);
                    System.out.println("生产了 " + number++);
                    Thread.sleep(400);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        //消费者线程A
        Thread consumerModelA = new Thread(() -> {
            String number;
            while(true) {
                try {
                    number = queue.tack();
                    System.out.println("A消费了 " + number);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        //消费者线程B
        Thread consumerModelB = new Thread(() -> {
            String number;
            while(true) {
                try {
                    number = queue.tack();
                    System.out.println("B消费了 " + number);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producerModel.start();
        consumerModelA.start();
        consumerModelB.start();
    }
}

二、阻塞队列模拟实现

1.实现循环队列(可跳过)

在正式实现阻塞队列之前,我们需要先将普通循环队列这个框架搭建起来,再考虑为其加入阻塞队列的特性。

1.1简述环形队列

环形队列,也被称为循环队列,是一种特殊的线性数据结构。它的操作基于先进先出(FIFO)的原则,并且队尾被连接在队首之后以形成一个循环,可以使用数组或链表实现,这里采用数组。

  • 环形队列在逻辑上是环形的,但在物理上,它通常是通过一个定长的数组来实现的。
  • 环形队列的大小是确定的,一旦创建,它所能存放的元素个数就是固定的。
  • 先进先出,队列队首出队,队尾入队

1.2代码实现

class AnnularQueue {
    //String类型的数组,存储队列元素
    private String[] elems = null;
    //队首位置
    private int head = 0;
    //队尾位置
    private int tail = 0;
    //存储的元素个数
    private int size = 0;

    //构造方法,用于构建定长数组,数组长度由参数指定
    public AnnularQueue(int capacity) {
        elems = new String[capacity];
    }

    //入队方法
    public void put(String elem) throws InterruptedException {
        //当队列已满时,拒绝入队
        if(size == elems.length) {
            return;
        }
        //将元素存入队尾
        elems[tail] = elem;
        //存入后,队尾位置后移一位
        tail++;
        //实现环形队列的关键,超过数组长度后回归数组首位
        if(tail >= elems.length) {
            //回归数组首位
            tail = 0;
        }
        //存入后元素总数加一
        size++;
    }

    //出队方法
    public String tack() throws InterruptedException {
        String elem = null;
        //当队列为空时,拒绝入队,返回null
        if(size == 0) {
            return elem;
        }
        //出队,取出队首值(不用置空,队尾存入时覆盖)
        elem = elems[head];
        //出队后,队首位置后移一位
        head++;
        //实现环形队列的关键,超过数组长度后回归数组首位
        if(head == elems.length) {
            //回归数组首位
            head = 0;
        }
        //存入后元素总数加一
        size--;
        //返回取出的元素
        return elem;
    }
}

2.实现阻塞队列

2.1实现思路

前面说到了“阻塞队列是一种特殊的队列,同样遵循‘先进先出’的原则,支持入队操作和出队操作”,实现一个只有入队和出队操作的队列很简单,关键在于如何将阻塞队列的特性加入进去,使其能够判断队列是否已满或是为空,进而阻塞等待。

判断是否未满很简单,只要在队列中定义一个size变量统计已存个数,当已存个数和队列长度相同时就为已满,size为0时队列为空。

阻塞等待也不难,只要引入锁,在入队、出队操作中使用wait和notify就可以了。

真正的难点在于,阻塞队列是适配于多线程程序的,必须要考虑到线程安全问题,而这一问题往往不好解决。

来,先看一下基于环形队列的简单阻塞队列到底是如何实现的吧。

2.2代码实现

测试可使用上文的例子‘简单生产者消费者模型’。

class MyBlockingQueue {
    //对象公用锁
    private Object lock = new Object();
    //String类型的数组,存储队列元素
    private String[] elems = null;
    //队首位置
    private int head = 0;
    //队尾位置
    private int tail = 0;
    //存储的元素个数
    private int size = 0;
    
    //构造方法,用于构建定长数组,数组长度由参数指定
    public MyBlockingQueue(int capacity) {
        elems = new String[capacity];
    }

    //入队方法
    public void put(String elem) throws InterruptedException {
        synchronized(lock) {
            //已满时入队操作阻塞
            while(size == elems.length) {
                lock.wait();
            }
            //将元素存入队尾
            elems[tail] = elem;
            //存入后,队尾位置后移一位
            tail++;
            //实现环形队列的关键,超过数组长度后回归数组首位
            if(tail >= elems.length) {
                //回归数组首位
                tail = 0;
            }
            //存入后元素总数加一
            size++;
            //当出队操作阻塞时,入队后为其解除阻塞
            //(入队后队列不为空了)
            lock.notify();
        }
    }
    
    //出队方法
    public String tack() throws InterruptedException {
        //存储取出的元素,默认为null
        String elem = null;
        synchronized (lock) {
            //队列为空时出队操作阻塞
            while (size == 0) {
                lock.wait();
            }
            //出队,取出队首值(不用置空,队尾存入时覆盖)
            elem = elems[head];
            //出队后,队首位置后移一位
            head++;
            //实现环形队列的关键,超过数组长度后回归数组首位
            if(head == elems.length) {
                //回归数组首位
                head = 0;
            }
            //存入后元素总数加一
            size--;
            //当入队操作阻塞时,出队后为其解除阻塞
            //(出队后队列不满)
            lock.notify();
        }
        //返回取出的元素
        return elem;
    }
}

2.3代码解析

①wait和notify的使用,实现自动阻塞和解阻塞

首先看wait的位置:

判断条件很易懂,就是当队列已满(为空)时调用wait方法,使调用该方法的线程陷入阻塞,也就是说线程阻塞时,队列是一定陷入已满或为空状态的。

那么什么时候解除阻塞呢?当然是失去已满或为空状态的时候。

调用tack方法出队能够使队列留出位置,不再已满;调用put方法入队能够为队列存入元素,不再为空,两者相辅相成。

所以阻塞的put方法一定是要在别的线程调用tack方法,完成出队后才可能解除阻塞的;阻塞的tack方法也一定是要在别的线程调用put方法,完成入队后才可能解除阻塞的。

注意“可能”二字,因为阻塞的可能有很多线程,所以还要再参与lock锁竞争。

②while循环判断,线程安全的铜墙铁壁

代码中,while循环条件为判断队列状态是否已满(为空),若判断通过,则线程阻塞,等待状态解除。就直观效果而言,使用if语句和while循环的区别不大,但对于多线程程序,我们不得不多考虑一些。

在实现自动阻塞和解阻塞时,我们让put方法(入队操作)和tack方法(出队操作)互相为对方解除阻塞。

但是,put方法和stack方法中用的是同一把锁(lock),并且notify方法的机制是随机解除一个线程的阻塞,那么不管是put方法(入队操作)还是tack方法(出队操作)调用notify方法都可能反而为“同类”解除了阻塞,而我们的原意是要让他们互相解除阻塞。

下图示例就是一种bug可能,在队列容量为100,当前元素量为99的情况下,由于notify的随机唤醒机制,已满状态下的队列又进行了一次入队操作。(出队bug同理)

而使用while循环则不会出现这样的bug,由于wait方法在循环体内部,因此当阻塞结束后仍然会再次判断队列状态,即便再次堵塞后又出现同样的问题也没关系,大不了继续判断,至死方休。

③其他问题请留言😘

2.4纯享版代码实现(无注释)

class MyBlockingQueue {
    private static Object lock = new Object();
    private String[] elems = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    public MyBlockingQueue(int capacity) {
        elems = new String[capacity];
    }

    public void put(String elem) throws InterruptedException {
        synchronized(lock) {
            while(size == elems.length) {
                lock.wait();
            }
            elems[tail] = elem;
            tail++;
            if(tail >= elems.length) {
                tail = 0;
            }
            size++;
            lock.notify();
        }
    }
    public String tack() throws InterruptedException {
        String elem = null;
        synchronized (lock) {
            while(size == 0) {
                lock.wait();
            }
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            lock.notify();
        }
        return elem;
    }
}

博主是Java新人,每位同志的支持都会给博主莫大的动力,欢迎留言讨论,如果有任何疑问,或者发现了任何错误,都欢迎大家在评论区交流“ψ(`∇´)ψ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1430731.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CMake生成osg的FFMPEG插件及Windows下不生成VS工程问题解决

在Windows下&#xff0c;如何利用CMake生成osg的FFMPEG插件&#xff0c;请参考如下博文&#xff0c;同生成jpeg插件类似&#xff1a; osg第三方插件的编译方法&#xff08;以jpeg插件来讲解&#xff09;。 如下为生成FFMPEG时必要的设置&#xff1a; 注意&#xff1a; 一定要…

开发智能化企业培训平台:教育系统源码的创新方法

在传统的企业培训模式中&#xff0c;往往面临着效率低下、内容过时以及难以个性化的问题。为了解决这些挑战&#xff0c;采用智能化技术成为了企业培训领域的热门趋势。通过开发智能化企业培训平台&#xff0c;可以提高培训效果、降低成本&#xff0c;并更好地满足员工多样化的…

海量数据处理商用短链接生成器平台 - 2

第二章 短链平台项目创建git代码管理开发分层规范 第1集 短链平台实战-Maven聚合工程创建微服务项目 **简介&#xff1a;Maven聚合工程创建微服务项目实战 ** Maven聚合工程拆分 dcloud-common 公共依赖包 dcloud-app FlinkKafka实时计算 dcloud-account 账号流量包微服务 dc…

Oracle 面试题 | 10.精选Oracle高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

【深度测试】看到技术方案后,该怎么进行分析和测试

测试左移的思想&#xff0c;讲究尽早测试&#xff0c;测试是一系列的行为&#xff0c;并不一定要等代码运行起来才能测&#xff0c;下面会分享一些经验&#xff0c;提供大家参考。 一、静态分析 1.1 分析方法调用链 目标&#xff1a;梳理结构&#xff0c;化繁为简 原理&#…

Quppy wise 注册教程,轻松通过欧洲银行同名转账绑定个人IBAN账号

Quppy 注册教程,轻松通过欧洲银行同名转账绑定个人IBAN账号 官网下载APP或者去香港区下载APP使用, 按照官方APP里的邮箱注册就行&#xff0c;成功后添加电话和个人信息&#xff1b;需要说明的是&#xff1a;网站所填内容请全部用真实身份填写&#xff1b;名在前&#xff0c;姓…

华为机考入门python3--(7)牛客7-取近似值

分类&#xff1a;数字 知识点&#xff1a; str转float float(str) 向上取整 math.ceil(float_num) 向下取整 math.floor(float_num) 题目来自【牛客】 import math def round_to_int(float_num): # 如果小数点后的数值大于等于0.5&#xff0c;则向上取整&#xf…

计算机视觉中的目标跟踪

从保护我们城市的监控系统到自动驾驶车辆在道路上行驶&#xff0c;目标跟踪已经成为计算机视觉中的一项基础技术。本文深入探讨了目标跟踪&#xff0c;探索了其基本原理、多样化的方法以及在现实世界中的应用。 什么是目标跟踪&#xff1f; 目标跟踪是深度学习在计算机视觉中广…

分布式任务调度框架XXL-JOB详解

分布式任务调度 概述 场景: 如12306网站根据不同车次设置放票时间点&#xff0c;商品成功发货后向客户发送短信提醒等任务,某财务系统需要在每天上午10天前统计前一天的账单数据 任务的调度是指系统为了完成特定业务&#xff0c;基于给定的时间点&#xff0c;时间间隔&#…

axios二次封装用法

axios二次封装 一、request.js import axios from axios import router from "/router";const request axios.create({baseURL: http://localhost:9090,timeout: 5000 })// request 拦截器 // 可以自请求发送前对请求做一些处理 // 比如统一加token&#xff0c;对…

Unity DOTS中的baking(三)过滤baking的输出

Unity DOTS中的baking&#xff08;三&#xff09;过滤baking的输出 默认情况下&#xff0c;在conversation world&#xff08;baker和baking system运行的环境&#xff09;下产生的所有entities和components&#xff0c;都会作为baking环节的输出。在baking结束时&#xff0c;U…

[SWPUCTF 2021 新生赛]include

他让我们传入一个flag值 我们传入即可看到代码部分 传入一个php的伪类即可 得到经过Base64加密的flag&#xff0c;解密即可

x-shell安装、使用以及配置cuda、cudnn和conda

x-shell安装、使用以及安装最新版本conda x-shell安装远程连接服务器conda安装和环境配置 x-shell安装 x-shell是一款终端模拟软件&#xff0c;用于在Windows界面下远程访问和使用不同系统下的服务器。免费版本下载地址&#xff1a; https://www.xshell.com/zh/free-for-home-…

论文阅读:Learning Lens Blur Fields

这篇文章是对镜头模糊场进行表征学习的研究&#xff0c;镜头的模糊场也就是镜头的 PSF 分布&#xff0c;镜头的 PSF 与物距&#xff0c;焦距&#xff0c;光学系统本身的像差都有关系&#xff0c;实际的 PSF 分布是非常复杂而且数量也很多&#xff0c;这篇文章提出用一个神经网络…

Vulnhub靶机:hacksudo3

一、介绍 运行环境&#xff1a;Virtualbox 攻击机&#xff1a;kali&#xff08;10.0.2.15&#xff09; 靶机&#xff1a;hacksudo3&#xff08;10.0.2.45&#xff09; 目标&#xff1a;获取靶机root权限和flag 靶机下载地址&#xff1a;https://www.vulnhub.com/entry/hac…

Power BI案例-链接Mysql方法

Power BI案例-连锁Mysql 方法1-通过组件mysql-connector-net-8.3.0&#xff1a; 选择文件–获取数据–选择MySQL数据库–选择链接 提示无组件&#xff0c;选择了解详细情况 弹出浏览器&#xff0c;选择下载 不用登陆&#xff0c;可以直接下载 下载的组件如下&#xff1a…

cesium-加载谷歌影像

cesium在开发的时候有可能会加载不同的影像&#xff0c;今天就先看一下加载谷歌的吧。 使用谷歌有个好处就是基本不会出现此区域无卫星图的情况 闲言话语不多说&#xff0c;看代码 <template><div id"cesiumContainer" style"height: 100vh;"&g…

uniapp开发一个交流社区小程序

uniapp开发一个交流社区小程序 假期的时候简单学了一下uniapp&#xff0c;想开发一款类似百度贴吧的交流社区来练练手。本篇文章主要记录开发过程&#xff0c;文末附上项目地址。 主要需要开发以下几个页面。 信息页面热榜页面用户主页用户信息页 信息页面 该页面的功能主要…

国标GB/T 28181详解:设备视音频文件检索消息流程

目 录 一、设备视音频文件检索 二、设备视音频文件检索的基本要求 三、命令流程 1、流程图 2、流程描述 四、协议接口 五、产品说明 六、设备视音频文件检索的作用 七、参考 在国标GBT28181中&#xff0c;定义了设备视音频文件检索消息的流程&#xff0c;主…

ubuntu 安装 kvmQemu no active connection to install on

更新 apt sudo apt update检查虚拟化是否开启 0 不开&#xff0c;其余数字表示开启&#xff0c;开不开都可以&#xff0c;不开性能弱&#xff0c;只能跑 x86 系统 egrep -c (vmx|svm) /proc/cpuinfo安装 sudo apt install -y qemu-kvm virt-manager libvirt-daemon-system virt…