【JavaEE】多线程案例-阻塞队列

news2024/10/7 11:28:49

在这里插入图片描述

1. 前言

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空
  • 当队列满时,存储元素的线程会等待队列可用

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2. 什么是生产者-消费者模型

生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。
在这里插入图片描述

3. 生产者-消费者模型的意义

  1. 解耦合
  2. 削峰填谷

3.1 解耦合

两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。

在这里插入图片描述
假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。

不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。

相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。

在这里插入图片描述
服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。

在这里插入图片描述

3.2 削峰填谷

当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。

如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。
在这里插入图片描述

4. 如何使用Java标准库提供的阻塞队列

当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。

在这里插入图片描述
因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。

我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。

public class Demo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("123");
        queue.put("234");
        queue.put("345");
        queue.put("456");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

在这里插入图片描述
这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。

5. 自己实现一个阻塞队列

阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。

5.1 实现出循环队列

队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。

在这里插入图片描述
当 tail == head 的时候有两种情况:

  1. 队列中没有数据的时候
  2. 队列满了的时候

为了区分这两种情况,我们可以使用两种方法:

  1. 浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0
  2. 定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了
class MyQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) {
        //当添加数据的时候需要判断队列的容量是否已经满了
        if(size == data.length) return;
        data[tail++] = str;
        size++;
        if(tail == data.length) tail = 0;
    }

    public String take() {
    	//读取数据的时候需要判断队列是否为空
        if(size == 0) return null;
        String ret = data[head++];
        size--;
        if(head == data.length) head = 0;
        return ret;
    }
}

5.2 实现阻塞队列

阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。

5.2.1 加锁

当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) {
        synchronized (this) {
            if(size == data.length) return;
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
        }
    }

    public String take() {
        synchronized (this) {
            if(size == 0) return null;
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            return ret;
        }
    }
}

5.2.2 进行阻塞操作

当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            if(size == data.length) {
                this.wait();
            }
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
            //这个 notify 用来唤醒 take 操作的等待
            this.notify();
        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            if(size == 0) {
                this.wait();
            }
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            //这个 notify 用来唤醒 put 操作的等待
            this.notify();
            return ret;
        }
    }
}

5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题

当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。

  1. 当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;
  2. 当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。

为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            while(size == data.length) {
                this.wait();
            }
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
            //这个 notify 用来唤醒 take 操作的等待
            this.notify();
        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            while(size == 0) {
                this.wait();
            }
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            //这个 notify 用来唤醒 put 操作的等待
            this.notify();
            return ret;
        }
    }
}

5.2.4 解决因指令重排序造成的问题

因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private volatile int size;
    private volatile int head = 0;
    private volatile int tail = 0;
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            while(size == data.length) {
                this.wait();
            }
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
            //这个 notify 用来唤醒 take 操作的等待
            this.notify();
        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            while(size == 0) {
                this.wait();
            }
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            //这个 notify 用来唤醒 put 操作的等待
            this.notify();
            return ret;
        }
    }
}

测试实现的阻塞队列

