【JavaEE】生产者消费者模式

news2025/1/15 16:50:34

作者主页:paper jie_博客

本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。

本文于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将MySQL基础知识一网打尽,希望可以帮到读者们哦。

其他专栏:《MySQL》《C语言》《javaSE》《数据结构》等

内容分享:本期将会分享设计模式中的生产者消费者模式

目录

什么是阻塞队列

什么是生产者消费者模式

生产者消费者模式的特点

 Java标准库中的阻塞队列

自定义实现一个阻塞队列

普通队列

 实现线程安全

正解

实现阻塞

正解

 基于自己实现的阻塞队列实现一个简单的生产者消费者模型


什么是阻塞队列

阻塞队列它也是队列,遵守着先进先出的原则.且它是一种线程安全的数据结构.它有两个特性:

1. 当队列满的时候,继续入队就会进行堵塞,直到有其他线程从队列中拿走元素才会解除堵塞.

2. 当队列为空的时候,继续出队就会进行堵塞,直到有其他线程从队列中插入元素后才会解除堵塞.

我们的阻塞队列最经典的使用场景就是生产者消费者模式.

举个栗子:

在我们家中,捏饺子一般有两个步骤: 1. 捏饺子皮, 2.包饺子. 假设有三个人.一个滑稽捏饺子皮,其他两个滑稽包饺子. 滑稽捏好饺子皮厚后就会放到板子上,而其他的滑稽就不用直接去捏饺子皮的滑稽手上拿饺子皮,而是去板子上拿即可.这样他们直接就只需要关注这个板子即可. 而这个板子就充当了我们的阻塞队列.

什么是生产者消费者模式

生产者消费者模式就是基于一个中间容器来解决它们之间的强耦合性问题.而这个中间容器就是阻塞队列.加入阻塞队列后,生产者与消费者之间就不会直接联系,而是通过通过阻塞队列来进行数据传输.这样生产者生产数据就不用知道是谁来处理他的数据,不用等待,直接交给阻塞队列即可.而消费者也不会去找生产者要数据,而是去阻塞队列里拿.

生产者消费者模式的特点

1. 通过阻塞队列可以降低生产者与消费者之间的耦合性

假设有三个服务器ABC,BC是将数据处理到,A是接受它们的数据.如果在没有加入阻塞队列的情况下.可能就会发生: 当B或者C挂了,这可能就会导致A也会挂,它们之间是强耦合的关系.因为B或者C的操作中需要涉及到一些关于A的操作.而A的操作也会涉及到一些关于B或C的操作.

但是当加入阻塞队列后就会发生不一样的结果. C,B处理好的结果只需要放到阻塞对列中,而A也只需要去阻塞队列中拿即可.这样B,C的操作对于A的影响就会很小,从而降低了他们之间的耦合性.

 

2. 削峰填谷 

这里就是加入阻塞队列起到一个平衡生产者与消费者之间的处理能力, 加入阻塞队列就可以防止当生产者一下生产出大量数据,而消费者一时间消费不了而导致挂了的问题.

举个栗子:

假设一个场景: 客户端发出请求,服务A接受请求,然后将请求交给BC处理器进行逻辑处理. 在正常情况下处理器是可以及时处理的.但是在一些特殊的时候会有一些突发峰值.外界客户端的请求非常的多.A接受这些请求一下子全部交给B,C服务器来处理,它们一下子可能就会支撑不住. 因为B,C需要就行逻辑处理业务,需要的资源开销就会比较大.如果一下给它大量的请求进行处理,处理器的资源可能就会超过它的上限而导致机器挂了.

但是在加入阻塞队列后就不用担心这种情况了. 就算客户端有大量的请求,A接受后也是传送给阻塞队列,再由B,C去阻塞队列中拿数据处理来慢慢消化.这就算数据再多,只要A服务器,阻塞队列不挂(阻塞队列和A服务器抗压能力很强,它们只需要进行存储数据和传送数据),BC也可以按正常速度进行处理.

 Java标准库中的阻塞队列

