【Java】一文搞懂生产者和消费者模型

news2024/12/25 9:48:38

  • 阻塞队列的概念
  • 生产者消费者模式
    • 消息队列
    • 消息队列的作用
  • JDK中的阻塞队列
  • 实现阻塞队列
  • 实现生产者消费者模型

阻塞队列的概念

之前介绍过队列,是一种数据结构,先进先出FIFO。阻塞队列也满足队列的特性,不过有特殊之处:
入队元素时,先判断一下队列是否已满,如果满了就等待(阻塞),当有空余空间时再插入;
出队元素时,先判断一下队列是否空了,如果空了就等待(阻塞),当队列中有元素时再取出。

现实生活中的例子:包饺子
1.每个人各擀各的饺子皮,各包各的饺子
这种情况大概率会出现争抢擀面杖的现象,在多线程环境下就是锁竞争。
2.一个人专门来擀饺子皮,其他人负责包饺子
当饺子皮多的时候,擀饺子皮的人就可以休息一会儿;当没有饺子皮的时候就得一直擀;
当饺子皮多的时候,包饺子的人就要一直去包饺子;当没有饺子皮的时候就停下来休息;
在这里插入图片描述

上述这个例子中,擀皮的人可以成为生产者,包饺子的人称为消费者,放饺子皮的地方就是一个交易场所,这个场所就可以用阻塞队列实现。这就是阻塞队列的一个典型应用场景—— “生产者消费者模型”,这是一种非常典型的开发模型。

生产者消费者模式

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。在具体项目中,使用消息队列这样一个“中间件”来实现这个功能。

消息队列

在基础的数据结构上,做了一些针对应用场景的优化和实现,把这样的一些框架和软件称为“中间件”。消息队列就是一个“中间件”,本质上是一个阻塞队列,在此基础上把放入阻塞队列的消息打了一个标签。比如卖包子场景:
在这里插入图片描述
假如说刚出锅的全是豆沙包,那么只需要处理买豆沙包的消息,消息的标签就可以实现分组的作用

消息队列的作用

1.解耦
比如下面这种情况,服务器A和服务器B必须知道对方的存在,且参数必须要约定好,如果有其中一方挂了,那么整个流程都会受影响。这种情况下如果某一个功能需要修改,涉及的服务器可能都需要修改代码。这种情况就说耦合度非常高,不建议这种系统组织方式。
在这里插入图片描述
通过消息队列把三个系统进行解耦,是他们不再进行直接通信,起到了单独维护,单独运行的效果
在这里插入图片描述
在设计程序的时候提出过一些要求:比如高内聚,低耦合。高内聚就是把功能强相关的代码写在一起,维护起来非常方便。这是一种组织代码的方式。低耦合就是不要把相同的代码写的到处都是,一般是通过抽象的方式把代码封装成方法,使用的时候调用即可。良好的代码组织方式,可以有效的降低维护成本。

2.削峰填谷
峰和谷是指消息的密集程度。
比如双十一期间流量会暴增,在这个链路中,任何一个节点出现问题都会影响整个业务流程。
在这里插入图片描述
这时你可能会想,可以使用多组链路,不同的用户可以通过不同的链路来访问(负载均衡),这样可以解决流量暴增带来的问题。确实可以解决这个问题,但是需要思考的是,流量不会一直在峰值,大部分时间流量都是正常状态, 此时部署的其他链路就用不上了,这无疑增加了好几倍的花销。

假设银行一秒只能处理200个订单,物流公司一秒只能处理100个订单,当调用第三方接口时,如果调用次数达到了上限就阻塞一会儿。此时使用消息队列,在流量激增的时候用消息队列缓冲(削峰)在流量减少的时候,把消息队列中存储的消息一点点消费(填谷)。最终让系统和硬件配置达到平衡。
在这里插入图片描述

3.异步
同步是指请求方必须死等对方的响应。
异步是指发出请求之后,自己去干别的事情,有响应时会接收到通知从而处理响应

JDK中的阻塞队列

JDK提供了多种不同的阻塞队列,可以根据不同的业务场景选择不同的阻塞队列实现方式。

public class Demo01_BlockingQueue {
    public static void main(String[] args) {
        //定义阻塞队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(3);
        BlockingQueue<Integer> queue2 = new PriorityBlockingQueue<>(3);
    }
}

1.添加元素
在定义阻塞队列时可以给定初始化容量。下面这个示例中,由于当前的阻塞容量是3,所以当插入第四个元素时就会发生阻塞。

public class Demo01_BlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        //定义一个阻塞队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        //往队列中写入元素
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("已经插入了三个元素");
        queue.put(4);
        System.out.println("已经插入了第四个元素");
    }
}

在这里插入图片描述
2.取出元素

