阻塞式队列-生产者消费者模型

news2025/1/10 8:52:43

1.阻塞队列是什么

阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
     

阻塞队列的一个典型应用场景就是 "生产者消费者模型". 这是一种非常典型的开发模型.
 

2.生产者消费者模型

生产者消费者模型有啥用?

  • 在开发中起到服务器之间"解耦合"的效果~~
  • 在请求突然暴增的峰值中,起到"削峰填谷"的效果~~ 

 

如果A和B是直接相互调用.
此时A就得知道B的存在,B也得知道A的存在.
并且A需要知道B提供的接口是什么.B也得知道A是通过啥方式来调用的~~(耦合比较强)
如果其中一个服务挂了,另一个服务也好不到哪去~~ 

 

A不需要知道B存在,B也不需要知道A存在

双方只要认识这个阻塞队列即可~~

如果其中的一个服务挂了,并不影响另一个服务正常工作~~

 

 

如果A收到的请求突然暴增了,此时B收到的请求也会暴增!!!

如果B本身计算量已经很大了请求再次暴增,可能B就挂了~~

(每个请求处理的时候都需要分配硬件资源)

 

由阻塞队列来承担A这边的压力.
B仍然按照原有的节奏来消费队列中的数据~~ 

 

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
 

1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.

比如在 "秒杀" 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.这样做可以有效进行 "削峰", 防止服务器被突然到来的一波请求直接冲垮

 2) 阻塞队列也能使生产者和消费者之间 解耦.

比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 "生产者", 包饺子的人就是 "消费者".擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)

3.标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.

生产者消费者模型的实现

