【阻塞队列】阻塞队列的模拟实现及在生产者和消费者模型上的应用

news2024/11/15 19:50:05

文章目录

  • 📄前言
  • 一. 阻塞队列初了解
    • 🍆1. 什么是阻塞队列?
    • 🍅2. 为什么使用阻塞队列?
    • 🥦3. Java标准库中阻塞队列的实现
  • 二. 阻塞队列的模拟实现
    • 🍚1. 实现普通队列
    • 🍥2. 实现队列的阻塞功能
    • 🧊3. 解除阻塞状态
  • 三. 使用模拟的阻塞队列验证生产者和消费者模型

📄前言

本文是对阻塞队列的应用场景的介绍,对阻塞队列的作用以及具体实现的讨论。


一. 阻塞队列初了解

🍆1. 什么是阻塞队列?

阻塞队列是一种带有阻塞功能的“先进先出”线性表。即在一个带有最大容量的队列中,在某时刻队列容量已满时继续入队 或 队列为空时继续出队,就会进入阻塞等待状态,直到队列变为 未满或非空 便解除阻塞状态,继续入队或出队。

🍅2. 为什么使用阻塞队列?

若存在以下简易的分布式系统:
在这里插入图片描述
上述分布式系统虽然能完成客户端与服务器端的交互需求,但可能存在以下问题:

  1. 在正常情况下,用户可以通过客户端想服务器发起请求并获取相应的服务,但假如在某刻服务器A突然出现了故障,与服务器A直接通信的服务器B也可能因此出现故障,导致整个服务瘫痪。
  2. 若未来想增加 更多的服务器 来处理服务器A发起的请求,则需求对 服务器A 的接口 进行一定的改动,付出一定的时间和人力成本。
  3. 当某个时刻,很多的客户端同时向 服务器A 发起请求,作为与用户直接交互的服务器,服务器A具备承载这些并发量的能力,但服务器集群中负责其他功能的服务器接收请求的承载能力可能较弱,此时可能造成其他服务器的崩溃。

造成上述现象的原因可以归结为以下两点:

  1. 模块间的耦合性较高(例如问题1和2)
  2. 承载能力较弱的模块不具备抗冲击能力。(例如问题3)

上述的解决方法是在服务器之间加入一个阻塞队列,利用生产者和消费者模型解决以上问题。
什么是生产者消费者模型呢?(如下图)
在这里插入图片描述

当服务器A接收来自客户端的请求时,不把请求直接发给服务器B,而是将请求数据加入到队列中,服务器B通过队列接收请求并把请求除了的结果返回给A。


当上述分布式系统引入阻塞队列后工作模式如下图所示:
在这里插入图片描述

引入阻塞队列的好处:

  1. 解耦合。当服务器A或服务器B出现问题时,就不会对其他服务器造成直接的影响;当需要添加新的服务器来处理这些请求时,新的服务器也同样只需从队列中取数据,无需对原有服务器的接口(代码)进行任何的改动。
  2. 削峰填谷”。当服务器A 瞬间接收客户端发来的大量请求时,由于服务器B处理请求的速度较慢,剩余的请求会在阻塞队列里面堆积,虽然客户端获取服务的时间相对增加了,但一定程度上缓解了其他承受并发量能力较弱的服务器的压力。

🥦3. Java标准库中阻塞队列的实现

在这里插入图片描述

BlockingQueue的主要方法:
在这里插入图片描述
方法演示如下:(使用普通入队方法入队4次,再使用带有阻塞的出队方法出队4次)

public static void main(String[] args) throws InterruptedException {

    BlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);
    System.out.println("数据 5 入队状态: " + q.offer(5));
    System.out.println("数据 6 入队状态: " + q.offer(6));
    System.out.println("数据 7 入队状态: " + q.offer(7));
    System.out.println("数据 8 入队状态: " + q.offer(8));
    System.out.print("队列中的数据: ");
    System.out.println(q);

    
    System.out.println("数据出队: ");
    for (int i = 0; i < 4; i++) {
        System.out.print(q.take() + " ");
    }
    System.out.println("程序结束 !");
}

在这里插入图片描述

可以发现,当调用 take()方法取出队列元素时,因为队列最终为空,程序进入了阻塞状态,没有打印“程序结束”。


二. 阻塞队列的模拟实现

🍚1. 实现普通队列

阻塞队列的关键方法是两个带有阻塞功能的 put() 和 take()方法,而这两个方法是在原有出入队方法上使用 Object类 带有wait()方法 和 notify() 方法让线程进入等待状态 或 唤醒线程。
因此,我们可以先把基础的队列进行实现,随后在原有基础上进行修改。队列可以使用数组(环形队列)或链表两种方式实现,这里我采用数组的方式实现队列。(由于队列的实现方法较为常见,这里直接给出实现代码)

