多线程 - 阻塞式队列

news2025/1/14 18:15:35

v2-8cffdf8d243386262aee75b587db14bd_b

阻塞队列

阻塞队列,也是一个队列 ~~ 先进先出
实际上有一些特殊的队列,不一定非得遵守先进先出的 ~~ 优先级队列(PriorityQueue)
阻塞队列,也是特殊的队列,虽然也是先进先出的,但是带有特殊的功能: 阻塞

  1. 如果队列为空,执行出队列操作,就会阻塞.阻塞到另一个线程往队列里添加元素(队列不空)为止.

  2. 如果队列满了,执行入队列操作,也会阻塞.阻塞到另一个线程从队列取走元素位置(队列不满)

消息队列,也是特殊的队列.相当于是在阻塞队列的基础上,加上了个"消息的类型”按照制定类别进行先进先出.此时我们谈到的这个消息队列,仍然是一个“数据结构”.

因为这个消息队列,太好用了 ~~ 因此就有大佬把这样的数据结构,单独实现成了一个程序,这个程序,可以同过网络的方式和其它程序进行通信.

这时这个消息队列,就可以单独部署到一组服务器(分布式).存储能力和转发能力都大大提升了.
很多大型项目里,都可以看到这样的消息队列的身影.

此时消息队列,就已经成为了一个可以和MySQL,Redis这种相提并论的一个重要组件(“中间件”)
rabbit mq 就是消息队列中一种典型实现.还有其他的实现. active mq, rocket mq, kafka ……. 都是业界知名的消息队列.

要想认识清楚消息队列,还是得先认识清楚“阻塞队列”

为什么消息队列这么好用了? 和阻塞队列阻塞的特性关系非常大!!!
基于这样的特性,可以实现“生产者消费者模型”

过年.有个环节就是年夜饭,包饺子 ~~ 一家人在一起包饺子 ~~ 有些地方过年是吃汤圆的
包饺子的环节:挨饺子皮+包饺子

两种典型的包法:

  1. 每个人都分别进行擀饺子皮 + 包饺子 操作
  2. 一个人专门负责擀饺子皮,每次擀好一个皮,就放到盖帘上,其他人负责包,每次都从盖帘上取一个皮进行包 => 这种方式就称为生产者消费者模型.

生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等
待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

此时,负责擀饺子皮的人,就是生产者.
负责包饺子的人,就是消费者.
盖帘就是阻塞队列.

如果擀饺子皮的人擀得太慢了,包饺子的人就得等.
如果擀饺子皮的人擀得太快了,包饺子的人一时就包不完,擀饺子皮的人就得停下来等一会儿

image-20231002091352683


生产者消费者模型,能给我们的程序带来两个非常重要的好处!!!

1.实现了发送方和接收方之间的"解耦"

~~ 降低耦合的过程,就叫做“解耦"
耦合两个模块之间的关联关系是强还是弱
~~ 关联越强,耦合越高

开发中典型的场景:服务器之间的相互调用.

image-20231002095819632

image-20231002110305649

3.可以做到“削峰填谷”,保证系统的稳定性

image-20231002111335375

三峡大坝 ~~ 起到的效果,就是削峰填补

洪灾就是雨量达到峰值了,上游水量增长了,大坝把水拦住,让下游仍然有一个比较平缓的流量
到了旱季,三峡大坝开闸放水.

image-20231002120015605

服务器的开发,也和上述这个模型是非常相似的.
上游,就是用户发送的请求.
下游就是一些执行具体业务的服务器.

用户发多少请求?是不可控的.有的时候,请求多,有的时候请求少…
这时就涉及到了,热搜(比如什么微博热搜什么的)这个概念.
那个词会成为热搜,这是和社会现象有关的,比如新冠期间,”疫情”…就是热搜词.
说不定某个瞬间,很多用户都要来给你发起请求 ~~ 服务器处理每个请求,都需要消耗硬件资源,包括不限于(CPU,内存,硬盘,带宽…),如果某个硬件资源达到瓶颈,此时服务器就挂了,这就给系统的稳定性带来风险!!!
使用生产者消费者模型,就是一个有效的手段!!!

