【Java多线程案例】实现阻塞队列

news2025/1/13 10:33:51

1. 阻塞队列简介

1.1 阻塞队列概念

阻塞队列:是一种特殊的队列,具有队列"先进先出"的特性,同时相较于普通队列,阻塞队列是线程安全的,并且带有阻塞功能,表现形式如下:

  • 当队列满时,继续入队列就会阻塞,直到有其他线程从队列中取出元素
  • 当队列空时,继续出队列就会阻塞,直到有其他线程往队列中插入元素

基于阻塞队列我们可以实现生产者消费者模型,这在后端开发场景中是相当重要的!

1.2 生产者-消费者模型优势

基于阻塞队列实现的 生产者消费者模型 具有以下两大优势:

  1. 解耦合:

image.png
以搜狗搜索的服务器举例,用户输入搜索关键字 **美容,**客户端的请求到达搜狗的"入口服务器"时,会将请求转发到 广告服务器大搜索服务器,此时广告服务器返回相关广告内容,大搜索服务器根据搜索算法匹配对应结果返回,如果按照这种方式通信,那么入口服务器需要编写两套代码分别同广告服务器和大搜索服务器进行交互,并且一个严重问题是如果其中广告服务器宕机了,会导致入口服务器无法正常工作进而影响大搜索服务器也无法正常工作!!
image.png
而引入阻塞队列后,入口服务器不需要知晓广告服务器和大搜索服务器的存在,只需要往阻塞队列中发送请求即可,而广告服务器和大搜索服务器也不需要知道入口服务器的存在,只需要从阻塞队列中取出请求处理完毕返回给阻塞队列即可,并且当其中大搜索服务器宕机时,不影响其他服务器以及入口服务器的正常运作!

  1. 削峰填谷:

image.png
如果没有阻塞队列,当遇到一些突发场景例如"双十一"大促等客户请求量激增的时候,入口服务器转发的请求量增多,压力就会变大,同理广告服务器和大搜索服务器处理过程复杂繁多,消耗的硬件资源就会激增,达到硬件瓶颈之后服务器就宕机了(直观现象就是客户端发送请求,服务器不会响应了)
image.png
而引入阻塞队列/消息队列之后,由于阻塞队列只负责存储相应的请求或者响应,无需额外的业务处理,因此抗压能力比广告服务器和大搜索服务器更强,当客户请求量激增的时候交由阻塞队列承受,而广告服务器和大搜索服务器只需要按照特定的速率进行读取并返回处理结果即可,就起到了 削峰填谷 的作用!

注意:此处的阻塞队列在现实场景中并不是一个单纯的数据结构,往往是一个基于阻塞队列的服务器程序,例如消息队列(MQ)

2. 标准库中的阻塞队列

2.1 基本介绍

Java标准库提供了现成的阻塞队列数据结构供开发者使用,即BlockingQueue接口
BlockingQueue:该接口具有以下实现类:

  1. ArrayBlockingQueue:基于数组实现的阻塞队列
  2. LinkedBlockingQueue:基于链表实现的阻塞队列
  3. PriorityBlockingQueue:带有优先级的阻塞队列

BlockingQueue方法:该接口具有以下常用方法

  1. 带有阻塞功能:
  • put:向队列中入元素,队列满则阻塞等待
  • take:向队列中取出元素,队列空则阻塞等待
  1. 不带有阻塞功能:
  • peek:返回队头元素(不取出)
  • poll:返回队头元素(取出)
  • offer:向队列中插入元素

2.2 代码示例

/**
 * 测试Java标准库提供的阻塞队列实现
 */
