阻塞队列(消息队列)

news2025/1/11 22:49:13

1、阻塞队列

队列是一种先进先出的数据结构。而阻塞队列也是一种特殊的队列,也遵守”先进先出“的原则。

阻塞队列是一种线程安全的的数据结构,并且具有以下特性

1、队列往进写元素是从队尾插入,队首取出

2、当插入元素的时候,先判断一下,队列是否已经满了,如果满了就继续等(阻塞),等到队列有空余的位置的时候再去插入。

3、当取出元素的时候,先判断一下,队列是否为空,如果空了就继续等(阻塞),等到队列中有元素的时候再去取出。

阻塞队列有一个典型的应用场景就是”生产者消费者模型“,这是一种非常典型的开发模型。

生产者消费者模型:

生产者和消费者模式就是通过一个容器来解决生产者和消费者强耦合问题。

生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用原地等待消费者进行处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中获取。

1、阻塞队列相当于是一个缓冲区,平衡了消费者和生产者之间的处理能力

比如在一些购物软件有"秒杀"场景,服务器在同一时刻可能会收到大量的支付请求,如果直接处理这些请求,服务器很有可能会扛不住大量数据的冲击(每一个支付请求的处理都需要比较复杂的流程),这个时候就可以把它放到一个阻塞队列中,然后由服务器慢慢处理每个支付请求。

这样做可以有效的进行“削峰",防止服务器被突然到来的一波请求直接冲垮。

2、阻塞队列也能使生产者和消费者之间解耦

比如过年一家人在一起包饺子,一般都是分工明确,比如一个人负责擀饺子皮,其他人负责包饺子,那么擀饺子皮的人就是生产者,包饺子的人就是消费者。

擀饺子皮的不关心包饺子的人是谁,只管擀包子皮,包饺子的人也不管擀包子皮的人是谁,只管包饺子。

 

 1.1、消息队列

消息队列是阻塞队列一种典型应用,基于消费者生产者模型实现的,是在业务的驱使下,应用队列这个数据结构做了一些自定义的功能开发,满组一些真实业务工作,类似这样的框架或是软件,被叫做“中间件”。

工作原理:

1、正常队列,先进先出,完全遵守这个顺序。

2、消息队列,把每个消息打了个”标签“。标签可以理解为”类型",把消息类型分类

1.2、为什么要使用消息队列(阻塞队列)?

1、解耦

现在的程序尽量做到高内聚,低耦合。 也就是业务强相关的代码放到一起,为了维护程序方便,设计和组织代码的一种方式。需要哪一种方法去接口调用即可,避免代码分散开发。

 2、削峰填谷

微博很难应对流量暴增的情况,流量暴增会在系统中申请很多很多线程,各种资源,最终会瞬间把服务器资源耗尽。

 

 陶宝应对流量冲击案例:

 

削峰:在流量暴增的时候用消息队列把消息缓存起来,后面的服务器一点一点正常处理。
填谷:消费信息的服务器在流量不多的情况下,处理之前堆积的消息,就是填谷

3、异步操作

在发起请求到接收到响应的过程中,啥也不干,叫做同步;如果发起请求之后去执行别任务,那么就叫做异步。

1.3、标准库中的阻塞队列 

在java标准库中内置了阻塞队列,如果需要在一些程序中使用阻塞队列,直接使用标准库中的即可。

*BlockingQueue是一个接口,真正实现类的是LinkedBlockingQueue。

*put方法用于阻塞式的入队列,take用于阻塞式的出队列。

*BlockingQueue也有offer,poll,peek等方法,但是这些方法不带有阻塞特性。 

 代码示例:


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Exe_01 {
    public static void main(String[] args) throws InterruptedException {
        //JDK提供的创建方式
        BlockingQueue<Integer> queue=new LinkedBlockingQueue<>(3);
        //往阻塞队列添加3个元素
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("添加了3个元素");
        //再添加第四个
        //queue.put(4);
        //System.out.println("添加了4个元素");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println("take了三个元素");
    }
}

 1.4、阻塞队列的实现

*通过“循环队列”的方式来实现

*使用synchronized来进行加锁控制

*put插入元素的时候,判定如果队列满的话,就进行wait。(注意:要在循环的时候进行wait,被唤醒时不一定队列就不满了,因为同时可能唤醒了多个线程)

*take取出元素的时候,判定如果队列为空,就进行wait。(也是循环wait)

阻塞队列实现分析:

1、在普通队列的基础上加了等待操作,在入队时如果队列已满就要等,出队时队列为空就要等

2、在普通队列的基础上加上了唤醒操作,执行完入队操作就会唤醒出队线程,执行完出队操作就会唤醒入队线程

阻塞队列不可能出现即是空的,又是满的这种状态,所以不会出现相互等待的现象。

 

代码示例:


/**
 * 实现阻塞队列
 */
public class MyBlockingQueue {
    //需要一个数组来保存数据
    private Integer[] elementData=new Integer[20];
    //定义队首队尾的下标
    private volatile int head=0;
    private volatile int tail=0;
    //有效元素的个数
    private volatile int size=0;

