【Java|多线程与高并发】阻塞队列以及生产者-消费者模型

news2024/12/25 0:19:36

文章目录

  • 1. 前言
  • 2. 阻塞队列
  • 3. 实现阻塞队列
  • 4. 生产者-消费者模型
  • 5. 总结

1. 前言

阻塞队列(BlockingQueue)常用于多线程编程中,可以实现线程之间的同步和协作。它可以用来解决生产者-消费者问题,其中生产者线程将元素插入队列,消费者线程从队列中获取元素,它们之间通过阻塞队列进行协调。
在这里插入图片描述

2. 阻塞队列

Java中的阻塞队列(BlockingQueue)是一种特殊的队列,它在队列为空时会阻塞获取元素的操作,直到队列中有新的元素被添加进来;在队列已满时会阻塞插入元素的操作,直到队列中有空的位置。

需要注意的是 在Java中 BlockingQueue是一个接口
在这里插入图片描述

Java中提供了多种阻塞队列的实现,包括:

  1. ArrayBlockingQueue:基于数组实现的有界阻塞队列,它按照先进先出的顺序对元素进行排序。
  2. LinkedBlockingQueue:基于链表实现的可选有界阻塞队列,它可以指定容量,如果不指定则默认为无界队列。
  3. PriorityBlockingQueue:基于优先级堆实现的无界阻塞队列,它可以按照元素的优先级进行排序。
  4. SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作。
  5. LinkedBlockingDeque: 基于链表的双端阻塞队列。
  6. LinkedTransferQueue: 基于链表、无界的的阻塞队列。

作为一个队列,有三个基本操作,入队,出队和查看队首元素.

在使用阻塞队列时,有两点需要注意:

  1. offer和put可以实现入队列,offer并没有阻塞功能,put具有阻塞功能
  2. poll和take可以实现出队列,poll没有阻塞功能,take具有阻塞功能

示例:

public class Demo14 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue  = new LinkedBlockingQueue<>(10);

        blockingQueue.put(1);
        blockingQueue.put(2);
        int ret = blockingQueue.take();
        System.out.println(ret);
        ret = blockingQueue.take();
        System.out.println(ret);
        blockingQueue.take();
        System.out.println(ret);
    }
}

代码分析:
在这里插入图片描述

3. 实现阻塞队列

阻塞队列的实现并不复杂,主要是通过这个过程,强化对于阻塞队列的认识.

对于实现阻塞队列可以大致分为3步:

  1. 实现一个普通的队列
  2. 保证线程安全
  3. 加上阻塞功能

实现一个普通队列的方法可以使用链表,也能够使用数组.

接下来就基于数组来实现一个 循环队列

代码:

public class MyBlockingQueue {
        private int[] elem;
        private int usedSize = 0;// 有效个数
        private int front = 0;
        private int rear = 0;
        public MyBlockingQueue(){
            this.elem = new int[10];
        }
        public MyBlockingQueue(int k){
            this.elem = new int[k];
        }
        /**
         * 入队
         * @param val
         * @return
         */
        public void put(int val){
            // 判断队列是否满了
            if (usedSize >= elem.length){
                return;
            }
            // 进行入队列操作
            elem[rear] = val;
            rear++;
            if (rear >= elem.length){
                rear = 0;
            }
            usedSize++;
        }
        /**
         * 出队
         * @return
         */
        public Integer take(){
            if (usedSize == 0){
                return null;
            }
            int ret = elem[front];
            front++;
            if (front >= elem.length){
                front = 0;
            }
            usedSize--;
            return ret;
        }

}

循环队列在判断队列是否满时有两种方式:

  1. 用计数器记录有效数据的个数(上述代码使用的是这种方法)

  2. 浪费一个空间不用. 当(rear+1) % 数组长度== front时为队列满这种情况

以上就是一个简单的循环队列的实现,详细可以参考我的这篇文章【数据结构与算法】队列-模拟实现队列以及设计循环队列

解决了第一步,接下来就来实现第二步,解决线程安全问题

解决这个问题,就离不开synchronized了,以防万一再给变量加上volatile关键字