public class Demo4 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费元素" + queue.take());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread t2 = new Thread(() -> {
            int count = 1;
            while(true) {
                try {
                    queue.put(count + "");
                    System.out.println("生产元素" + count);
                    count++;
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

让生产速度较慢,使得读取操作阻塞等待插入数据才执行。

在这里插入图片描述

public class Demo4 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费元素" + queue.take());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread t2 = new Thread(() -> {
            int count = 1;
            while(true) {
                try {
                    queue.put(count + "");
                    System.out.println("生产元素" + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

在这里插入图片描述

让生产 put 操作进入阻塞等待状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1011452.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQlite操作后如何正确退出

在 C 语言中&#xff0c;使用 SQLite 库进行数据库操作后&#xff0c;可以通过以下步骤来正常退出和关闭 SQLite 连接&#xff1a; 关闭数据库连接&#xff1a;在完成数据库操作后&#xff0c;使用 sqlite3_close() 函数来关闭 SQLite 连接。该函数接受一个指向 sqlite3 数据库…

跨域问题解决方案(三种)

Same Origin Policy同源策略&#xff08;SOP&#xff09; 具有相同的Origin&#xff0c;也即是拥有相同的协议、主机地址以及端口。一旦这三项数据中有一项不同&#xff0c;那么该资源就将被认为是从不同的Origin得来的&#xff0c;进而不被允许访问。 Cross-origin resource…

Qt/C++音视频开发53-本地摄像头推流/桌面推流/文件推流/监控推流等

一、前言 编写这个推流程序&#xff0c;最开始设计的时候是用视频文件推流&#xff0c;后面陆续增加了监控摄像头推流&#xff08;其实就是rtsp视频流&#xff09;、网络电台和视频推流&#xff08;一般是rtmp或者http开头m3u8结尾的视频流&#xff09;、本地摄像头推流&#…

ArcGIS 10.2安装教程!

软件介绍&#xff1a;ArcGIS是一款专业的电子地图信息编辑和开发软件&#xff0c;提供一种快速并且使用简单的方式浏览地理信息&#xff0c;无论是2D还是3D的信息。软件内置多种编辑工具&#xff0c;可以轻松的完成地图生产全过程&#xff0c;为地图分析和处理提供了新的解决方…

【LeetCode75】第五十六题 爱吃香蕉的珂珂

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 这道题挺炸裂的&#xff0c;题目给我们一个数组&#xff0c;数组里的每个元素表示每个仓库里的香蕉数量。 珂珂可以自己控制自己吃香蕉的…

头条移动端项目Day08 —— 定时计算热点文章、XXL-JOB

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

算法——快乐数

202. 快乐数 - 力扣&#xff08;LeetCode&#xff09; 由图可知&#xff0c;其实这也是一个判断循环的过程&#xff0c;要用到快慢指针&#xff0c;且相遇后&#xff0c;若在全为1的循环里&#xff0c;那么就是快乐数&#xff0c;若相遇后不为1&#xff0c;说明这不是快乐数。 …

适合初学者快速入门的Numpy实战全集

适合初学者快速入门的Numpy实战全集 Numpy是一个用python实现的科学计算的扩展程序库&#xff0c;包括&#xff1a; 1、一个强大的N维数组对象Array&#xff1b;2、比较成熟的&#xff08;广播&#xff09;函数库&#xff1b;3、用于整合C/C和Fortran代码的工具包&#xff1b…

文心一言 VS 讯飞星火 VS chatgpt (92)-- 算法导论8.3 5题

五、用go语言&#xff0c;在本节给出的第一个卡片排序算法中&#xff0c;为排序 d 位十进制数&#xff0c;在最坏情况下需要多少轮排序?在最坏情况下&#xff0c;操作员需要记录多少堆卡片? 文心一言&#xff1a; 你提到的第一个卡片排序算法可能是指的基数排序&#xff08…

ABAQUS三维随机生长四参数法插件:QSGS3D - AbyssFish

插件介绍 QSGS3D - AbyssFish 插件可在Abaqus软件基于Quartet Structure Generation Set&#xff08;QSGS&#xff09;随机生长四参数生成法构建三维多孔介质双相材料&#xff0c;插件可指定生成试件的长度、宽度、高度以及划分的网格单元尺寸。可控制随机生长四参数中的分布概…

为什么建议将常量用const关键字来修饰

嵌入式软件中&#xff0c;内存资源是非常宝贵的&#xff0c;即sram资源。因此我们在编码过程中需要规划好并且使用好sram资源&#xff0c;这点非常重要&#xff01; 在此之前需要预备一点基础知识&#xff0c;在IAR中&#xff0c;一般会用ICF配置文件给工程配置存储区域&#…

Ae 效果:CC Particle World

模拟/CC Particle World Simulation/CC Particle World CC Particle World&#xff08;CC 粒子世界&#xff09;用于在三维空间中生成和模拟各种粒子系统&#xff0c;包括火焰、雨、雪、爆炸、烟雾等等。 效果名称左侧的立方体图标表示此效果支持 3D 摄像机。本效果也内置了“效…

用原生input type=range 写一个滑块,兼容各大浏览器

属性描述max设置或返回滑块控件最大值min设置或返回滑块控件最小值step设置或返回每次拖动滑块控件时的递增量value设置或返回滑块控件的value值defaultValue设置或返回滑块控件的默认值autofocus设置或返回滑块控件在页面加载后是否应自动获取焦点 先看谷歌浏览器: 览器 用…

centos免密登录

centos免密登录 小白教程&#xff0c;一看就会&#xff0c;一做就成。 1.知道服务器密码的情况 ssh-keygen -t rsa #上面的命令后三次回车#然后把想要免密登录的服务器加进来 ssh-copy-id -i /root/.ssh/id_rsa.pub root192.168.10.115 #免密码登录被控的主机&#xff08;ip是…

R语言用逻辑回归预测BRFSS中风数据、方差分析anova、ROC曲线AUC、可视化探索

全文链接&#xff1a;https://tecdat.cn/?p33659 行为风险因素监测系统&#xff08;BRFSS&#xff09;是一项年度电话调查。BRFSS旨在确定成年人口中的风险因素并报告新兴趋势&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 相关视频 例如&#xff0c;调查对…

用户生命周期价值-LTV、PEST分析、集团战略规划方案、麦肯锡的市场研究方法、数据规划架构设计··· | 本周精华...

▲点击上方卡片关注我&#xff0c;回复“8”&#xff0c;加入数据分析领地&#xff0c;一起学习数据分析&#xff0c;持续更新数据分析学习路径相关资料~&#xff08;精彩数据观点、学习资料、数据课程分享、读书会、分享会等你一起来乘风破浪~&#xff09;回复“小飞象”&…

C【分支语句和循环语句】

1.if语句 //多分支 if(表达式1)语句1; else if(表达式2)语句2; else语句3;如果表达式的结果为真&#xff0c;则语句执行。 在C语言中如何表示真假&#xff1f; 0表示假&#xff0c;非0表示真。 #include <stdio.h> int main() {if(表达式){语句列表1&#xff1b;}el…

美团多场景建模的探索与实践

总第574篇 2023年 第026篇 本文介绍了美团到家/站外投放团队在多场景建模技术方向上的探索与实践。基于外部投放的业务背景&#xff0c;本文提出了一种自适应的场景知识迁移和场景聚合技术&#xff0c;解决了在投放中面临外部海量流量带来的场景数量丰富、场景间差异大的问题&a…

JAVA - File类、字节流、字符流、特殊操作流

1.File类的构造方法 File类的创建文件功能 File类的判断和获取功能 File类的删除功能 2.IO流 - 字节流、字符流 字节流 处理字节数据&#xff1a;字节流以字节为单位处理数据&#xff0c;适用于处理二进制文件&#xff08;如图像、音频、视频文件&#xff09;或以字节为基本单…

Unity工具——LightTransition(光照过渡)

需求描述 在游戏中&#xff0c;开发者为了让玩家更直接地看到待拾取的物品从而为其添加一种闪烁效果&#xff0c;或者模拟现实中闪烁的灯光效果&#xff0c;我能够想到的一种方案则是通过控制光照强度来实现&#xff0c;那么本篇文章我们就尝试通过这个方案来实现一下&#xff…