【多线程】深入剖析生产者-消费者模型

news2025/1/24 5:43:29

💐个人主页:初晴~

📚相关专栏:多线程 / javaEE初阶


一、阻塞队列

阻塞队列是⼀种特殊的队列,也遵守 "先进先出" 的原则。是在普通的队列基础上做出了补充。
java标准库中的原有的队列Queue及其子类,默认都是线程不安全的,而阻塞队列能是⼀种线程安全的数据结构,具有阻塞特性
  • 当队列满的时候, 继续⼊队列就会阻塞, 直到有其他线程从队列中取⾛元素
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插⼊元素
基于阻塞队列最大的应用场景就是“生产消费者模型”了。

二、生产者消费者模型

1、概念

比如一个生产线需要制作零件,组装两个步骤,这时就由两种方式进行生产部署:

1、每个人都分别进行零件制作和组装两个操作

2、将员工分成两拨人,一拨人专门负责制作零件,另一拨人专门负责组装

显然第一种方法是比较低效的,制作零件的机器数量是有限的,所有人就会去竞争这几台机器,可能会导致阻塞等待导致效率低下。而第二种方法则实现了各司其职,自然效率会更高,这时零件制作就相当于是生产者,组装就相当于是消费者,这种方式就被称之为“生产者消费者模型”,在后端开发中经常会涉及到。

2、作用

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题
⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
1. 阻塞队列能使⽣产者和消费者之间解耦合.
直接耦合的代码结构:
这样A与BC的耦合度非常高,出一点问题就可能导致程序崩溃,并且对于后期维护来说也十分不方便。
引入生产者消费者模型的代码结构:
这样显然A与B直接就解耦合了,不会直接关联。
注意:
阻塞队列是一种 数据结构,由于比较好用,会将其单独封装为一个服务器程序,并且在单独的服务器机器上进行部署。这时的阻塞队列就被称之为 “消息队列”(Messae Queue,MQ)了。而消息队列相对是比较成熟的,代码不会频繁修改,因此我们认为A与队列,B与队列之间的交互是 低耦合的。

2. 阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒. (削峰填⾕)

⽐如在 "秒杀" 场景下, 服务器同⼀时刻可能会收到⼤量的⽀付请求. 如果直接处理这些⽀付请求, 服务器可能扛不住(每个⽀付请求的处理都需要⽐较复杂的流程). 这个时候就可以把这些请求都放到⼀个阻塞队列中, 然后再由消费者线程慢慢的来处理每个⽀付请求.
这样做可以有效进⾏ "削峰", 防⽌服务器被突然到来的⼀波请求直接冲垮.
(1)为什么一个服务器请求收到过多就有可能会崩溃?
一台服务器就是一台“电脑”,提供了一些 硬件资源(CPU、内存、硬盘、网络带宽……),服务器每次收到一个请求,在处理的过程中就需要执行一系列代码,执行代码的过程中就需要 消耗一定的硬件资源,而服务器的硬件资源是 有限的,当请求需要消耗的硬件资源的量 超过机器上限时,机器就会出现问题(卡死、程序崩溃……)
(2)当请求激增时,为什么A与消息队列不会崩溃?
A相当于是一个 “网关服务器”,负责接收客户端的请求,再把请求发给其它服务器。这样的服务器做的工作是比较简单的( 单纯的数据转发),处理同样一个请求消耗的硬件资源更少,在同等情况下能处理更多请求。消息队列中的程序也比较简单,消耗的资源也相对较少,因此他们都不太容易会崩溃。
而B是真正 处理业务逻辑的服务器,代码量更加庞大, 消耗的时间与硬件资源更多,当请求激增时就更容易崩溃了。
优点:
1、解耦合
2、削峰填谷
缺点:
1、需要更多的机器来部署消息队列
2、生产者与消费者之间的通信会延时,响应时间会变长

三、实现方式

1、手写一个阻塞队列

在之前的文章中,我们曾在 栈和队列 一文深入探讨过队列的写法,这里就不做过多赘述,直接看一下最基础的队列的代码实现:

class MyBlockingQueue{
    private String[] data=null;
    private int head=0;
    private int tail=0;
    private int size=0;
    public MyBlockingQueue(int capacity){
        data=new String[capacity];
    }

    public void put(String s){
        if(size==data.length){
            //队列满了
            return;
        }
        data[tail]=s;
        tail++;
        if(tail>=data.length){
            tail=0;
        }
        size++;
    }

    public String take(){
        if(size==0){
            //队列为空
            return null;
        }
        String ret=data[head];
        head++;
        if(head>=data.length){
            head=0;
        }
        size--;
        return ret;
    }
}

由于put与take方法中涉及了很多的修改操作,这样的代码在多线程环境下肯定是会有线程安全问题的,博主在 深入剖析线程安全问题 一文中做过详细分析。

那么该如何解决这一问题呢,显然就是通过加锁操作了:

但这样还是不够的。当线程发现队列为空时,不应该继续再去参与锁的竞争应该直接进入阻塞等待的状态,等到其它线程调用put方法,队列不为空时,再恢复执行,从而避免浪费不必要的资源,提高执行效率,这时就可以利用wait-notify来解决了。这在博主的 等待通知机制 一文过详细的介绍。于是最终代码实现就如下:

class MyBlockingQueue{
    private String[] data=null;
    private int head=0;
    private int tail=0;
    private int size=0;
    public MyBlockingQueue(int capacity){
        data=new String[capacity];
    }

    public void put(String s){
        synchronized (this){
            if(size==data.length){
                //队列满了
                return;
            }
            data[tail]=s;
            tail++;
            if(tail>=data.length){
                tail=0;
            }
            size++;
            this.notify();
        }
    }

    public String take() throws InterruptedException {
        synchronized (this){
            if(size==0){
                //队列为空
                this.wait();
            }
            String ret=data[head];
            head++;
            if(head>=data.length){
                head=0;
            }
            size--;
            return ret;
        }
    }
}

2、实现生产者消费者模型

在 Java 标准库中内置了阻塞队列。如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的BlockingQueue即可。
  • BlockingQueue 是⼀个接⼝. 真正实现的类LinkedBlockingQueue/ArrayBlockingQueue等
  • put ⽅法⽤于阻塞式的⼊队列, take ⽤于阻塞式的出队列
  • BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些⽅法不带有阻塞特性

使用示例:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue=new ArrayBlockingQueue<>(3);
        queue.put("111");
        System.out.println("put 成功");
        queue.put("111");
        System.out.println("put 成功");
        
        queue.take();
        System.out.println("take 成功");
        queue.take();
        System.out.println("take 成功");
        queue.take();
        System.out.println("take 成功");
    }
}

我们可以看到当执行第三个take时,由于此时队列为空,因此线程就会进入阻塞状态了。

接着我们就可以用它来试着简单实现一个生产者消费者模型了:

public class Main{
    public static void main(String[] args) {
        BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(1000);
        //生产者线程
        Thread t1=new Thread(()->{
            int i=1;
            while(true){
                try {
                    queue.put(i);
                    System.out.println("生产元素 "+ i);
                    i++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        
        //消费者线程
        Thread t2=new Thread(()->{
            while (true){
                try {
                    Integer i=queue.take();
                    System.out.println("消费元素 "+ i);
                    
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        
        t1.start();
        t2.start();
    }
}

其中t1就作为生产者线程,t2作为消费者线程,让我们试试看让生产慢于消费会发生什么:

这时我们得益于生产者消费者模型,由于生产会慢一些,消费者就会进入阻塞等待,每当生产者线程生产一个元素,便紧接着消费一个元素,从而形成了这种井然有序的执行结果。

让我们再试试消费慢于生产会发生什么:

我们可以看到虽然上传线程一下产生了非常多生产元素,但是由于阻塞队列的存在,消费线程依旧会不紧不慢的依次处理消费掉元素,这能有效防止服务器在高并发环境下面对激增的需求量无法执行而崩溃。

总结

生产者消费者模型可以有效地让程序解耦合,便于后续代码的维护和优化,并且可以有效地“削峰填谷”,在高并发场景下能有效缓解服务器压力,避免服务器崩溃。在以后的代码开发中是非常重要一种结构。


那么本篇文章就到此为止了,如果觉得这篇文章对你有帮助的话,可以点一下关注和点赞来支持作者哦。作者还是一个萌新,如果有什么讲的不对的地方欢迎在评论区指出,希望能够和你们一起进步✊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2106458.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ElasticSearch-ELK

Logstash Logstash 配置文件结构Logstash 导入数据到 ES同步数据库数据到 ES FileBeatELK&#xff08;采集 Tomcat 服务器日志&#xff09; 使用FileBeats将日志发送到LogstashLogstash输出数据到Elasticsearch&#xff08;logstash开头的索引&#xff09; 利用Logstash过滤器解…

JVM4-运行时数据区

目录 概述 程序计数器 栈 Java虚拟机栈 概述 栈帧的组成 局部变量表 操作数栈 帧数据 栈内存溢出 本地方法栈 堆 方法区 类的元信息 运行时常量池 方法区的实现 方法区的溢出 字符串常量池 直接内存 概述 Java虚拟机在运行Java程序过程中管理的内存区域&am…

一款好看的导航网HTML源码((全静态页面带特效)

源码介绍 一款好看的导航网HTML源码(全静态页面带特效)&#xff0c;页面自适应&#xff0c;单页源码没有后台&#xff0c;需要的下载。 效果预览 源码获取 一款好看的导航网HTML源码

JENV版本管理工具

下载地址&#xff1a;https://github.com/jenv/jenv 安装步骤 将其添加到PATH中 使用方法&#xff08;注意&#xff1a;局部配置会覆盖全局配置。使用命令可覆盖局部&#xff09; 添加新的Java环境&#xff08;需提供绝对路径&#xff09; jenv add <名称> <路径>…

AIoTedge IoT平台替代网关、PLC和HMI,实现智慧农业大棚控制

AIoTedge作为一个集成了边缘物联网平台、软网关和边缘AI平台的创新产品&#xff0c;它通过边缘计算技术实现了数据的即时处理和智能分析&#xff0c;有效降低了延迟和带宽消耗。在智慧农业大棚的智能控制中&#xff0c;AIoTedge可以替代传统的网关、PLC和HMI&#xff08;人机界…

嵌入式全栈开发学习笔记---C++(运算符重载)

目录 运算符重载概念 运算符重载语法 运算符重载的两种方法 运算符重载的步骤 运算符重载限制 运算符重载原则 重载输出运算符 如何判断返回引用还是普通变量&#xff1f; 赋值运算符重载 重载自增运算符 重载数组下标运算符[ ] 重载函数调用运算符( ) 不要重载逻…

聚乙二醇-降冰片烯!有啥用?打印各种3D结构支持细胞培养!

大家好&#xff0c;今天我们来了解一项关于3D生物打印的研究——《Poly(ethylene glycol)-Norbornene as a Photo-Click Bioink for Digital Light Processing 3D Bioprinting》发表于《ACS Applied Materials & Interfaces》。3D生物打印在组织工程和再生医学领域具有重要…

【RAG】LongRAG:利用长上下文LLMs增强检索增强生成

前言 现有的RAG框架通常使用100词的短段落作为检索单元&#xff0c;这种设计使得检索器需要在大量语料库中搜索&#xff0c;增加了工作负担&#xff0c;并且容易引入难负样本&#xff0c;影响性能。LongRAG框架为了解决这一问题&#xff0c;该框架使用长检索单元&#xff08;最…

伴奏提取免费软件怎么选?这5款工具让编辑更简单

音频提取&#xff0c;听起来可能有点技术范&#xff0c;但其实它就在我们生活的各种小细节里。 比如在网上看到一个超有感觉的视频&#xff0c;背景音乐简直完美&#xff0c;但你只想保存这段音乐&#xff1b;又或者你是个播客&#xff0c;需要从一段采访中提取清晰的对话声轨…

全国百佳出版社专著转让

1、信息化XXX高校英语教学模式研究 2、高校计算机教学XXXXXX革创新研究 3、文创产品设计XXXXXX 4、多元化语文教学&#xff1a;XXX策略

拆解kolors古诗文绘本comfyui工作流,学习提示词多样性组合!

前言 三人行必有我师&#xff0c;在comfyui工作流的搭建中这句话同样有用&#xff0c;几天内我们就学习如何借助kolors这个更具中国元素的模型生古诗文绘本工作流&#xff01; 老规矩先说原理&#xff0c;借助CR prompt list节点 1.搭建kolors文生图工作流 不会搭建kolors文生…

21.新增管理员页面制作

新增管理员页面制作 1.修改AdminUser.vue <template><el-main><!-- 搜索栏 --><el-form :model"searchParm" :inline"true" size"default"><el-form-item><el-input v-model"searchParm.nickName"…

9. GIS技术支持工程师岗位职责、技术要求和常见面试题

本系列文章目录&#xff1a; 1. GIS开发工程师岗位职责、技术要求和常见面试题 2. GIS数据工程师岗位职责、技术要求和常见面试题 3. GIS后端工程师岗位职责、技术要求和常见面试题 4. GIS前端工程师岗位职责、技术要求和常见面试题 5. GIS工程师岗位职责、技术要求和常见面试…

STM32(九):定时器——TIM编码器接口

Encoder Interface 编码器接口 编码器接口可接收增量&#xff08;正交&#xff09;编码器的信号&#xff0c;根据编码器旋转产生的正交信号脉冲&#xff0c;自动控制CNT自增或自减&#xff0c;从而指示编码器的位置、旋转方向和旋转速度。 每个高级定时器和通用定时器都拥有1个…

深度学习(七)-计算机视觉基础

计算机视觉 计算机视觉在广义上是和图像相关的技术总称。包括图像的采集获取&#xff0c;图 像的压缩编码&#xff0c;图像的存储和传输&#xff0c;图像的合成&#xff0c;三维图像重建&#xff0c;图像增强&#xff0c;图像修复&#xff0c;图像的分类和识别&#xff0c;目…

探索中国星坤:构建全球合作网络,服务全球客户!

在全球化的浪潮中&#xff0c;中国星坤以其卓越的产品和服务&#xff0c;成为全球通信、计算机、医疗电子等多个行业的领军企业。通过构建广泛的全球代理商合作伙伴网络&#xff0c;星坤不仅提升了自身的品牌影响力&#xff0c;更确保了其产品和服务的全球覆盖。本文将探讨星坤…

windows 如何使用免安装版 node?

由于工作需要&#xff0c;本机无法自主安装软件&#xff0c;于是产生了这样一个需求。苦寻全网良久才实现&#xff0c;所以必须把这个过程记录下来&#xff0c;以防后期再次遇到同样的问题。 &#xff08;1&#xff09;首先免安装版 node 下载 node 下载地址 根据自己的需要…

calibre:如何find object

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 往期文章:

最新热门火爆小程序项目 在线敲木鱼小程序源码系统 功能强大 带完整的安装代码包以及搭建教程

系统概述 本系统采用微信小程序框架开发&#xff0c;充分利用了微信平台庞大的用户基础及丰富的生态资源。技术架构上&#xff0c;主要包括前端界面设计、后端逻辑处理、数据库管理以及云服务等部分。前端采用微信小程序提供的WXML、WXSS等语言进行页面布局与样式设计&#xf…

大白话说什么是“MLLM”多模态大语言模型

1. 什么是MLLM多模态大语言模型 1.1 先来思考一个问题 如果上传了一张图片&#xff0c;并向大模型提问。“图片中绿色框框中的人是谁&#xff1f;” 大模型回答&#xff1a;“那是波多野吉衣老师” 请问&#xff0c;大模型是怎么做到的&#xff1f; 我们用常规的思路来想一…