阻塞队列中获取元素不使用poll()方法,而是使用take()方法,会产生阻塞效果
在这里插入图片描述

public class Demo01_BlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        //定义一个阻塞队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        //往队列中写入元素
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("已经插入了三个元素");
        System.out.println(queue);
        //阻塞队列中获取元素使用take方法,会产生阻塞效果
        System.out.println("开始获取元素");
        System.out.println(queue.take());
        System.out.println(queue);
    }
}

在这里插入图片描述
当获取完阻塞队列中所有元素时,此时阻塞队列为空,再继续获取元素时,会进入阻塞状态。

public class Demo01_BlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        //定义一个阻塞队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        //往队列中写入元素
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("已经插入了三个元素");
        System.out.println(queue);
        //阻塞队列中获取元素使用take方法,会产生阻塞效果
        System.out.println("开始获取元素");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println("已经获取了三个元素");
        System.out.println(queue.take());
        System.out.println("已经获取了四个元素");
    }
}

在这里插入图片描述

实现阻塞队列

在学习数据结构时,实现一个普通队列底层用到了两种数据结构:循环数组和链表。阻塞队列就是在普通队列上加入了阻塞等待的操作。这个阻塞等待的操作就是等待(wait)和唤醒(notify)。
确定加锁的范围:
这两个方法和synchronized是强相关的,所以要加入synchronized,此时要先确定加锁的范围。在put()和take()方法中,根据修改共享变量的范围,加入synchronized。由于整个方法都在修改共享变量,所以给整个方法加锁。如果一个对象需要new出来使用,那么锁对象一般是this,此时的锁对象是this即可。
确定等待的时机:
在添加元素中,当前数组已满时,此时要阻塞等待;在取元素时,当阻塞队列为空时,此时要阻塞等待。
确定唤醒的时机:
在添加元素的方法中,执行put操作的最后一步之后再执行唤醒操作,把取元素的线程唤醒;在取元素的方法中,当前队列有空余位置的时候,唤醒添加元素的线程。

代码实现

public class MyBlockingQueue {
    //定义一个保存元素的数组
    private int[] elementData = new int[100];
    //定义队首下标
    private int head;
    //定义队尾下标
    private int tail;
    //定义有效元素的个数
    private int size;


    //插入一个元素
    public void put(int value) throws InterruptedException {
        //根据修改共享变量的范围加锁
        //锁对象是this即可
        synchronized (this){
            //判断数组是否已满
            while(size >= elementData.length){
                //阻塞等待
                this.wait();
            }
            //向队尾插入元素
            elementData[tail] = value;
            //移动队尾下标
            tail ++;
            //修正队尾下标
            if (tail>=elementData.length) {
                tail = 0;
            }
            //修改有效元素个数
            size ++;
            //做唤醒操作
            this.notifyAll();
        }

    }

    //获取一个元素
    public int take() throws InterruptedException {
        //根据修改共享变量的范围加锁
        //锁对象是this即可
        synchronized (this){
            //判断队列是否为空
            while (size<=0) {
                this.wait();
            }
            //从队首出队
            int value = elementData[head];
            //移动队首下标
            head ++;
            //修正队首下标
            if (head>=elementData.length) {
                head = 0;
            }
            //修改有效元素个数
            size --;
            //唤醒操作
            this.notifyAll();
            //返回队首元素
            return value;
        }
    }
}

⚠️⚠️⚠️注意:在wait方法的官方文档中指出:“线程可以在没有通知、中断或超时的情况下唤醒,即所谓的虚假唤醒。虽然在这种情况下在实践中很少发生,但是应用程序必须通过测试导致线程被唤醒的条件来防止这种情况,如果条件不满足,则继续等待。换句话说,等待应该总是出现在循环中”。
也就是说,第一次满足wait条件时,线程进入阻塞状态,被唤醒之后,这期间会发生很多事情,有一种可能是被唤醒后,等待的条件依然成立,所以需要再次检查等待条件,如果满足就继续阻塞等待。即就是在实现上述阻塞队列时,检查wait的判断条件时,用while来判断,而非if
在这里插入图片描述

测试自定义阻塞队列

public class Demo02_MyBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        //定义一个阻塞队列
        MyBlockingQueue queue = new MyBlockingQueue();
        //往队列中写入元素
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("已经插入了三个元素");

        System.out.println("开始获取元素");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println("已经获取了三个元素");
        System.out.println(queue.take());
        System.out.println("已经获取了四个元素");
    }
}

在这里插入图片描述

实现生产者消费者模型

分别用一个线程模拟生产者和消费者,实现一个简单的生产者消费者模型。让生产者线程每10ms就生产一次,让消费者线程每1s再消费一次。
在这里插入图片描述
代码实现

public class Demo03_ProducerConsumer {
    // 定义一个阻塞队列,初始容量为100
    private static MyBlockingQueue queue = new MyBlockingQueue();