package thread2;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Test2 {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            int n = 1;
            while (true) {
                try {
                    blockingQueue.put(n);
                    System.out.println("生产者生产了 " + n);
                    Thread.sleep(1000);
                    n++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int n = blockingQueue.take();
                    System.out.println("消费者消费了 " + n);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        customer.start();
    }
}

阻塞队列实现(自己造轮子实现生产者消费者模型)

  • 通过 "循环队列" 的方式来实现.
  • 使用 synchronized 进行加锁控制.
  • put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
  • take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
     
package thread2;

class MyBlockingQueue {
    private int[] items = new int[1000];
    private int size = 0;
    private int head = 0;
    private int tail = 0;

    private Object locker = new Object();

    public void put(int val) throws InterruptedException {
        synchronized (locker) {
            if (size == items.length) {
                // return;
                locker.wait();
            }
            items[tail] = val;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            locker.notify();//唤醒take
        }
    }

    public Integer take() throws InterruptedException {
        synchronized (locker) {
            if (size == 0) {
                locker.wait();
//                return null;
            }
            int ret = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            locker.notify();//唤醒put
            return ret;
        }
    }
}

public class Test1 {
    private static MyBlockingQueue queue = new MyBlockingQueue();

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> {
            int n = 1;
            while (true) {
                try {
                    queue.put(n);
                    System.out.println("生产者生产了:" + n);
                    n++;
                    //Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }
        });
        producer.start();

        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int n = queue.take();
                    System.out.println("消费者消费了 " + n);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
    }
}



 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/336522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HLNet代码debug记录

昨天跑HLNet的代码&#xff0c;配环境的时候又双叒叕遇到了一些问题&#xff0c;记录一下&#xff1a; 1.error: identifier “AT_CHECK“ is undefined 出现在python setup.py build develop的时候 参照https://blog.csdn.net/sinat_29957455/article/details/113334944 根据报…

如何在腾讯云服务器上安装Jupyter Notebook示例?

Jupyter简介及服务器端安装 首先&#xff0c;服务器端安装Jupyter。 sudo pip3 install jupyterlab&#xff1a; 启动Jupyter服务 # 设置jupyter web的密码jupyter-notebook password# 创建jupyter工作目录mkdir ~/jupyter_workspace# 启动jupyter (两次ctrlc停止服务)jup…

图解LeetCode——剑指 Offer 32 - III. 从上到下打印二叉树 III

一、题目 请实现一个函数按照之字形顺序打印二叉树&#xff0c;即&#xff1a;第一行按照从左到右的顺序打印&#xff0c;第二层按照从右到左的顺序打印&#xff0c;第三行再按照从左到右的顺序打印&#xff0c;其他行以此类推。 二、示例 2.1> 示例1 提示&#xff1a; …

【软件测试】资深测试工程师说:你真的能做好bug分析吗?

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 bug报告&#xff0c…

Revive:从间谍软件进化成银行木马

2022 年 6 月&#xff0c;Cleafy 研究人员发现了一个新的安卓银行木马 Revive。之所以选择 Revive 这个名称&#xff0c;是因为恶意软件为防止停止工作启用的一项功能名为 revive。 Revive 属于持续潜伏的那一类恶意软件&#xff0c;因为它是为特定目标开发和定制的。这种类型…

Python 四种推导式,包含实例演示

嗨害大家好鸭&#xff01;我是小熊猫~ 这次继续来给大家带来python基础内容~ 源码资料电子书:点击此处跳转文末名片获取 Python 推导式 Python 推导式是一种独特的数据处理方式&#xff0c;可以从一个数据序列构建另一个新的数据序列的结构体。 Python 支持各种数据结构的推…

关于进行vue-cli过程中的解决错误的问题

好久没发文章了&#xff0c;直到今天终于开始更新了&#xff0c;最近想进军全端&#xff0c;准备学习下vue&#xff0c;但是这东西真的太难了&#xff0c;我用了一天的时间来解决在配置中遇到的问题&#xff01;主要问题&#xff1a;cnpm文件夹和vue-cli文件夹的位置不对并且vu…

秒杀项目之网关服务限流熔断降级分布式事务

目录一、网关服务限流熔断降级二、Seata--分布式事务2.1 分布式事务基础2.1.1 事务2.1.2 本地事务2.1.3 分布式事务2.1.4 分布式事务场景2.2 分布式事务解决方案2.2.1 全局事务可靠消息服务2.2.2 最大努力通知2.2.3 TCC事事务三、Seata介绍四、 Seata实现分布式事务控制4.1 案例…

【Android】Binder的理解

1.Binder是什么&#xff1f; 对于android而言&#xff0c;是跨进程传输的通道&#xff0c;是封装好的java类&#xff0c;可以直接继承和使用。 从组成、模型来讲&#xff0c;我认为是连接Server层、Client层、ServerManager层的纽带&#xff0c;也是驱动。 2.Binder的基础概…

RMI攻击Registry的两种方式

概述 RMI(Remote Method Invocation) &#xff1a;远程方法调用 它使客户机上运行的程序可以通过网络实现调用远程服务器上的对象&#xff0c;要实现RMI&#xff0c;客户端和服务端需要共享同一个接口。 基础 Client 和 Regisry 基于 Stub 和 Skeleton 进行通信&#xff0c…

ContextCapture Master 倾斜摄影测量实景三维建模

ContextCapture实景建模大师是一套无需人工干预&#xff0c;通过影像自动生成高分辨率的三维模型的软件解决方案。它集合了全球最先进数字影像处理、计算机虚拟现实以及计算机几何图形算法&#xff0c;在易用性、数据兼容性、运算性能、友好的人机交互及自由的硬件配置兼容性等…

花1分钟配置远程DEBUG,开发效率翻倍,妹子直呼绝绝子

当把一个工程部署到远程服务器后有可能出现意想不到错误&#xff0c;日志打印过多或者过少都影响问题排查的效率&#xff0c;这个时候可以通过远程调试的方式快速定位bug&#xff0c;提升工作效率。本文主要讲解如何使用Idea开发工具进行远程调试&#xff0c;希望对你有帮助。 …

微信小程序授权登录流程

自我介绍我是IT果果日记&#xff0c;微信公众号请搜索 IT果果日记一个普通的技术宅&#xff0c;定期分享技术文章&#xff0c;欢迎点赞、关注和转发&#xff0c;请多关照。首先&#xff0c;我们要了解什么是微信小程序登录&#xff1f;它的作用是什么&#xff1f;用户登录微信小…

使用Fetch时,post数据时,后端接收的Content-Type为text/plain

在使用 Fetch做一个前端的post请求时&#xff0c;直接从网上抄了一段代码 export async function postData(url, data){const response await fetch(url, {method: POST, // *GET, POST, PUT, DELETE, etc.mode: no-cors, // no-cors, *cors, same-originheaders: { Content-…

xshell 工具连接不上本地的 Centos 7虚拟机,4种情况,逐个分析

导读 小编之前使用过 VMware workstation 工具搭建 Centos 7 版本的虚拟机集群&#xff0c;各项功能都正常&#xff0c;用完了也就清除了&#xff08;节约本地空间&#xff09;。因为最近学习大数据&#xff0c;需要从新安装虚拟机&#xff0c;结果发现并不如第一次那么顺利。所…

TDengine时序数据库的简单使用

最近学习了TDengine数据库&#xff0c;因为我们公司有硬件设备&#xff0c;设备按照每分钟&#xff0c;每十分钟&#xff0c;每小时上传数据&#xff0c;存入数据库。而这些数据会经过sql查询&#xff0c;统计返回展示到前端。但时间积累后现在数据达到了百万级数据&#xff0c…

Qt使用workflow

Qt工程设置 QMAKE_CFLAGS_DEBUG -MTd QMAKE_CXXFLAGS_DEBUG -MTd上述内容必须设置&#xff0c;否则会报错&#xff1a;error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MTd_StaticDebug”不匹配值“MDd_DynamicDebug。 libworkflow.pri的文件内容如下&#xff1a; …

算法第十五期——动态规划(DP)之各种背包问题

目录 0、背包问题分类 1、 0/1背包简化版 【代码】 2、0/ 1背包的方案数 【思路】 【做法】 【代码】 空间优化1&#xff1a;交替滚动 空间优化2&#xff1a;自我滚动 3、完全背包 【思路】 【代码】 4、分组背包 核心代码 5、多重背包 多重背包解题思路1:转化…

MySQL8 创建用户,设置修改密码,授权

MySQL8 创建用户,设置修改密码,授权 MySQL5.7可以 (创建用户,设置密码,授权) 一步到位 &#x1f447; GRANT ALL PRIVILEGES ON *.* TO 用户名% IDENTIFIED BY 密码 WITH GRANT OPTION&#x1f446;这样的语句在MySQL8.0中行不通, 必须 创设和授权 分步执行&#x1f447; CR…

如何动态生成列字段?请看向这里哟

&#x1f343; 场景前言 &#x1f420;一般而言&#xff0c;某个简单查询接口涉及到得表结构不超过三个。如果不是单表操作的话&#xff0c;多个表中间用到联合查询的SQL也可以解决相关问题。但是&#xff0c;事与愿违的是我们的业务是跟着场景走的&#xff0c;并不是所有的业务…