class MyBlockingQueue<E> {
    private Object[] elem;
    private int defaultCapacity = 11;	// 阻塞队列默认容量
    private int front;	// 记录队头元素位置
    private int rear;	// 记录队尾元素位置
    private int size;   // 用于记录当前队列元素的实际个数

    public MyBlockingQueue(){
        this.elem = new Object[defaultCapacity + 1];
    }
    public MyBlockingQueue(int capacity) {
        defaultCapacity = capacity;
        this.elem = new Object[defaultCapacity + 1];
    }

    public boolean offer(E val) {
        // 判断队列是否已满
        if (size == defaultCapacity) {
            return false;
        }
        
        elem[rear] = val;
        size++;
        // 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部
        rear = (rear + 1) % (defaultCapacity + 1);
        return true;
    }
    
    public E poll() {
        // 判断队列是否为空
        if (front == rear) {
            return null;
        }
        
        Object ret = elem[front];
        size--;
        // 如果 front 自增 到达数组末尾,使 front 重新到数组的头部
        front = (front + 1) % (defaultCapacity + 1);
        return (E)ret;
    }
}

🍥2. 实现队列的阻塞功能

当阻塞队列容量已满时,调用 put() 方法会进入阻塞状态,因此在原先 offer()方法判断的基础上,我们需要使用 wait()方法 让线程进入阻塞等待状态,考虑到可能有多个线程同时调用 put()方法,可能会引起线程安全问题,因此我们应在 if()判断条件和整个修改操作上 加锁(或者直接在方法上加锁)。(代码如下)

public void put (E value) throws InterruptedException {
    // 判断队列是否已满
    synchronized (this) {
        if (size == defaultCapacity) {
        // 队列进入阻塞状态, 直到有元素出队时 解除阻塞
            this.wait();
        }
        
        queue[rear] = value;
        size++;
        // 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部
        rear = (rear + 1) % (defaultCapacity + 1);
    }
}

当队列为空时,调用 take() 方法会使线程进入阻塞状态,同理若判空条件成立,我们需要调用 wait() 方法使线程进入阻塞,为防止多个线程在队列即将为空时同时调用 take() 方法引发线程安全问题,我们需要在 if()判断语句 和 整个修改操作 进行加锁操作(或者直接在方法上加锁)。(代码如下)

public E take() throws InterruptedException {
    // 判断队列是否为空
    synchronized (this) {
        if (rear == front) {
        // 队列进入阻塞状态,直到有新的元素入队时 解除阻塞
            this.wait();
        }
        
        Object ret = queue[front];
        // 如果 front 自增 到达数组末尾,使 front 重新到数组的头部
        front = (front + 1) % (defaultCapacity + 1);
        size--;
        return (E)ret;
    }
}

🧊3. 解除阻塞状态

什么情况下队列会接触阻塞状态呢?

  1. 当队满时,某个线程从阻塞队列取出一个元素,即执行完出队操作后,需要使用 notify()方法 唤醒因执行 put()方法而阻塞的线程。
  2. 当队空时,某个线程向队列新增一个元素,即执行完入队操作后,需要使用 notify()方法唤醒因执行 take()方法而阻塞的线程。

对 put()方法和take()方法 修改后代码如下:

public void put (E value) throws InterruptedException {
    // 判断队列是否已满
    synchronized (this) {
        if (size == defaultCapacity) {
        // 队列进入阻塞状态, 直到有元素出队时 解除阻塞
            this.wait();
        }
        
        queue[rear] = value;
        size++;
        // 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部
        rear = (rear + 1) % (defaultCapacity + 1);
        // 此处的 notify 用来唤醒 队列为空时的 wait
        this.notify();
    }
}

public E take() throws InterruptedException {
    // 判断队列是否为空
    synchronized (this) {
        if (rear == front) {
        // 队列进入阻塞状态,直到有新的元素入队时 解除阻塞
            this.wait();
        }
        
        Object ret = queue[front];
        // 如果 front 自增 到达数组末尾,使 front 重新到数组的头部
        front = (front + 1) % (defaultCapacity + 1);
        size--;
        // 此处的 notify 用来唤醒 队列为满时的 wait
        this.notify();
        return (E)ret;
    }
}

三. 使用模拟的阻塞队列验证生产者和消费者模型

为了方便看到效果,我们假设阻塞队列的容量为2,并将生产与消费的数据进行打印。
当生产者与消费者处理数据的频率一样,且生产速率为 次/1s、消费速率为 次/1s 时,程序的生产与消费数据应轮流打印:(模拟代码和程序运行结果如下)