    public static void main(String[] args) {
        // 创建生产者线程
        Thread producer = new Thread(() -> {
            //记录消息编号
            int num = 1;
            while (true) {
                // 生产一条就打印一条日志
                System.out.println("生产了元素 " + num);
                try {
                    // 把消息放入阻塞队列中
                    queue.put(num);
                    num++;
                    // 休眠10ms
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // 启动生产者
        producer.start();

        // 创建消费者线程
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    // 从队列中获取元素(消息)
                    int num = queue.take();
                    // 打印一下消费日志
                    System.out.println("消费了元素 :" + num);
                    // 休眠1秒
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // 启动消费者
        consumer.start();
    }
}

测试结果:
让生产者线程每10ms就生产一次,让消费者线程每1s再消费一次。此时阻塞队列中消息满了再消费,但生产和消费还是同时进行的。
在这里插入图片描述

相反,如果让生产的线程慢,消费的线程快,则每生产一个消息就消费一个。阻塞队列永远不会满。
在这里插入图片描述


继续加油~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/611637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用好ChatGPT,还得加点“料”

前言 关于ChatGPT或者类似的产品&#xff0c;相信绝大数的小伙伴已经有自己资源进行使用了&#xff0c;虽然可以通过简单的对话方式能获取到相关的信息&#xff0c;但其实里面还是有一些讲究的&#xff0c;毕竟AI始终不是人类&#xff0c;要让它更好的理解我们组织的语言&…

使用Transformer模型进行计算机视觉任务的端对端对象检测

Transformer模型是google团队在2017在论文attention is all you need中提出的一个用于NLP领域的模型,但是随着VIT模型与Swin Transformer模型的发布,把Transformer模型成功应用到计算机视觉任务中。 上期图文,我们使用hugging face的transformers模型进行了VIT模型的对象分…

Nginx:Rewrite

Nginx&#xff1a;Rewrite 一、常用的Nginx 正则表达式二、location2.1 location 大致可以分为三类2.2 location 常用的匹配规则2.3 location 优先级2.4 实际网站使用中&#xff0c;至少有三个匹配规则定义 三、rewrite3.1 rewrite功能3.2 rewrite跳转实现3.3 rewrite 执行顺序…

【大作业之爬虫实战+数据分析与可视化处理上】——案例——如桃花来

目的任务&#xff1a; 目的任务 爬取租房网对应元素&#xff1a; 租房名 每月的价格 室内规格构造 房间大小 交通信息 保存数据到excel表格中 做数据清洗 数据可视化呈现 设计要求&#xff1a; 设计要求&#xff1a; 以类的方式书写&#xff0c;简洁明了。能够促进学生…

技术分享 | App常见bug解析

【摘要】 功能Bug内容显示错误前端页面展示的内容有误。这种错误的产生有两种可能1、前端代码写的文案错误2、接口返回值错误功能错误功能错误是在测试过程中最常见的类型之一&#xff0c;也就是产品的功能没有实现。比如图中的公众号登录不成功的问题。界面展示错乱产品界面上…

海睿思分享 | 摆脱数据质量低下困扰,这个方法简单有效!

2019年五月&#xff0c;某企业数据服务平台推送了运营花呗的蚂蚁小微小额贷款有限公司进入清算程序。 究其原因&#xff0c;该数据服务平台抓取了不真实且不完整的数据&#xff0c;导致生成的数据质量低&#xff0c;信息不真实、存在法律合规风险等情况。 由于支付宝和花呗的…

飞浆AI studio人工智能课程学习(4)-优质Prompt分享

文章目录 最具商业价值Prompt分享与颁奖02最具商业价值Prompt分享与颁奖-Top102最具商业价值Prompt分享与颁奖-Top202最具商业价值Prompt分享与颁奖-Top302最具商业价值Prompt分享与颁奖-Top402最具商业价值Prompt分享与颁奖-Top502最具商业价值Prompt分享与颁奖-Top602最具商业…

数字化艺术时代的新趋势:虚拟数字展厅的崛起

引言&#xff1a; 艺术与技术的融合正带领我们进入一个全新的数字化艺术时代。在这个时代中&#xff0c;虚拟数字展厅正在以惊人的速度崛起&#xff0c;并引领着展览的新趋势。 一&#xff0e;虚拟数字展厅的定义和特点 虚拟数字展厅是一种基于虚拟现实和全景技术的数字化艺术…

经典的设计模式——UML类图的一些规范

文章目录 一、类的表示二、接口的表示三、继承的表示四、接口实现的表示五、关联关系六、聚合关系七、合成关系八、依赖关系 一、类的表示 矩形框第一层表示名称&#xff0c;如果是抽象类&#xff0c;则用斜 体表示 第二层是属性 第三层是方法 号表示公有&#xff0c;-表示私有…

dll修复都有哪些方法?详细解析各种dll修复方法

DLL&#xff08;动态链接库&#xff09;是 Windows 操作系统中的一种重要文件&#xff0c;它包含了许多程序所需的函数和资源。因此&#xff0c;当 DLL 文件出现问题时&#xff0c;可能会导致程序无法正常运行&#xff0c;甚至整个系统崩溃。这时候需要使用 DLL 修复工具进行修…

Excel集成GPT,惊呆我了

Excel&GPT 从最开始的GPT对话&#xff0c;到后面的Office集成GPT进行内测&#xff0c;用GPT实现写Word、做Excel、做PPT&#xff08;别着急&#xff0c;后面我会分享AIPPT&#xff09;已经不再是设想&#xff0c;而在逐步演变成真的&#xff01; 当然&#xff0c;目前国内…

Vivado下时序逻辑模块的仿真

文章目录 D触发器两级D触发器带异步复位的D触发器带异步复位和同步置数的D触发器移位寄存器单口RAM伪双口RAM真双口RAM单口ROM 组合逻辑电路在逻辑功能上特点是任意时刻的输出仅仅取决于当前时刻的输入&#xff0c;与电路原来的状态无关。 时序逻辑在逻辑功能上的特点是任意时刻…

从0到1:如何建立一个大规模多语言代码生成预训练模型

国产AI辅助编程工具CodeGeeX是一个使用AI大模型为基座的辅助编程工具&#xff0c;帮助开发人员更快的编写代码。可以自动完成整个函数的编写&#xff0c;只需要根据注释或Tab按键即可。它已经在Java、JavaScript和Python等二十多种语言上进行了训练&#xff0c;并基于大量公开的…

【Python】打包与发布(Packaging and distributing projects)

以Unix/macOS系统为例。 前提准备&#xff1a;确保pip为最新版本&#xff0c;可使用以下命令来更新pip&#xff1a; python3 -m pip install --upgrade pip一、创建一个简单的项目 我们在目录packaging_tutorial下进行操作。 项目名称为&#xff1a;example_package_wayne。 …

【yolov5系列】yolov5目标检测的原理梳理+核心代码解析

打算写yolov5源码阅读和总结&#xff0c;已经打算了一年&#xff0c;如今已经更新到yolov8&#xff0c;只能说自己行动太慢了&#xff0c;哭泣(๑>؂<๑&#xff09;。趁着看要yolov8一起赶紧把yolov5总结总结。 一、Yolov5的网络结构 模型主要分为3部分 backbone&#x…

Maven 打包插件 maven-jar-plugin

文章目录 指定版本生成可执行 Jar准备依赖&#xff0c;并指定依赖位置自动下载依赖的 Jar 文件 打包时排除文件与其他常用打包插件比较 本文是对 maven-jar-plugin 常用配置的介绍&#xff0c;更详细的学习请参照 Apache Maven JAR Plugin 官方文档 这是 maven 生命周期 packa…

Python+Pytest+Allure+Git+Jenkins数据驱动接口自动化测试框架

一、接口基础   接口测试是对系统和组件之间的接口进行测试&#xff0c;主要是效验数据的交换&#xff0c;传递和控制管理过程&#xff0c;以及相互逻辑依赖关系。其中接口协议分为HTTP&#xff0c;RPC&#xff0c;Webservice&#xff0c;Dubbo&#xff0c;RESTful等类型。 …

实用工具 | 语音文本对齐MFA的安装及使用

Montreal Forced Aligner&#xff08;MFA&#xff09;[1]是一个用于将音频和文本进行对齐的工具。它可以用于语音识别、语音合成和发音研究等领域。MFA支持多种语言和语音&#xff0c;用户可以根据需要自定义训练模型。 本博客介绍如何使用MFA对音频和文本进行对齐&#xff0c…

计算机网络实验:交换机划分Vlan配置

目录 前言实验目的实验内容实验过程总结 前言 计算机网络是当代信息技术的重要组成部分&#xff0c;也是现代社会的基础设施之一。为了提高计算机网络的性能和安全性&#xff0c;网络管理员需要对网络进行合理的规划和设计&#xff0c;包括对网络拓扑、地址分配、路由协议、交…

TP-LINK XDR6078 WiFi6路由器 简单开箱评测

TL-XDR6078易展版AX6000双频WiFi6路由器 简单开箱测评&#xff0c;新房快装修好了&#xff0c;先装上WiFi&#xff0c;挑了一会选中这个了&#xff0c;双2.5G电口&#xff0c;6000Mbps无线速率&#xff0c;还支持端口汇聚等等功能&#xff0c;感觉还不错。 TP-LINK XDR3040 Wi…