public class MyBlockingQueue {
        private int[] elem;
        private volatile int usedSize = 0;// 有效个数
        private volatile int front = 0;
        private volatile int rear = 0;
        public MyBlockingQueue(){
            this.elem = new int[10];
        }
        public MyBlockingQueue(int k){
            this.elem = new int[k];
        }
        /**
         * 入队
         * @param val
         * @return
         */
        public void put(int val){
            synchronized (this) {
                // 判断队列是否满了
                if (usedSize >= elem.length){
                    return;
                }
                // 进行入队列操作
                elem[rear] = val;
                rear++;
                if (rear >= elem.length){
                    rear = 0;
                }
                usedSize++;
            }
        }
        /**
         * 出队
         * @return
         */
        public Integer take(){
            synchronized (this) {
                if (usedSize == 0){
                    return null;
                }
                int ret = elem[front];
                front++;
                if (front >= elem.length){
                    front = 0;
                }
                usedSize--;
                return ret;
            }
        }
}

解决完线程安全问题,接下来就是给put和take添加阻塞功能.

以下两种场景会有阻塞功能:

  1. 如果队列为空,出队列需要阻塞

  2. 如果队列为满,入队列需要阻塞

阻塞功能很好加,直接使用wait方法即可

但除了阻塞,还要通知唤醒线程,. 例如线程为空,出队列在阻塞状态,而入队列之后,队列不为空,就要让出队列继续执行才行. 因此还要搭配notify来实现

阻塞队列实现代码如下:

public class MyBlockingQueue {
        private int[] elem;
        private volatile int usedSize = 0;// 有效个数
        private volatile int front = 0;
        private volatile int rear = 0;
        public MyBlockingQueue(){
            this.elem = new int[10];
        }
        public MyBlockingQueue(int k){
            this.elem = new int[k];
        }
        /**
         * 入队
         * @param val
         * @return
         */
        public void put(int val) throws InterruptedException {
            synchronized (this) {
                // 判断队列是否满了
                while (usedSize >= elem.length){
                    this.wait();
                }
                // 进行入队列操作
                elem[rear] = val;
                rear++;
                if (rear >= elem.length){
                    rear = 0;
                }
                usedSize++;
                this.notify();
            }
        }
        /**
         * 出队
         * @return
         */
        public Integer take() throws InterruptedException {
            synchronized (this) {
                while (usedSize == 0){
                    this.wait();
                }
                int ret = elem[front];
                front++;
                if (front >= elem.length){
                    front = 0;
                }
                usedSize--;
                this.notify();
                return ret;
            }
        }
}

代码分析:

在这里插入图片描述

此时出队列和入队列是相互唤醒的状态, 两个wait的执行条件互斥,因此不会出现同时阻塞的状态.

同时将wait的执行条件改为while,是因为如果线程是被interrupt唤醒的话,队列仍然为空,就不能去执行后续代码,因此要再进行条件判断,因此改为while更加稳妥

验证:
在这里插入图片描述
使用阻塞队列的好处:

  1. 使用阻塞队列,有利于代码"解耦合"
  2. 削峰填谷,利用生产者消费者模型在并发量高的时候将这些并发量分配到每一个服务器上.

4. 生产者-消费者模型

接下来来介绍生产者-消费者模型

生产者消费者模型是一种常见的并发编程模型,用于解决生产者和消费者之间的协作问题。在该模型中,生产者负责生产数据,消费者负责消费数据,它们通过共享的缓冲区来进行通信。

生产者消费者模型的主要特点:

  1. 生产者和消费者是两个独立的实体,它们可以运行在不同的线程中。
  2. 生产者负责生成数据,并将数据放入缓冲区中。
  3. 消费者负责从缓冲区中取出数据,并进行相应的处理。
  4. 缓冲区是生产者和消费者之间的共享数据结构,用于存储生产者生产的数据。
  5. 生产者和消费者之间的操作是互斥的,即同一时间只能有一个生产者或一个消费者对缓冲区进行操作。

为了实现生产者消费者模型,可以使用阻塞队列来作为缓冲区。为了实现生产者消费者模型,可以使用阻塞队列来作为缓冲区

生产者消费者模型的基本流程:

  1. 创建一个共享的阻塞队列,作为生产者和消费者之间的缓冲区。
  2. 创建生产者线程,它生成数据并将其放入队列中。
  3. 创建消费者线程,它从队列中获取数据并进行处理。
  4. 启动生产者和消费者线程,它们会并发执行。
  5. 生产者线程生成数据并插入队列,当队列已满时会被阻塞。
  6. 消费者线程从队列中获取数据并进行处理,当队列为空时会被阻塞。
  7. 生产者和消费者线程可以通过阻塞队列进行同步和协作,生产者在队列已满时会等待,直到有空闲位置可以插入数据;消费者在队列为空时会等待,直到有数据可以获取。

代码示例:

public class Demo16 {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        // 生产者
        Thread t1 = new Thread(() ->{
            for (int i = 0; i < 1000; i++) {
                try {
                    blockingQueue.put(i);
                    System.out.println("生产元素: "+ i);
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();

        // 消费者
        Thread t2 = new Thread(() ->{
            while(true){
                try {
                    Integer ret = blockingQueue.take();
                    System.out.println("消费元素: "+ ret);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t2.start();
    }
}

运行截图:

在这里插入图片描述
上述代码仅仅只是一个示例,并没有什么实际意义.

生产者消费者模型依旧很重要,它可以有效地实现线程间的协作和资源共享,提高系统的并发性和吞吐量。

5. 总结

阻塞队列的使用可以简化多线程编程的复杂性,避免手动实现线程间的同步和协作逻辑,提高代码的可读性和可维护性。基于阻塞队列的生产者-消费者模型也要重点掌握.。阻塞队列作为生产者和消费者之间的缓冲区,提供线程安全的插入和获取操作,并在队列为空或队列已满时进行阻塞,从而实现线程间的同步。
在这里插入图片描述

感谢你的观看!希望这篇文章能帮到你!
专栏: 《从零开始的Java学习之旅》在不断更新中,欢迎订阅!
“愿与君共勉,携手共进!”
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/683024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《网络安全0-100》自学误区和陷阱

一、自学网络安全学习的误区和陷阱 1.不要试图先成为一名程序员(以编程为基础的学习)再开始学习 我在之前的回答中&#xff0c;我都一再强调不要以编程为基础再开始学习网络安全&#xff0c;一般来说&#xff0c;学习编程不但学习周期长&#xff0c;而且实际向安全过渡后可用到…

今天面了个腾讯拿 28K 出来的,让我见识到了软件测试的天花板

已经6月底了&#xff0c;你们是在职呢还是待业呢&#xff1f; 今年的春招结束了&#xff0c;而秋招也马上要开始了&#xff0c;很多小伙伴收获不错&#xff0c;拿到了心仪的 offer。 各大论坛和社区里也看见不少小伙伴慷慨地分享了常见的面试题和八股文&#xff0c;为此咱这里…

如何清除浏览器的 DNS 缓存 (Chrome, Firefox, Safari)

如何清除浏览器的 DNS 缓存 (Chrome, Firefox, Safari) Chrome Chromium Edge Firefox Safari clear DNS Cache, flush DNS cache 请访问原文链接&#xff1a;https://sysin.org/blog/clear-browser-dns-cache/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。…

java版工程管理系统源码,企业级工程项目可视化管理平台

Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目显示1…

Linux 设备驱动程序(四)

系列文章目录 Linux 内核设计与实现 深入理解 Linux 内核 Linux 设备驱动程序&#xff08;一&#xff09; Linux 设备驱动程序&#xff08;二&#xff09; Linux 设备驱动程序&#xff08;三&#xff09; Linux 设备驱动程序&#xff08;四&#xff09; Linux设备驱动开发详解 …

大数据应用——spark实验

任务一&#xff1a;比较Spark和hadoop的区别 Spark和Hadoop都是用于分布式计算的框架&#xff0c;但两者有以下区别&#xff1a; 1、处理方式不同。Hadoop是基于MapReduce的&#xff0c;而Spark则是基于内存的分布式计算框架。 2、处理速度不同。因为Hadoop是磁盘读写密集型应用…

代理ip的用途及是否可以降低延迟

在互联网使用中&#xff0c;代理IP是一种被广泛使用的工具&#xff0c;用于隐藏真实IP地址和提供一些额外的功能。人们常常疑惑代理IP是否能够降低延迟&#xff0c;从而提高网络连接的速度和响应时间。下面&#xff0c;就让我们来探讨一下代理ip可以用在那些领域及是不是可以降…

C/S、B/S架构详解,一文带你搞懂

一、CS、BS架构定义 CS架构&#xff08;Client-Server Architecture&#xff09;是一种分布式计算模型&#xff0c;其中客户端和服务器之间通过网络进行通信。在这种架构中&#xff0c;客户端负责向服务器发送请求&#xff0c;并接收服务器返回的响应。服务器则负责处理客户端的…

SpringCloud微服务(二)网关GateWay、Docker、Dockerfile、Linux操作超详细

目录 统一网关GateWay 搭建网关服务的步骤 1、引入依赖 2、编写路由配置及nacos地址 路由断言工厂Route Oredicate Factory 路由过滤器配置 全局过滤器GlobalFilter 过滤器执行顺序 跨域问题处理 Docker ​编辑 Docker与虚拟机 镜像和容器 Docker的安装 启动docke…

原生JS实现图片裁剪功能

功能介绍&#xff1a;图片通过原生input上传&#xff0c;使用canvas进行图片裁剪。 裁剪框限制不允许超出图片范围&#xff0c;图片限制了最大宽高&#xff08;自行修改要的尺寸&#xff09;&#xff0c;点击确认获取新的base64图片数据 效果图&#xff1a; 上代码 <!DOCT…

物种气候生态位动态量化与分布特征模拟----R语言

在全球气候快速变化的背景下&#xff0c;理解并预测生物种群如何应对气候变化&#xff0c;特别是它们的地理分布如何变化&#xff0c;已经变得至关重要。利用R语言进行物种气候生态位动态量化与分布特征模拟&#xff0c;不仅可以量化描述物种对环境的需求和适应性&#xff0c;预…

【断路器型号字母含义解析】- 米思米机械设备知识分享

断路器型号意义含义 目前我国断路器型号根据国家技术标准的规定&#xff0c;一般由文字符号和数字按以下方式组成。其代表意义为&#xff1a; ①;产品字母代号&#xff0c;用下列字母表示&#xff1a; S;少油断路器&#xff1b; D;多油断路器&#xff1b; K;空气断路器&#xf…

听说你还不知道什么是 python?带你深入理解什么是 python

文章目录 前言什么是pythonpython的由来我们为什么要学习python帮助python学习的网站 前言 各位朋友们&#xff0c;大家好。在之后的时间里&#xff0c;我将陆续为大家分享我在python学习过程中学习到的知识点&#xff0c;如果你也对python感兴趣的话&#xff0c;欢迎大家来订…

持 PMP®证书增持 CSPM-2证书,先下手就对了!

2023年6月起&#xff0c;持有PMP证书的朋友可以直接增持一个同等级证书CSPM-2&#xff0c;不用重新考试&#xff0c;不用重新学习&#xff0c;原PMP证书不影响正常使用&#xff0c;相当于多了一个国标项目管理领域的证书。 第一步准备资料 1、填写能力评价表(简历和业绩不用填…

Stateflow状态图

目录 1.Stateflow简介 2.Stateflow编辑 2.1 打开方式 2.2 状态 2.3 状态名称 2.4 迁移 2.4.1 迁移 2.4.2 默认迁移 2.4.3 迁移有效条件 3 数据与事件 3.1 数据 3.2 事件 4 示例 1.Stateflow简介 Stateflow对象可分为图形对象和非图形对象。 图形对象包括&#xff1…

LangChain 介绍及相关组件使用总结

一、langChain LangChain 是一个由语言模型LLMs驱动的应用程序框架&#xff0c;它允许用户围绕大型语言模型快速构建应用程序和管道。 可以直接与 OpenAI 的 ChatGPT 模型以及 Hugging Face 集成。通过 langChain 可快速构建聊天机器人、生成式问答(GQA)、本文摘要等应用场景。…

Nginx做图片服务器

前言&#xff1a; Nginx是一个高性能的HTTP和反向代理web服务器,以及负载均衡器。根据nginx是高性能的http服务器&#xff0c;因此可以用作图片服务器使用。 本案例是在docker安装nginx来操作的。 Nginx安装 1、首先是docker的安装 详情参考&#xff1a;docker安装 2、拉取ng…

倒计时2天,解锁亚马逊云科技中国峰会 Dev Lounge 玩法

2023 亚马逊云科技中国峰会即将重磅登陆上海 主会场与分论坛云集百余位重磅嘉宾 开发者会客厅也已集齐全球优秀开源社区 倒计时2天&#xff0c;赶快报名参与这场顶级盛会&#xff01; 开发者大讲堂&#xff0c;圆桌讨论给你精彩 数据对于企业和个人来说都至关重要&#xff0…

【kubernetes系列】Kubernetes中的重要概念

​ 在学习k8s之前&#xff0c;必须先了解 Kubernetes 的几个重要概念&#xff0c;它们是组成 Kubernetes 集群的基石。&#xff08;参考Kubernetes权威指南&#xff09; 一、Master Kubernetes 里的Master指的是集群的控制节点&#xff0c; 每个Kubernetes 集群里至少需要有一…

大数据应用——hive实验

任务一&#xff1a;完成Hive内嵌模式部署 1.1 Hive部署 官网下载Hive安装包 &#xff08;1&#xff09;官网地址&#xff1a;Apache Hive &#xff08;2&#xff09;文档查看地址&#xff1a; https://cwili.apache.org/confluence/display/Hive/GettingStarted &#xff08;2&…