public static void main(String[] args) {
    MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);
    // 生产者
    Thread producer = new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("生产了: " + i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
	
	// 消费者
    Thread consumer = new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                int ret = myBlockingQueue.take();
                System.out.println("消费了: " + ret);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
    
    producer.start();
    consumer.start();
}

在这里插入图片描述

当生产速率 > 消费速率,且生产速率为 次/1s、消费速率为 次/2s 时:可以预估到,当经过5s后程序会因队满进入阻塞状态,且后续每消费一次伴随着一次生产,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {
    MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);
    Thread producer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("生产了: " + i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread consumer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                int ret = myBlockingQueue.take();
                System.out.println("消费了: " + ret);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    producer.start();
    consumer.start();
}

在这里插入图片描述

当生产速率 < 消费速率,且生产速率为 次/2s、消费速率为 次/1s 时:可以预估到,当经过2s后程序会因队满进入阻塞状态,且后续每生产一次伴随着一次消费,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {
    MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);
    Thread producer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("生产了: " + i);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread consumer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                int ret = myBlockingQueue.take();
                System.out.println("消费了: " + ret);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    producer.start();
    consumer.start();
}

在这里插入图片描述


以上就是本篇文章的全部内容了,如果这篇文章对你有些许帮助,你的点赞、收藏和评论就是对我最大的支持。
另外,文章可能存在许多不足之处,也希望你可以给我一点小小的建议,我会努力检查并改进。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1414533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python.五.文件

1.文件读取的操作 1.文件的打开 open(name,mode,encoding) name:是要打开目标文件名的字符串&#xff0c;可以包含文件所在的具体路径 mode:设置打开文件的模式&#xff1a;只读 r 、写入 w 、追加 a encoding&#xff1a;编码格式 UTF-8 fopen("C:/test.txt"…

XSS_Labs靶场通关笔记

每一关的方法不唯一&#xff1b;可以结合源码进行分析后构造payload&#xff1b; 通关技巧&#xff08;四步&#xff09;&#xff1a; 1.输入内容看源码变化&#xff1b; 2.找到内容插入点&#xff1b; 3.测试是否有过滤&#xff1b; 4.构造payload绕过 第一关 构造paylo…

怎么获取二维码的链接?二维码转链接只需3步

怎么从二维码中提取内容呢&#xff1f;现在很多内容都会用二维码方式来存储&#xff0c;但是有些场景下二维码是无法使用的时候&#xff0c;想要查看二维码中的内容&#xff0c;就需要分解二维码成链接后使用。那么二维码分解成链接具体该怎么做呢&#xff1f;今天就将在线二维…

Hammer.js中文教程

一、什么是hammer.js hammerJS是一个开源的&#xff0c;轻量级的触屏设备javascript手势库&#xff0c;它可以在不需要依赖其他东西的情况下识别触摸&#xff0c;鼠标事件。允许同时监听多个手势、自定义识别器&#xff0c;也可以识别滑动方向。 优点&#xff1a; 为移动端网…

[已解决]504 Gateway Time-out 网关超时

文章目录 问题&#xff1a;504 Gateway Time-out 504 Gateway Time-out 网关超时思路解决 问题&#xff1a;504 Gateway Time-out 504 Gateway Time-out 网关超时 思路 网上的常规思路是修改nginx配置文件,增加请求执行时间,试过没有用 keepalive_timeout 600; fastcgi_con…

一文读懂: AIGC基本原理及应用领域

AIGC是利用人工智能技术来生成内容的一种新型技术。随着人工智能技术的不断发展&#xff0c;AIGC技术也得到了越来越广泛的应用。未来&#xff0c;AIGC技术将会对我们的生活和工作产生巨大的影响。 一、AIGC技术的基本原理 AIGC技术的基本原理是利用人工智能技术中…

JAVA学习笔记三

1.java执行流程分析 2.什么是编译 javac Hello.java 1.有了java源文件&#xff0c;通过编译器将其编译成JVM可以识别的字节码文件 2.在该源文件目录下&#xff0c;通过javac编译工具对Hello.java文件进行编译 3.如果程序没有错误&#xff0c;没有任何提示&#xff0c;但在…

[数据结构]-哈希

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 本期学习目标&…

LabVIEW准分子激光器控制系统

LabVIEW准分子激光器控制系统是为了实现准分子激光光源在工业、医疗和科研领域的应用集成及其功能的扩展。系统由PC端和激光器端两部分构成&#xff0c;通过光隔离的RS232通讯连接&#xff0c;以实现稳定可靠的控制与通信。 系统主要由微控制单元&#xff08;MCU&#xff09;主…