在Java标准库中也提供了阻塞队列,如果我们需要使用阻塞队列,只需要使用标准库中的即可. 库中的阻塞队列叫BlockingQueue,它是一个interface接口,它实现的的类有:

ArrayBlockingQueue

LinkedBlockingQueue

priorityBlockingQueue

里面的put和offer方法就是入队方法,但是put是带有阻塞的功能. take也是出队方法,但它也带有阻塞的功能.

public class ThreadDemo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        queue.put("aaa");

        System.out.println(queue.take());

        System.out.println(queue.take());
    }
}

自定义实现一个阻塞队列

这里我们准备实现一个基于数组的阻塞队列,也就是环形队列.这里队列里面我们需要一个数组,计数器和两个头尾指针,put和take方法. 这里我们分三步来实现这个阻塞队列.

1) 普通队列

2) 实现线程安全

3) 实现阻塞功能

普通队列

class MyarrayBlockingQueue {
    private int elems[] = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
    public MyarrayBlockingQueue(int capactiy) {
        elems = new int[capactiy];
    }
    public void put(int value) {
        //判断队列满没满
        if(size == elems.length) {
            //阻塞
            return;
        }
        //添加元素
        elems[tail] = value;
        tail++;
        //判断尾指针是不是需要循环到0位置
        if(tail == elems.length) {
            tail = 0;
        }
        size++;
    }
    public int take() {
        int elem = 0;
        //判断队列为不为空
        if(size == 0) {
            //阻塞
            return elem;
        }
        //出队
        elem = elems[head];
        head++;
        if(head == elems.length) {
            head = 0;
        }
        size--;
        return elem;
    }
}

 实现线程安全

实现线程安全,我们就需要加锁.但是当我们下面这个代码这样加锁时,就会出现问题.