    /**
     * 添加元素
     * @param value
     */
    public void put(Integer value) throws InterruptedException {
        synchronized(this){
            //判断队列是否已经满了
            if(size>=elementData.length){
                this.wait();
            }
            //从队尾入队
            elementData[tail]=value;
            //队尾向前移动
            tail++;
            //处理循环
            if(tail>= elementData.length){
                tail=0;
            }
            size++;
            //添加新的元素唤醒线程
            this.notifyAll();
        }
    }

    /**
     * 获取元素
     * @return
     */
    public Integer take() throws InterruptedException {
        synchronized(this){
            //先判断是否为空
            if(size==0){
                //出队时,如果为空,继续等待
                this.wait();
            }
            //出队队首元素
            Integer value=elementData[head];
            //向后移动head
            head++;
            //处理循环
            if(head>=elementData.length){
                head=0;
            }
            size--;
            //出队时唤醒其他线程
            this.notifyAll();
            return value;
        }
    }
}

public class Exe_02 {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue queue=new MyBlockingQueue();
        //往阻塞队列添加三个元素
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("添加了3个元素");
        //再添加第四个
        //queue.put(4);
        //System.out.println("添加了4个元素");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println("take了三个元素");
    }
}

运行结果:

1.5、实现生产者消费者模型

代码示例:


public class Exe_03 {
    //用两个线程生产者消费者模型
    public static void main(String[] args) {
        //创建一个阻塞队列,表示交易场所
        MyBlockingQueue queue=new MyBlockingQueue();
        //创建生产者线程
        Thread producer=new Thread(() ->{
            int num=0;
            while(true){
                try {
                    queue.put(num);
                    System.out.println("生产了元素:"+num);
                    num++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者");
        //启动线程
        producer.start();
        //创建消费者线程
        Thread consumer=new Thread(() ->{
            while(true) {
                try {
                    Integer value = queue.take();
                    //睡眠一会
                    Thread.sleep(1000);
                    System.out.println("消费了了元素" + value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者");
        //启动线程
        consumer.start();
    }
}

 运行结果:

现象就是 ,生产者都把队列填满后,开始阻塞,等消费者一点一点消费,生产者一点一点生产。

调用wait()解决虚假唤醒问题 

 

更新代码: 

 


public class Exe_03 {
    //用两个线程生产者消费者模型
    public static void main(String[] args) {
        //创建一个阻塞队列,表示交易场所
        MyBlockingQueue queue=new MyBlockingQueue();
        //创建生产者线程
        Thread producer=new Thread(() ->{
            int num=0;
            while(true){
                try {
                    queue.put(num);
                    System.out.println("生产了元素:"+num);
                    num++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者");
        //启动线程
        producer.start();
        //创建消费者线程
        Thread consumer=new Thread(() ->{
            while(true) {
                try {
                    Integer value = queue.take();
                    //睡眠一会
                    Thread.sleep(1000);
                    System.out.println("消费了了元素" + value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者");
        //启动线程
        consumer.start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/687871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python面向对象编程基础知识和示例代码

文章目录 对象&#xff08;Object&#xff09;示例代码一 类的成员方法&#xff08;Method&#xff09;示例代码二 类和对象&#xff08;Class and Object&#xff09;&#xff1a;示例代码三 构造方法&#xff08;Constructor&#xff09;&#xff1a;示例代码四 魔术方法&…

JUC简介

1、JUC介绍 JUC (java.util.concurrent)是在并发编程中使用的工具类&#xff0c;主要包括以下三个 &#xff08;1&#xff09;java.util.concurrent &#xff08;2&#xff09;java.util.concurrent.atomic 原子性&#xff1a;不可分割。Int i0; i, &#xff08;3&#xff09;…

MySQL——函数与约束的讲解

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 前言 本章将会讲解MySQL数据库的函数与约束的讲解。 一.函数 函数 是指一段可以直接被另一…

渗透测试面试题汇总

2023年快过去一半了&#xff0c;不知道小伙伴们有没有找到自己心仪的工作呀【doge】&#xff0c;本文总结了常见的安全岗位面试题&#xff0c;方便各位复习。祝各位事业顺利&#xff0c;财运亨通。在网络安全的道路上越走越远&#xff01; 所有的资料都整理成了PDF&#xff0c…

DCN v1阅读笔记

DCN v1即 Deformable Convolutional Networks。 视觉识别&#xff08;例如对象检测和语义分割&#xff09;中的一个关键挑战是如何适应物体尺度、姿态、视角和零件变形中的几何变化或模型几何变换。卷积神经网络&#xff08;CNN&#xff09;构建模块中为固定几何结构&#xff1…

神经网络基础

文章目录 一、神经网络基础1.得分函数 f(xi;W,b)1&#xff09;从输入到输出的映射2&#xff09;数学表示3&#xff09;计算方法4&#xff09;多组权重参数构成了决策边界 2.损失函数 L3.前向传播4.Softmax分类器 梯度下降2.反向传播 一、神经网络基础 回归任务&#xff1a;最终…

软件测试技能,JMeter压力测试教程,JDBC配置连接mysql数据库(十)

前言 使用jmeter压测接口的时候&#xff0c;有时候需要批量造数据&#xff0c;需使用jmeter连数据库造对应的测试数据 或者测试结束后&#xff0c;对测试的数据还原&#xff0c;删掉一些垃圾数据&#xff0c;都会用到连接数据库执行sql的操作 一、JDBC 连接配置 添加配置元…

7-WebApis-2

Web APIs - 2 掌握事件绑定处理和事件对象&#xff0c;完成常见网页交互 事件监听事件类型事件对象拓展知识综合案例 事件监听 以前写的代码都是自动执行的&#xff0c;我们希望一段代码在某个特定的时机才去执行&#xff0c;比如 点击按钮可以弹出警示框比如鼠标经过显示下拉…

通过easyui的filebox上传文件

本篇文章重点分享一下怎么通过easyui的filebox实现文件上传的功能&#xff0c;从前端代码到后端接口都会展示给大家。 1、form表单同步上传 传统的文件上传会把<input type"file" />放到一个<form></form>里&#xff0c;设置form表单的提交方式为…

开源代码分享(5)—配电网重构的启发式算法(附matlab代码)

来源于文献IEEE TRANSACTIONS ON POWER SYSTEMS期刊文献的开源代码。 摘要&#xff1a;本文提出了一种两阶段的启发式计算方法&#xff0c;可以在最小的计算时间内重新配置一个径向分布网络。所有的网络交换机在操作的初始阶段都是关闭的&#xff0c;并提出了一个顺序的开关开闸…

基于SSM+jsp的教学质量评价系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

Origin如何绘制三维离散点并拟合曲面?

文章目录 0.引言1.准备数据2.三维离散点参数设置并绘图3.拟合曲面参数设置并绘图 0.引言 在数据统计分析中&#xff0c;有时希望知道一个因变量在两个自变量变化情况下的变化情况&#xff0c;这时可以绘制散点图&#xff0c;观察基础情况&#xff0c;进一步可以拟合散点&#x…

腾讯安全吴石:基于威胁情报构建免疫体系,助力企业稳步迈向智能安全新阶段

6月13日&#xff0c;腾讯安全、腾讯研究院联合IDC、《中国信息安全》杂志社、CIO时代、新基建创新研究院等多家权威机构、媒体共同发起“数字安全免疫力研讨论坛”&#xff0c;聚合产学研各界专家学者探讨数字安全建设新范式。论坛上&#xff0c;腾讯安全联合IDC发布“数字安全…

【物联网】使用RabbitMQ作为MQTT服务端并自定义设备连接权限

文章目录 项目背景一、部署RabbiqMQ二、设备连接鉴权1.开启插件2.修改配置3.连接鉴权4.消息鉴权 总结 项目背景 最近公司启动了一个新的物联网项目&#xff0c;使用MQTT协议与设备通信&#xff0c;在比较了各大MQTT服务后&#xff0c;决定选用开源的RabbitMQ搭建我们的服务端。…

最专业的敏捷需求管理工具推荐

为了协助大家找到合适的需求管理工具&#xff0c;我们选择了国内外几款款工具作比对&#xff1a; Leangoo领歌敏捷工具 Jama Software Visure Requirements IBM DOORS Next ReqSuite RM ReQtest Xebrio Orcanos Helix RM SpiraTeam Accompa Innoslate Leangoo领歌…

Python学习——元组

一、元组的定义 这部分就没有增、删、改操作了&#xff0c;是因为元组是一个不可变序列&#xff0c;元组也是Python内置的数据结构之一。 补充&#xff1a;关于可变序列与不可变序列 可变序列是指可以对序列进行增、删、改的操作&#xff0c;对象地址不发生变化。常见的可变序列…

【Jvm】Java类加载机制是什么?

文章目录 一、目标&#xff1a;二、原理 &#xff08;类的加载过程及其最终产品&#xff09;三、过程&#xff08;类的生命周期&#xff09;3.1、加载3.2、校验3.3、准备3.4、解析3.5、初始化 四、类加载器五、双亲委派机制 一、目标&#xff1a; 什么是类的加载&#xff1f;类…

vue3.x+elementPlus+swiper+vuedraggable实现页面装修

前言 该实现代码依赖框架&#xff1a;vue3.xelementPlusswipervuedraggable&#xff0c;做好前期工作&#xff0c;可直接在下面的附件处点击下载链接来下载相关文件&#xff1b;文件中包括搜索/图文广告/滚动消息三个模块代码示例&#xff0c;其他组件实现思路相同&#xff0c…

APT 系列 (一):APT 筑基之反射

什么是反射&#xff1f; 简单来讲&#xff0c;反射就是&#xff1a;已知一个类&#xff0c;可以获取这个类的所有信息 一般情况下&#xff0c;根据面向对象封装原则&#xff0c;Java实体类的属性都是私有的&#xff0c;我们不能获取类中的属性。但我们可以根据反射&#xff0…

问题解决 |关于CUDA的代码错误总结以及解决方法

本博客主要关于常见的CUDA的代码错误总结以及解决方法~ 1.RuntimeError运行错误 1.1.RuntimeError: CUDA error: out of memory CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect. For debugging cons…