程序员的你,是不是又被催婚了

程序员作为社会中一个较为特殊的群体&#xff0c;由于工作特性&#xff08;如长时间对着电脑、工作节奏较快、加班相对频繁等&#xff09;以及职业发展需要投入大量时间和精力&#xff0c;有时可能会面临较晚结婚的问题。这也导致了在某些情况下&#xff0c;他们可能被家人或朋…

C#使用TimeSpan对象获取时间间隔

目录 一、TimeSpan基础知识 二、实例 一、TimeSpan基础知识 使用TimeSpan对象可以方便地获取两个时间段的间隔。两个时间信息相减后会得到一个TimeSpan对象&#xff0c;该TimeSpan对象代表时间间隔&#xff0c;可以通过TimeSpan对象的Days、Hours、Minutes、Seconds、Millise…

腾讯云幻兽帕鲁专有服务器配置价格表,4核16G、8核32G

幻兽帕鲁服务器配置CPU内存多大合适&#xff1f;如何选择&#xff1f;最低4核8G起步&#xff0c;4核16G是官方推荐配置&#xff0c;最好是4核32G配置。阿腾云atengyun.com分享幻兽帕鲁Palworld服务器CPU内存配置及租用费用&#xff0c;如下图&#xff0c;Palworld官方推荐服务器…

利用Django搭建python web项目(简单登录)

1.概述 目前市面上web项目大多数是由java语言开发&#xff08;结合spring框架&#xff09;&#xff0c;但这并不意味着只有java语言能够开发web项目&#xff0c;python语言、go语言同样可以做到。本文将利用Django框架&#xff08;由python语言开发的web框架&#xff09;来搭建…

09. Springboot集成sse服务端推流

目录 1、前言 2、什么是SSE 2.1、技术原理 2.2、SSE和WebSocket 2.2.1、SSE (Server-Sent Events) 2.2.2、WebSocket 2.2.3、选择 SSE 还是 WebSocket&#xff1f; 3、Springboot快速集成 3.1、添加依赖 3.2、创建SSE控制器 3.2.1、SSEmitter创建实例 3.2.2、SSEmi…

esp32 操作DS1307时钟芯片

电气参数摘要 有VCC供电&#xff0c;IIC活动状态是1.5mA&#xff0c;待机状态200μA&#xff0c;电池电流5nA(MAX50nA&#xff09;无VCC供电的时候&#xff0c;电池电流&#xff0c;300nA&#xff08;时钟运行&#xff09;&#xff0c;10nA&#xff08;时钟停止&#xff09;供…

ASP.NET Core 7 Web 使用Session

ASP.NET Core 好像不能像20年前那样直接使用Session函数&#xff0c;我使用如下方法 1、在NuGet安装以下2个包 2、在Program.cs注册 //注册Session builder.Services.AddSession(options > {options.IdleTimeout TimeSpan.FromMinutes(60);options.Cookie.HttpOnly fals…

大小端(C语言)

一、什么是大小端&#xff1a; 1.大端(Big-Endian):高地址存放低位 2.小端(Little-Endian):高地址存放高位 例如&#xff1a;0x11223344在内存中存储 大小端影响了什么&#xff1f; 当基本数据类型占用字节数超过了1字节后&#xff0c;大小端决定了数据按照什么顺序存储在…

3. SQL 语言

重点&#xff1a; MySQL 的 三种安装方式&#xff1a;包安装&#xff0c;二进制安装&#xff0c;源码编译安装。 MySQL 的 基本使用 MySQL 多实例 DDLcreate alter drop DML insert update delete DQL select 3&#xff09;SQL 语言 3.1&#xff09;关系型数据库的常见…

C语言-算法-背包

[USACO07DEC] Charm Bracelet S&#xff08;01背包&#xff09; 题目描述 Bessie has gone to the mall’s jewelry store and spies a charm bracelet. Of course, she’d like to fill it with the best charms possible from the N (1 ≤ N ≤ 3,402) available charms. E…

通过LiveNVR实现海康大华华为宇视等监控摄像头在服务器上录像存储,并web无插件直播和回放

支持云端录像服务器上面集中录像存储在部署LiveNVR的服务器上面 1、流媒体服务软件2、配置开启录像(云端录像)3、录像回看(云端录像)3.1、查看录像3.1.1、时间轴视图3.1.2、列表视图 4、云端录像相关接口5、如何分享时间轴录像回看&#xff1f;6、iframe集成示例7、RTSP/HLS/FL…