public class TestStandardBlockingQueue {

    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    public static void main(String[] args) {
        // 生产者
        Thread t1 = new Thread(() -> {
            int i = 0;
            while (true) {
                try {
                    queue.put(i);
                    System.out.println("生产数据:" + i);
                    i++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        // 消费者
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    int ele = queue.take();
                    System.out.println("消费数据:" + ele);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

运行效果
image.png
我们在主线程中创建了两个线程,其中t1线程作为生产者不断循环生产元素,而线程t2作为消费者每隔1s消费一个数据,所以我们很快看到当生产数据个数达到容量capacity时就会继续生产就会阻塞等待,直到消费者线程消费数据后才可以继续入队列,这样就实现了一个 生产者-消费者模型

3. 自定义实现阻塞队列

首先我们需要明确实现一个阻塞队列需要哪些步骤?

  1. 首先我们需要实现一个普通队列
  2. 使用锁机制将普通队列变成线程安全的
  3. 通过特殊机制让该队列能够带有"阻塞"功能

3.1 实现普通队列

相信大家如果学过 数据结构与算法 相关课程,应该对队列这种数据结构的实现并不陌生!实现队列有基于数组的也有基于链表的,我们此处采用基于数组实现的,基于数组实现的循环队列也有以下两种方式:

  1. 腾出一个空间用来判断队列空或者满
  2. 使用额外的变量size用来记录当前元素的个数

我们使用第二种方式实现,实现代码如下:

/**
 * 自定义实现阻塞队列
 */
public class MyBlockingQueue {
    private int head = 0; // 头指针
    private int tail = 0; // 尾指针
    private int size = 0; // 当前元素个数
    private String[] array = null;
    private int capacity; // 容量

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.array = new String[capacity];
    }

    /**
     * 入队列方法
     */
    public void put(String elem) {
        if (size == capacity) {
            // 队列已经满了
            return;
        }
        array[tail] = elem;
        tail++;
        if (tail >= capacity) {
            tail = 0;
        }
        size++;
    }

    /**
     * 出队列方法
     */
    public String take() {
        // 判断队列是否为空
        if (size == 0) {
            return null;
        }
        String topElem = array[head];
        head++;
        if (head >= capacity) {
            head = 0;
        }
        size--;
        return topElem;
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(3);
        queue.put("11");
        queue.put("22");
        queue.put("33");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

3.2 引入锁机制实现线程安全

引入synchronized关键字在原有队列实现的基础上实现线程安全,代码如下:

/**
 * 自定义实现阻塞队列
 */
public class MyBlockingQueue {
    private int head = 0; // 头指针
    private int tail = 0; // 尾指针
    private int size = 0; // 当前元素个数
    private String[] array = null;
    private int capacity; // 容量
    private Object locker = new Object(); // 锁对象

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.array = new String[capacity];
    }

    /**
     * 入队列方法
     */
    public void put(String elem) {
        synchronized (locker) {
            if (size == capacity) {
                // 队列已经满了
                return;
            }
            array[tail] = elem;
            tail++;
            if (tail >= capacity) {
                tail = 0;
            }
            size++;
        }
    }

    /**
     * 出队列方法
     */
    public String take() {
        String topElem = "";
        synchronized (locker) {
            // 判断队列是否为空
            if (size == 0) {
                return null;
            }
            topElem = array[head];
            head++;
            if (head >= capacity) {
                head = 0;
            }
            size--;
        }
        return topElem;
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(3);
        queue.put("11");
        queue.put("22");
        queue.put("33");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

我们在puttake等关键方法上将 多个线程修改同一个变量 部分的操作进行加锁处理,实现线程安全!

3.3 加入阻塞功能

在普通队列的实现中,如果队列满或者空我们直接使用return关键字返回,但是在多线程环境下我们希望实现阻塞等待的功能,这就可以使用Object类提供的wait/notify这组方法实现阻塞与唤醒机制了!我们就需要考虑阻塞与唤醒的时机了!
何时阻塞:这个问题非常简单,当队列满时入队列操作就应该阻塞等待,而当队列为空时出队列操作就需要阻塞等待
何时唤醒:想必大家都可以想到,对于入队列操作来说,只要队列不满就可以被唤醒,而对于出队列操作来说,队列不为空就可以被唤醒,因此,只要有线程调用take操作出队列,那么入队列的线程就可以被唤醒,而只要有线程调用put操作入队列,那么出队列的线程就可以被唤醒

/**
 * 自定义实现阻塞队列
 */
public class MyBlockingQueue {
    private int head = 0; // 头指针
    private int tail = 0; // 尾指针
    private int size = 0; // 当前元素个数
    private String[] array = null;
    private int capacity; // 容量
    private Object locker = new Object(); // 锁对象

    public MyBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.array = new String[capacity];
    }

    /**
     * 入队列方法
     */
    public void put(String elem) {
        synchronized (locker) {
            while (size == capacity) {
                // 队列已经满了(进行阻塞)
                try {
                    locker.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            array[tail] = elem;
            tail++;
            if (tail >= capacity) {
                tail = 0;
            }
            size++;
            locker.notifyAll();
        }
    }

    /**
     * 出队列方法
     */
    public String take() {
        String topElem = "";
        synchronized (locker) {
            // 判断队列是否为空
            while (size == 0) {
                try {
                    locker.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            topElem = array[head];
            head++;
            if (head >= capacity) {
                head = 0;
            }
            size--;
            locker.notifyAll();
        }
        return topElem;
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(10);
        // 生产者
        Thread producer = new Thread(() -> {
            int i = 0;
            while (true) {
                queue.put(i + "");
                System.out.println("生产元素:" + i);
                i++;
            }
        });
        // 消费者
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String elem = queue.take();
                System.out.println("消费元素" + elem);
            }
        });
        producer.start();
        consumer.start();
    }
}

我们使用wait/notify这组操作实现了阻塞/唤醒功能,并且满足必须使用在synchronized关键字内部的使用条件,这里有一个注意点

为什么我们将if判断条件改成了while循环呢???这是需要考虑清楚的!

image.png
如图所示:一开始由于队列满所以生产者1进入阻塞状态,释放锁,然后生产者2也进入阻塞状态释放锁,此时消费者消费一个元素后唤醒生产者1,然后生产者1生产一个元素后(记住此时队列已满)继续唤醒,但是此时唤醒的恰恰是 生产者2 ,生产者2继续执行生产元素,于是就出现问题,我们总结一下出现问题的原因:

  1. notifyAll是随机唤醒,无法指定唤醒线程,因此可能出现生产者唤醒生产者,消费者唤醒消费者的情况
  2. if判定条件一经执行就无法继续判定,所以生产者2被唤醒后没有再次判断当前队列是否满

于是我们的应对策略就是使用while循环,当线程被唤醒使重新判断,如果队列仍满,入队列操作继续阻塞,而队列仍空,出队列操作继续阻塞!Java标准也推荐我们使用 while 关键字和 wait 关键字一起使用!
image.png

4. 应用场景(实现生产者消费者模型)

我们继续基于我们自定义实现的阻塞队列再来实现 生产者-消费者模型
代码示例(主函数)

public static void main(String[] args) {
    MyBlockingQueue queue = new MyBlockingQueue(10);
    // 生产者
    Thread producer = new Thread(() -> {
        int i = 0;
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            queue.put(i + "");
            System.out.println("生产元素:" + i);
            i++;
        }
    });
    // 消费者
    Thread consumer = new Thread(() -> {
        while (true) {
            String elem = queue.take();
            System.out.println("消费元素" + elem);
        }
    });
    producer.start();
    consumer.start();
}

运行效果
image.png
此时我们创建两个两个线程,producer作为生产者线程每隔1s生产一个元素,consumer作为消费者线程不断消费元素,此时我们看到的就是消费者消费很快,当阻塞队列空时就进入阻塞状态,直到生产者线程生产元素后才被唤醒继续执行!此时我们真正模拟实现了 阻塞队列 这样的数据结构!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1442850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSP-202012-2-期末预测之最佳阈值

CSP-202012-2-期末预测之最佳阈值 【70分思路】 本题的难点还是时间复杂度&#xff0c;暴力枚举会导致时间超限。对于每一个可能的阈值theta&#xff0c;代码都重新计算了整个predict数组&#xff0c;统计预测正确的数目&#xff0c;因为有两个嵌套的循环&#xff0c;使得时间…

各版本安卓的彩蛋一览

目录 前言前彩蛋纪Android 2.3 GingerbreadAndroid 3 HoneycombAndroid 4.0 Ice Cream SandwichAndroid 4.1-4.3 JellybeanAndroid 4.4 KitKatAndroid 5 LollipopAndroid 6 MarshmallowAndroid 7 NougatAndroid 8 OreoAndroid 9 PieAndroid 10 Queen CakeAndroid 11 Red Velvet…

分享66个相册特效,总有一款适合您

分享66个相册特效&#xff0c;总有一款适合您 66个相册特效下载链接&#xff1a;https://pan.baidu.com/s/1jqctaho4sL_iGSNExhWB6A?pwd8888 提取码&#xff1a;8888 Python采集代码下载链接&#xff1a;采集代码.zip - 蓝奏云 学习知识费力气&#xff0c;收集整理更不…

【Java EE初阶十】多线程进阶二(CAS等)

1. 关于CAS CAS: 全称Compare and swap&#xff0c;字面意思:”比较并交换“&#xff0c;且比较交换的是寄存器和内存&#xff1b; 一个 CAS 涉及到以下操作&#xff1a; 下面通过语法来进一步进项说明&#xff1a; 下面有一个内存M&#xff0c;和两个寄存器A,B; CAS(M,A,B)&am…

社区店营销新趋势:如何吸引并留住顾客?

作为一名资深的鲜奶吧创业者&#xff0c;我已经在这个行业摸爬滚打了五年。 这五年的时间&#xff0c;我见证了社区店营销的变迁&#xff0c;也积累了一些关于如何吸引并留住顾客的经验。今天&#xff0c;我想和大家分享一些留住顾客的核心干货。&#xff08;可以点赞收藏&…

YOLOv8改进 | 检测头篇 | 独创RFAHead检测头超分辨率重构检测头(适用Pose、分割、目标检测)

一、本文介绍 本文给大家带来的改进机制是RFAHead,该检测头为我独家全网首发,本文主要利用将空间注意力机制与卷积操作相结合的卷积RFAConv来优化检测头,其核心在于优化卷积核的工作方式,特别是在处理感受野内的空间特征时。RFAConv主要的优点就是增加模型的特征提取能力,…

MATLAB实现LSTM时间序列预测

LSTM模型可以在一定程度上学习和预测非平稳的时间序列&#xff0c;其具有强大的记忆和非线性建模能力&#xff0c;可以捕捉到时间序列中的复杂模式和趋势[4]。在这种情况下&#xff0c;LSTM模型可能会自动学习到时间序列的非平稳性&#xff0c;并在预测中进行适当的调整。其作为…

股票均线的使用方法和实战技术,看涨看空的均线形态与案例教学

一、教程描述 本套教程讲解了14种均线的特殊形态&#xff0c;通过直观图形以及大量案例的教学&#xff0c;将深奥、繁琐的均线变得生动与具体&#xff0c;广大投资者在认真学习以后&#xff0c;可以学会均线的使用方法&#xff0c;掌握最强的均线应用实战技术。本套教程不仅适…

Qt可视化大屏布局

科技大屏现在非常流行&#xff0c;这里分享一下某个项目的大屏布局&#xff08;忘了源码是哪个博主的了&#xff09; 展示 这个界面整体是垂直布局&#xff0c;分为两个部分&#xff0c;标题是一个部分&#xff0c;然后下面的整体是一个layout布局&#xff0c;为另外一部分。 l…

Shell脚本系列| SSH分发公钥方法 - expect脚本的使用

ssh原理&#xff1a;在SSH安全协议的原理中&#xff0c; 是一种非对称加密与对称加密算法的结合。用于确保远程登录和其他网络服务的会话安全&#xff0c;通过非对称加密、会话加密、多重验证机制等手段&#xff0c;保护数据传输的机密性和完整性。 ssh登录有2种方法&#xff1…

图书商城系统

文章目录 图书商城系统一、项目演示二、项目介绍三、系统部分功能截图四、部分代码展示五、底部获取项目&#xff08;9.9&#xffe5;带走&#xff09; 图书商城系统 一、项目演示 网上书城 二、项目介绍 基于SSM的图书商城网站 运行环境:idea或eclipse 数据库:mysql 开发语…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Web组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之Web组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Web组件 提供具有网页显示能力的Web组件&#xff0c;ohos.web.webview提供web控制能…

4. TypeScript

目录 1 TypeScript 介绍 2 TypeScript 常用类型 2.1 类型标注的位置 2.2 字符串、数字、布尔类型 2.3 字面量类型 2.4 interface 类型 2.5 class 类型 Vue1&#xff1a;基础跟使用方式 Vue2&#xff1a;路由方面 Vue3&#xff1a;状态管理 vuex 状态管理库 1 TypeScrip…

鸿蒙原生应用再添新丁!央视新闻 入局鸿蒙

鸿蒙原生应用再添新丁&#xff01;央视新闻 入局鸿蒙 来自 HarmonyOS 微博2月9日消息&#xff0c;#央视新闻启动鸿蒙原生应用开发#中央广播电视总台旗舰央视新闻客户端正式宣布&#xff0c;将基于HarmonyOS NEXT鸿蒙星河版&#xff0c;启动央视新闻 鸿蒙原生应用开发&#xf…

springboot微信小程序 uniapp学习资料分享系统v9uy4

理论意义 当今网络教学已成为各国教育改革 和发展的趋势。因此&#xff0c;构建一个适合交互式课堂教学模式的教学平台就成了当务之 急。 在国内高校&#xff0c;目前交互平台主要用于网络学院的远程教学&#xff0c;至于校园内的正规教学&#xff0c;老师自发建立课程主页的比…

Web后端开发:事务与AOP

事务管理 在学习数据库时&#xff0c;讲到&#xff1a;事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位。事务会把所有的操作作为一个整体&#xff0c;一起向数据库提交或者是撤销操作请求&#xff0c;要么同时成功&#xff0c;要么同时失败。 事务的操作主要有三…

VUE学习——数组变化侦测

官方文档 变更方法&#xff1a; 使用之后&#xff0c;ui可以直接发生改变。改变原数组 替换数组&#xff1a; 使用之后需要接受重新赋值&#xff0c;不然ui不发生改变。不改变原数组

深度学习的进展及其在各领域的应用

深度学习&#xff0c;作为人工智能的核心分支&#xff0c;近年来在全球范围内引起了广泛的关注和研究。它通过模拟人脑的学习机制&#xff0c;构建复杂的神经网络结构&#xff0c;从大量数据中学习并提取有用的特征表示&#xff0c;进而解决各种复杂的模式识别问题。 一、深度…

一、OpenAI API介绍

Open AI API可以应用到任何的业务场景。 文本生成 创造助理 嵌入数据 语音转化 图片生成 图片输入 1. 核心概念 1.1 Text generation models OpenAI 的文本生成模型(通常被称为generative pre-trained transformers 模型简称&#xff1a;GPT),有GPT-4和G…

RCS-YOLO复现

复现结果–Precision&#xff1a;0.941&#xff0c;Recall&#xff1a;0.945&#xff0c;AP 50 _{50} 50​&#xff1a;0.941&#xff0c;AP 50 : 95 _{50:95} 50:95​&#xff1a;0.693&#xff0c;误差在5个点内&#xff0c;可以接受 感想 第5篇完全复现的论文