使用标准库提供的阻塞队列

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {}

LinkedBlockingQueue<> (java.util.concurrent) => 基于链表实现的阻塞队列
priorityBlockingQueue<> (java.util.concurrent) => 带有优先级的阻塞队列 ~~ 基于数据结构堆实现的
ArrayBlockingQueue<> (java.utii.concurrent) => 基于数组实现的阻塞队列

Queue提供的方法有三个:

  1. 入队列 ~~ offer
  2. 出队列 ~~ poll
  3. 取队首元素 ~~ peek

阻塞队列主要方法是两个:

  1. 入队列(带有阻塞功能) ~~ put
  2. 出队列(带有阻塞功能) ~~ take
public static void main(String[] args) throws InterruptedException {
    BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>();
    blockingDeque.put("fly0213"); // 入队列
    String res = blockingDeque.take(); // 出队列. 
    System.out.println(res);
    res = blockingDeque.take(); // 队列中没有元素了, take, 就会阻塞.
    System.out.println(res);
}

现在基于标准库的阻塞队列,写一个简单的生产者消费者模型的代码

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Created with IntelliJ IDEA.
 * Description:
 * User: fly(逐梦者)
 * Date: 2023-10-02
 * Time: 14:30
 */
public class ThreadDemo22 {
    public static void main(String[] args) {
        BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();

        // 创建两个线程, 来作为生产者和消费者
        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    Integer result = blockingDeque.take();
                    System.out.println("消费元素: " + result);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        customer.start();

        Thread producer = new Thread(() -> {
            int count = 0;
            while(true){
                try {
                    blockingDeque.put(count);
                    System.out.println("生产元素: "+ count);
                    count++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
    }
}

自己实现一个简单的阻塞队列

~~ 通过编写阻塞队列的代码,来更好的理解多线程

实现一个阻塞队列,需要分三步:

  1. 先写一个普通的队列,队列的实现可以基于数组,也可以基于链表(易于实现头删/尾插).
  2. 加上线程安全.
  3. 加上阻塞功能.

注: 基于链表实现普通队列的时候,需要头删和尾插的时间复杂度都是O(1).
链表头删的时间复杂度本来就是O(1),无需注意,只是链表的尾删操作时间复杂度要实现O(1),需要用一个额外的引用,记录当前的尾结点 ~~ 相关代码没什么难度,博主就不做讲解了!

用数组循环对列的方式来实现阻塞队列
image-20231002174113723

1. 实现普通队列的代码

/**
 * Created with IntelliJ IDEA.
 * Description:
 * User: fly(逐梦者)
 * Date: 2023-10-02
 * Time: 15:08
 */

// 自己写的阻塞队列
// 注: 此处不考虑泛型, 直接使用 int 来表示元素类型
class MyBlockingQueue {
    private int[] items = new int[1000];
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    // 入队列
    public void put(int value) {
        if (size == items.length) {
            // 队列满了, 不能继续插入
            return;
        }
        items[tail] = value;
        tail++;
        // 对 tail 进行处理
        // 第一种写法
        // tail = tail % items.length;

        // 第二种写法 ( 更推荐, 可读性更高 & % 在效率上没有优势 )
        if (tail >= items.length) {
            tail = 0;
        }
        size++;
    }

    // 出队列
    public Integer take() {
        if (size == 0) {
            // 队列空, 不能出队列
            return null;
        }
        int result = items[head];
        head++;
        if (head >= items.length) {
            head = 0;
        }
        size--;
        return result;
    }
}

public class ThreadDemo23 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        queue.put(1);
        queue.put(2);
        queue.put(3);
        queue.put(4);
        int result = 0;
        result = queue.take();
        System.out.println("result = " + result);
        result = queue.take();
        System.out.println("result = " + result);
        result = queue.take();
        System.out.println("result = " + result);
        result = queue.take();
        System.out.println("result = " + result);
    }
}

2. 阻塞功能 ~~ 意味着队列要在多线程环境下使用

保证线程安全,主要是就是加上锁 ~~ 使用 synchronized 包裹put()take()方法里的所有代码,当然, synchronized 加到方法上,也是可以的.

// 自己写的阻塞队列
// 注: 此处不考虑泛型, 直接使用 int 来表示元素类型
class MyBlockingQueue {
    private int[] items = new int[1000];
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    // 入队列
    public void put(int value) {
        synchronized (this) {
            if (size == items.length) {
                // 队列满了, 此时要产生阻塞
                // return;
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            items[tail] = value;
            tail++;
            // 对 tail 进行处理
            // 第一种写法
            // tail = tail % items.length;

            // 第二种写法 ( 更推荐, 可读性更高 & % 在效率上没有优势 )
            if (tail >= items.length) {
                tail = 0;
            }
            size++;

            // 这个 notify 唤醒 take 中的 wait
            this.notify();
        }
    }

    // 出队列
    public Integer take() {
        int result = 0;
        synchronized (this) {
            if (size == 0) {
                // 队列空, 此时也需要阻塞
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return null;
            }
            result = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // 唤醒 put 中的 wait
            this.notify();
        }
        return result;
    }
}

image-20231002175701861

这两个线程中的 wait 是否可能会同时触发 ???
~~ 如果同时触发了,显然就不能正确相互唤醒了
答案是否定的,针对同一个队列,不能够既是满,又是空,除非它跟薛定谔的猫一样,嘿嘿!!!

3.上述代码还有一个瑕疵

if (size == items.length) { this.wait(); }

当wait被唤醒的时候,此时if的条件,一定就不成立了嘛??
具体来说, put中的 wait 被唤醒后,要求队列没有满,
但是 wait 被唤醒了之后,队列一定是不满的嘛?
在当前代码中,虽然不会出现这种情况.当前代码一定是取元素成功才唤醒,每次取元素都会唤醒.
但是稳妥起见,最好的办法,是wait返回之后再次判定一下,看此时的条件是不是具备了!!!
例子: 类似于,早上起床上课,要定个8:20的闹钟.正常情况下,闹钟响了,就该起床了.
但是如果7:00就醒了,就发现,距离上课还早(条件尚未满足),还可以再睡会(每次醒了,应该都看下时间,判定一下当前是否满足了起床的条件).

if (size == items.length) {
    this.wait();      
    if (size == items.length) {
        this.wait();
     } 
}

这么写是进行了二次判定了,但是并不合适.可能第二次wait被唤醒,可能还是条件不具备~~

4.完整版的代码


/**
 * Created with IntelliJ IDEA.
 * Description:
 * User: fly(逐梦者)
 * Date: 2023-10-02
 * Time: 15:08
 */

// 自己写的阻塞队列
// 注: 此处不考虑泛型, 直接使用 int 来表示元素类型
class MyBlockingQueue {
    private int[] items = new int[1000];
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    // 入队列
    public void put(int value) {
        synchronized (this) {
            while (size == items.length) { // 标准库就建议这么写!!!
                // 队列满了, 此时要产生阻塞
                // return;
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            items[tail] = value;
            tail++;
            // 对 tail 进行处理
            // 第一种写法
            // tail = tail % items.length;

            // 第二种写法 ( 更推荐, 可读性更高 & % 在效率上没有优势 )
            if (tail >= items.length) {
                tail = 0;
            }
            size++;

            // 这个 notify 唤醒 take 中的 wait
            this.notify();
        }
    }

    // 出队列
    public Integer take() {
        int result = 0;
        synchronized (this) {
            while (size == 0) {
                // 队列空, 此时也需要阻塞
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            result = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // 唤醒 put 中的 wait
            this.notify();
        }
        return result;
    }
}

public class ThreadDemo23 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        queue.put(1);
        queue.put(2);
        queue.put(3);
        queue.put(4);
        int result = 0;
        result = queue.take();
        System.out.println("result = " + result);
        result = queue.take();
        System.out.println("result = " + result);
        result = queue.take();
        System.out.println("result = " + result);
        result = queue.take();
        System.out.println("result = " + result);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1056504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Go:实现SMTP邮件发送订阅功能(包含163邮箱、163企业邮箱、谷歌gmail邮箱)

需求很简单&#xff0c;就是用户输入自己的邮箱后&#xff0c;使用官方邮箱给用户发送替邮件模版 目录 前置邮件模版邮箱开启SMTP服务163邮箱163企业邮箱谷歌gmail邮箱腾讯企业邮箱-失败其他邮箱-未操作 邮件发送核心代码config.yaml配置读取邮件相关配置发送邮件 附录 前置 邮…

深度学习笔记之线性代数

深度学习笔记之线性代数 一、向量 在数学表示法中&#xff0c;向量通常记为粗体小写的符号&#xff08;例如&#xff0c;x&#xff0c;y&#xff0c;z&#xff09;当向量表示数据集中的样本时&#xff0c;它们的值具有一定的现实意义。例如研究医院患者可能面临的心脏病发作风…

Ubuntu系统初始设置

更换国内源 安装截图工具 安装中文输入法 安装QQ 参考&#xff1a; 安装双系统win10Ubuntu20.04LTS&#xff08;详细到我自己都害怕&#xff09; 引导方式磁盘分区方法UEFIGPTLegancyMBR 安装网络助手 sudo apt install net-tools 安装VS Code 使用从官网下载.deb安装包…

MySQL使用Xtrabackup在线做主从

1、主库上操作 1.1前提 172.16.11.2&#xff08;主库&#xff09; 172.16.11.4&#xff08;从库&#xff09; 在执行备份之前&#xff0c;确保数据库没有锁定&#xff0c;以避免备份期间的任何写操作。 确保主库上的 MySQL 服务器正在运行&#xff0c;以便备份数据的一致性。…

八、2023.10.2.Linux(二).8

文章目录 17、简述一下虚拟内存和物理内存&#xff0c;为什么要用虚拟内存&#xff0c;好处是什么&#xff1f;18、虚拟地址到物理地址怎么映射的&#xff1f;19、说说堆栈溢出是什么&#xff0c;会怎么样&#xff1f;20、简述操作系统中malloc的实现原理?21、说说进程空间从高…

uboot启动流程-涉及board_init_f 函数

一. uboot启动流程 _main 函数中会调用 board_init_f 函数&#xff0c;本文简单分析一下 board_init_f 函数。 二. board_init_f 函数 board_init_f 函数主要有两个工作&#xff1a; (1) 初始化一系列外设&#xff0c;比如串口、定时器&#xff0c;或者打印一些消息等。…

Docker Tutorial

什么是Docker 为每个应用提供完全隔离的运行环境 Dockerfile&#xff0c; Image&#xff0c;Container Image&#xff1a; 相当于虚拟机的快照&#xff08;snapshot&#xff09;里面包含了我们需要部署的应用程序以及替它所关联的所有库。通过image&#xff0c;我们可以创建很…

音乐创作软件:ToneLIB Jam v4.7.8 Crack

从强大的选项卡编辑器到 3D 模式 Tonelib Jam 是一款用于播放和创作音乐的综合软件应用程序。TL Jam专为初学者和经验丰富的吉他手而设计&#xff0c;可以提供一个完美的平台来掌握乐器&#xff0c;让您轻松学习自己喜欢的歌曲或设置高效的日常吉他练习程序。TL Jam 具有功能强…

目标检测|边框检测框转换,交并比计算 代码实现

文章目录 1. 相互转换的函数2.交并比实现 在目标检测任务中&#xff0c;非常重要的一部分就是框出检测框 这就需要检测框的位置大小等一些信息 一般我们有如下两种方式标记一个检测的位置和大小 1 两点法 检测框左上角坐标(x1,y1)&#xff0c;检测框右下角坐标&#xff08;x2…

2023年最新云存储工具排行榜:找到适合你的云存储服务

随着数据规模的不断增长&#xff0c;传统的本地存储已经无法满足用户的需求。云存储工具通过提供灵活、安全和高效的数据存储服务&#xff0c;成为了现代化的数据管理方式。在众多云存储工具中&#xff0c;有一些在功能和性能方面表现出色&#xff0c;成为用户首选。下面是2023…

时间序列-AR模型与MA模型的原理与实现

文章目录 1 自回归模型AR Model1.1 自回归模型 vs 多元线性回归模型1.1.1 线性回归1.1.2 AR(1)模型1.1.3 AR(p)模型 1.2 AR建模问题 2 移动平均模型 MA Model2.1 MA模型的数学表示2.1.1 MA(1)模型2.2.2 MA(q)模型 2.2 MA建模问题 ARIMA模型是AR模型&#xff08;自回归模型&…

计算机网络(二):物理层

参考引用 计算机网络微课堂-湖科大教书匠计算机网络&#xff08;第7版&#xff09;-谢希仁 1. 物理层的基本概念 物理层考虑的是怎样才能在连接各种计算机的传输媒体上传输数据比特流物理层为数据链路层屏蔽了各种传输媒体的差异&#xff0c;使数据链路层只需要考虑如何完成本…

levelDB引擎

一、背景 1.1、影响磁盘性能的因素&#xff1a; 主要受限于磁盘的寻道时间&#xff0c;优化磁盘数据访问的方法是尽量减少磁盘的IO次数。磁盘数据访问效率取决于磁盘IO次数&#xff0c;而磁盘IO次数又取决于数据在磁盘上的组织方式。磁盘数据存储大多采用B树类型数据结构&…

排序篇(三)----交换排序

排序篇(三)----交换排序 1.冒泡排序 基本思想: ​ 通过不断地比较相邻的元素&#xff0c;将较大的元素往后移动&#xff0c;从而实现排序的目的。 具体的步骤如下&#xff1a; 从待排序的数组中选择相邻的两个元素进行比较&#xff0c;如果前一个元素大于后一个元素&#…

Java编程技巧:swagger2、knif4j集成SpringBoot或者SpringCloud项目

目录 1、springbootswagger2knif4j2、springbootswagger3knif4j3、springcloudswagger2knif4j 1、springbootswagger2knif4j 2、springbootswagger3knif4j 3、springcloudswagger2knif4j 注意点&#xff1a; Api注解&#xff1a;Controller类上的Api注解需要添加tags属性&a…

在云服务器上打开ftp服务-踩坑及心得

我们产生这个需求的原因是因为打算搭建一个博客&#xff0c;选择了使用wordpress框架。然后&#xff0c;在安装插件的过程中&#xff0c;需要使用FTP服务进行操作。于是&#xff0c;我们决定搞清楚这个过程&#xff0c;并在其中遇到的困难进行记录。 一、安装vsftpd # 安装 s…

python生成中金所期权行权价

参考沪深300股指期权的合约表&#xff0c;写一个工具函数&#xff1a; 使用方法 def get_format_option_gap(value: float, deviation: int 0): # 根据中证1000指数获取点位"""根据标准的行权价&#xff0c;生成不同档位的期权列表&#xff0c;适合中金所:…

铁道货车通用技术条件

声明 本文是学习GB-T 5600-2018 铁道货车通用技术条件. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 90 mm90 mm。 B.2 制造要求 B.2.1 车体钢结构组成后&#xff1a; a) 敞车钢质侧、端板的平面度公差应小于或等于15 mm/m; 压型侧、端板的平面度…

S0003-Mac下iTerm2+zsh+ohmyzsh打造优雅美观终端

背景 优雅耐看的终端工具&#xff0c;必是每个程序员的追求。 本人也不例外&#xff0c;从业几年先后使用过&#xff1a; windows电脑&#xff1a;cmd、git bash、wsl zsh、terminal zshMac电脑: 自带terminal、iTerm2、terminal zsh 其中windows terminal zsh、mac ter…

数据结构——二叉树的基本概念及顺序存储(堆)

目录 一.前言 二.树概念及结构 2.1 树的概念 2.2 树的相关概念 2.3 树的表现 2.4 树在实际中的应用&#xff08;表示文件系统的目录树结构&#xff09; 三.二叉树的概念及结构 3.1 概念 3.2 特殊的二叉树 3.3 二叉树的性质 3.4 二叉树的存储结构 3.4.1 顺序存储 3…