public void put(int value) {
        //判断队列满没满
        if(size == elems.length) {
            //阻塞
            return;
        }
        synchronized (this) {
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
        }
    }
    public int take() {
        int elem = 0;
        //判断队列为不为空
        if(size == 0) {
            //阻塞
            return elem;
        }
        synchronized(this) {
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }

我们发现如果当有两个线程t1,t2同时使用put或者take方法.假设当前有99个元素,容量为100.当t1执行到if(size == elems.length)后被调度走了,再轮到t2执行.当t2执行完后,队列的元素语已经满了.但是当轮到t1执行时因为上一次的if判断它还会再入队一个元素,这就会出现size为101的问题.

正解

我们将if判断条件也放到锁中就可以了.

public void put(int value) {
        synchronized (this) {
            //判断队列满没满
            if(size == elems.length) {
                //阻塞
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
        }
    }
    public int take() {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            if(size == 0) {
                //阻塞
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }

实现阻塞

这里需要实现阻塞就要使用我们的wait和notify方法.

当队列满时,使用put就会执行wait进入阻塞,只有当使用take调用notify队列才会解除堵塞.

当队列为空时,使用take就会执行wait进入堵塞,只有当使用put调用notify队列才会解除堵塞.

public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断队列满没满
            if(size == elems.length) {
                this.wait();
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }
    public int take() throws InterruptedException {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            if(size == 0) {
                this.wait();
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            this.notify();
            return elem;
        }
    }

但是这样又会出现一个问题. 假设有三个线程t1,t2,t3 t1和t2调用put. t3调用take. 且这个队列已经满了. 这里就会有一种情况: t1和t2调用put发现满了就都会在wait那里堵塞,且解锁. 这时t3就执行take方法出队了一个且调用了notify方法唤醒了t1. 则t1也就向下执行入队了一个,此时队列是满了.但是!!!t1的notify方法就可能会唤醒t2.而t2就会直接向下执行又入队一个,但队列是满的,这就出现问题了.

正解

我们可以在if判断那里将if改成while,这样就算被notify唤醒了,也会再次判断队列是不是满/空.才会选择是不是执行还是继续堵塞.

public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断队列满没满
            while(size == elems.length) {
                this.wait();
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }
    public int take() throws InterruptedException {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            while(size == 0) {
                this.wait();
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            this.notify();
            return elem;
        }
    }

 基于自己实现的阻塞队列实现一个简单的生产者消费者模型

public class ThreadDemo8 {
    public static void main(String[] args) {
        block queue = new block(1000);
        Thread t1 = new Thread(() -> {
            int count = 0;
            while(true) {
                try {
                    queue.put(count);
                    System.out.println("生产者: " + count);
                    Thread.sleep(1000);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
        Thread t2 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费者: " +  queue.take());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1291526.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【开源】基于JAVA的天沐瑜伽馆管理系统

项目编号&#xff1a; S 039 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S039&#xff0c;文末获取源码。} 项目编号&#xff1a;S039&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 瑜伽课程模块2.3 课…

线程池基础参数和执行流程

线程池核心参数 1.corePoolSize:线程池中核心线程的个数。 2.maximumPoolSize:线程池中线程的总数。&#xff08;线程总数核心线程数 救急线程数&#xff09; 3. keepAliveTime:救急线程的存活时间。&#xff08;救急线程空闲时的存活时间。&#xff09; 4.unit:存活时间的…

numpy数据读取保存及速度测试

目录 数据保存及读取 速度比对测试 数据保存及读取 代码示例&#xff1a; # 导入必要的库 import numpy as np # 生成测试数据 arr_disk np.arange(8) # 打印生成能的数据 print(arr_disk) # numpy保存数据到本地 np.save("arr_disk", arr_disk) # 加载本地数据…

gpt3、gpt2与gpt1区别

参考&#xff1a;深度学习&#xff1a;GPT1、GPT2、GPT-3_HanZee的博客-CSDN博客 Zero-shot Learning / One-shot Learning-CSDN博客 Zero-shot&#xff08;零次学习&#xff09;简介-CSDN博客 GPT-2 模型由多层单向transformer的解码器部分构成&#xff0c;本质上是自回归模型…

软件系统应用开发安全指南

2.1.应用系统架构安全设计要求 2.2.应用系统软件功能安全设计要求 2.3.应用系统存储安全设计要求 2.4.应用系统通讯安全设计要求 2.5.应用系统数据库安全设计要求 2.6.应用系统数据安全设计要求 全资料获取进主页。

C、C++、C#的区别概述

C、C、C#的区别概述 https://link.zhihu.com/?targethttps%3A//csharp-station.com/understanding-the-differences-between-c-c-and-c/文章翻译源于此链接 01、C语言 ​ Dennis Ritchie在1972年创造了C语言并在1978年公布。Ritchie设计C的初衷是用于开发新版本的Unix。在那之…

关于DWC OTG2.0中PFC的理解

在DWC OTG2.0 Controller手册中&#xff0c;有一章节专门介绍了PFC&#xff0c;Packet FIFO Controller。其内部分为共享FIFO&#xff08;shared FIFO&#xff09;以及专用FIFO&#xff08;Dedicated FIFO&#xff09;&#xff0c;并针对dev和host两种模式&#xff0c;并且还要…

IT行业软件数据文件传输安全与高效是如何保障的?

在当今迅速发展的科技世界中&#xff0c;云计算、大数据、移动互联网等信息技术正迎来蓬勃发展&#xff0c;IT行业正置身于一个全新的世界。数据不仅是最重要的资产&#xff0c;也是企业竞争力的核心所在。然而&#xff0c;如何缩短信息共享时间、高速流转数据、跨部门/跨区域协…

智能优化算法应用:基于鹰栖息算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鹰栖息算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鹰栖息算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹰栖息算法4.实验参数设定5.算法结果6.参考文献7.…

Linux--文件权限与shell外壳的理解

目录 一.Linux的用户与用户切换&#xff0c;提权 二.对文件权限的理解 1.文件权限角色的权限文件属性 2.Linux中的三种角色 3.为什么会存在所属组这个角色 4.文件属性的意义 4.1.第一个字母的意义 4.2 第2——第10个字母的意义 4.3修改文件权限的方法 三.目录权限 四…

记录 | linux手动清理 buff/cache

linux下手动清理 buff/cache 切换到 root 权限 # 这个drop_caches文件可以设置的值分别为1、2、3 echo 1 > /proc/sys/vm/drop_caches # 表示清除pagecache echo 2 > /proc/sys/vm/drop_caches # 表示清除回收slab分配器中的对象&#xff08;包括目录项缓存和inode缓…

idea报错——Access denied for user ‘root‘@‘localhost‘ (using password: YES)

项目场景&#xff1a; 使用idea启动SpringBoot项目报错&#xff0c;可以根据提示看到是数据库的原因&#xff0c;显示使用了密码&#xff0c;具体报错信息如下&#xff1a; 解决方案&#xff1a; 第一步&#xff1a;先去配置文件里面查看连接MySQL的url是否正确&#xff0c;如果…

代码随想录算法训练营第四十二天 _ 动态规划_01背包问题、416.分割等和子集。

学习目标&#xff1a; 动态规划五部曲&#xff1a; ① 确定dp[i]的含义 ② 求递推公式 ③ dp数组如何初始化 ④ 确定遍历顺序 ⑤ 打印递归数组 ---- 调试 引用自代码随想录&#xff01; 60天训练营打卡计划&#xff01; 学习内容&#xff1a; 二维数组处理01背包问题 听起来…

Node.js快速搭建简单的HTTP服务器并发布公网远程访问

文章目录 前言1.安装Node.js环境2.创建node.js服务3. 访问node.js 服务4.内网穿透4.1 安装配置cpolar内网穿透4.2 创建隧道映射本地端口 5.固定公网地址 前言 Node.js 是能够在服务器端运行 JavaScript 的开放源代码、跨平台运行环境。Node.js 由 OpenJS Foundation&#xff0…

基于ssm校园美食交流系统论文

目 录 摘 要 1 前 言 3 第1章 概述 4 1.1 研究背景 4 1.2 研究目的 4 1.3 研究内容 4 第二章 开发技术介绍 5 2.1Java技术 6 2.2 Mysql数据库 6 2.3 B/S结构 7 2.4 SSM框架 8 第三章 系统分析 9 3.1 可行性分析 9 3.1.1 技术可行性 9 3.1.2 经济可行性 10 3.1.3 操作可行性 10…

CentOS 7.9 安装 k8s(详细教程)

文章目录 安装步骤安装前准备事项安装docker准备环境安装kubelet、kubeadm、kubectl初始化master节点安装网络插件calicowork 加入集群 k8s集群测试 安装步骤 安装前准备事项 一台或多台机器&#xff0c;操作系统 CentOS7.x-86_x64硬件配置&#xff1a;2GB或更多RAM&#xff0…

order排序方式研究

请直接看原文: 链接&#xff1a;https://juejin.cn/post/7258182427306197051 --------------------------------------------------------------------------------------------------------------------------------- 一.前言 在MySQL世界中&#xff0c;排序是一个常见而重…

了解linux网络时间服务器

本章主要介绍网络时间服务器。 使用chrony配置时间服务器 配置chrony客户端向服务器同步时间 20.1 时间同步的必要性 些服务对时间要求非常严格&#xff0c;例如&#xff0c;图20-1所示的由三台服务器搭建的ceph集群。 这三台服务器的时间必须保持一致&#xff0c;如果不一致…

10 单词接龙

题目描述 单词接龙的规则是: 可用于接龙的单词首字母必须要前一个单词的尾字母相同; 当存在多个首字母相同的单词时&#xff0c;取长度最长的单词&#xff0c;如果长度也相等&#xff0c;则取字典序最小的单词;已经参与接龙的单词不能重复使用。 现给定一组全部由小写字母组成…

【开发技能】-解决visio交叉线(跨线)交叉点弯曲问题

问题 平时工作中使用visio作图时&#xff0c;经常会遇到交叉线在相交时会形成一个弯曲弓形&#xff0c;这十分影响视图效果。可以采用下面的方法消除弓形。 方法 第一步&#xff1a;菜单栏--设计---连接线 第二步&#xff1a;选中这条交叉线---点击显示跨线 最终问题得到解决…