多线程-阻塞队列

news2025/1/11 14:07:37

    在这篇博客中我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。

      使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。

一、认识BlockingQueue

       阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

      从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;

常用的队列主要有以下两种:

  先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。

  后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

      阻塞队列常用于生产者和消费者的场景,生产者线程可以把生产结果存到阻塞队列中,而消费者线程把中间结果取出并在将来修改它们。

队列会自动平衡负载,如果生产者线程集运行的比消费者线程集慢,则消费者线程集在等待结果时就会阻塞;如果生产者线程集运行的快,那么它将等待消费者线程集赶上来。

作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

看下BlockingQueue的核心方法

1、放入数据

  (1)put(E e):put方法用来向队尾存入元素,如果队列满,则等待。    

  (2)offer(E o, long timeout, TimeUnit unit):offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

2、获取数据

 (1)take():take方法用来从队首取元素,如果队列为空,则等待;

 (2)drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

 (3)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

 (4)poll(long timeout, TimeUnit unit):poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

二、常见BlockingQueue

       在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?

1、ArrayBlockingQueue

      基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

2、LinkedBlockingQueue

     基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

3、PriorityBlockingQueue

       以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即

容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

4、DelayQueue

       基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会

被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

 5、小案例

       有关生产者-消费者,上篇博客我写了基于wait和notifyAll实现过,也基于await和signal实现过,网址:https://www.cnblogs.com/qdhxhz/p/9206076.html

这里已经是第三个相关生产消费者的小案例了。

      这里通过LinkedBlockingQueue实现生产消费模式

(1)测试类

public class BlockingQueueTest {
      
          public static void main(String[] args) throws InterruptedException {
              // 声明一个容量为10的缓存队列
             BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
      
             //new了两个生产者和一个消费者,同时他们共用一个queue缓存队列
             Producer producer1 = new Producer(queue);
             Producer producer2 = new Producer(queue);          
             Consumer consumer = new Consumer(queue);
      
             // 通过线程池启动线程
             ExecutorService service = Executors.newCachedThreadPool();

             service.execute(producer1);
             service.execute(producer2);          
             service.execute(consumer);
      
             // 执行5s
             Thread.sleep(5 * 1000);
             producer1.stop();
             producer2.stop();
           
             Thread.sleep(2000);
             // 退出Executor
             service.shutdown();
         }
     }

(2)生产者

/**
  * 生产者线程
  */
 public class Producer implements Runnable {
     
     private volatile boolean  isRunning = true;//是否在运行标志
     private BlockingQueue<String> queue;//阻塞队列
     private static AtomicInteger count = new AtomicInteger();//自动更新的值
    
     //构造函数
     public Producer(BlockingQueue<String> queue) {
         this.queue = queue;
     }
  
     public void run() {
         String data = null;
         System.out.println(Thread.currentThread().getName()+" 启动生产者线程!");
         try {
             while (isRunning) {
                 Thread.sleep(1000);
                 
                //以原子方式将count当前值加1
                 data = "" + count.incrementAndGet();
                 System.out.println(Thread.currentThread().getName()+" 将生产数据:" + data + "放入队列中");
                 
               //设定的等待时间为2s,如果超过2s还没加进去返回false
                 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                     System.out.println(Thread.currentThread().getName()+" 放入数据失败:" + data);
                 }
             }
         } catch (InterruptedException e) {
             e.printStackTrace();
             Thread.currentThread().interrupt();
         } finally {
             System.out.println(Thread.currentThread().getName()+" 退出生产者线程!");
         }
     }
  
     public void stop() {
         isRunning = false;
     }
 }

(3)消费者

/**
  * 消费者线程
  */
 public class Consumer implements Runnable {
     
     private BlockingQueue<String> queue;

     //构造函数
     public Consumer(BlockingQueue<String> queue) {
         this.queue = queue;
     }
  
     public void run() {
         System.out.println(Thread.currentThread().getName()+" 启动消费者线程!");

         boolean isRunning = true;
         try {
             while (isRunning) {
                //有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
                 String data = queue.poll(2, TimeUnit.SECONDS);
                 
                 if (null != data) {
                     System.out.println(Thread.currentThread().getName()+" 正在消费数据:" + data);
                     Thread.sleep(1000);
                 } else {
                     // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                     isRunning = false;
                 }
             }
         } catch (InterruptedException e) {
             e.printStackTrace();
             Thread.currentThread().interrupt();
         } finally {
             System.out.println(Thread.currentThread().getName()+" 退出消费者线程!");
         }
     }     
 }

运行结果(其中一种)

 三、阻塞队列的实现原理

     主要看两个关键方法的实现:put()和take()

 1、put方法

public void put(E e) throws InterruptedException {
    
    //首先可以看出,不能放null,否在报空指针异常
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    
    //发现采用的是Lock锁
    final ReentrantLock lock = this.lock;
    
    //如果当前线程不能获取锁则抛出异常
    lock.lockInterruptibly();
    try {
        try {
            while (count == items.length)
    //这里才是关键,我们发现它的堵塞其实是通过await()和signal()来实现的
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); 
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

       当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。

我们看一下insert方法的实现:

private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

      它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

 2、take()方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); 
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

        跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,

下面是extract方法的实现:

private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}

跟insert方法也很类似。

其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它这里通过await()和signal()一起集成到了阻塞队列中实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/999307.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

飞行动力学 - 第19节-part2-尾旋及改出 之 基础点摘要

飞行动力学 - 第19节-part2-尾旋及改出 之 基础点摘要 1. 尾旋2. 尾旋进入3. 尾旋改出4. 参考资料 1. 尾旋 尾旋是一种绕垂直轴自动旋转、下降的特殊失速现象。 特点&#xff1a; 尾旋半径半个翼展长度旋转速度120 度/秒 2. 尾旋进入 不同型号飞机&#xff0c;其尾旋特点不…

2023-简单点-什么是protobuf?

protobuf mother: 谷歌 作用 序列化 人话&#xff1a; 存储数据的一种结构 优势在&#xff1f; 类型安全 易用性好 序列化/反序列性能好 兼容性好 不仅可以定义结构体&#xff0c;还可以定义rpc服务接口 劣势在&#xff1f; 可读性较差&#xff1a;没有schema的情况下&a…

element的el-select给下拉框添加背景

第一步 :popper-append-to-body"false" <el-selectv-model"value"placeholder"请选择":popper-append-to-body"false"><el-optionv-for"item in options":key"item.value":label"item.label&quo…

web自动化测试工具之Selenium的使用

Selenium的使用 Selenium概述工作原理应用场景安装浏览器驱动 基本使用安装Selenium模块注意点使用分析代码实现 常见方法driver对象定位标签元素与获取标签对象获取文本内容与属性值 使用无界面浏览器使用pyantomjs驱动设置chrome启动参数 其他操作窗口切换ifrme切换设置User-…

ERR_PNPM_NO_GLOBAL_BIN_DIR Unable to find the global bin directory

错误提示 ERROR Unable to find the global bin directory Run "pnpm setup"to create it automatically, or set the global-bin-dir setting, or the PNPM HOME env variable.The global bin directory should be in the PATH.错误&#xff0c;找不到全局bin目录 …

Web跨域问题

目录 一、引言二、跨域问题1.同源策略2.跨域3.出现跨域问题的情况 三、解决方案1.普通web&#xff0c;使用Filter过滤器2.SpringBoot项目&#xff0c;使用CrossOrigin注解 四、示例 一、引言 在web开发的过程中&#xff0c;因为前后端的分离我们经常会遇到跨域问题&#xff0c…

私人问答网站搭建指南:Ubuntu+Cpolar+Tipas

文章目录 前言2.Tipask网站搭建2.1 Tipask网站下载和安装2.2 Tipask网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3 Cpolar稳定隧道&#xff08;本地设置&#xff09; 4. 公网访问测试5. 结语 前…

Spring MVC 六 - DispatcherServlet处理请求过程

前面讲过了DispatcherServlet的初始化过程&#xff08;源码角度的DispatcherServlet的具体初始化过程还没说&#xff0c;先放一放&#xff09;&#xff0c;今天说一下DispatcherServlet处理请求的过程。 处理过程 WebApplicationContext绑定在当前request属性上&#xff08;属…

SpringMVC多文件上传

文章目录 一、文件上传1.1 导入pom依赖1.2 配置文件上传解析器1.3 设置文件上传表单1.4 实现文件上传 二、文件下载三、多文件上传四、JRebel的使用 一、文件上传 1.1 导入pom依赖 <commons-fileupload.version>1.3.3</commons-fileupload.version><dependency…

springboot第40集:架构师写的代码,那叫一个优雅

事务的隔离性上&#xff0c;从低到高可能产生的读现象分别是&#xff1a;脏读、不可重复读、幻读。 脏读指读到了未提交的数据。 不可重复读指一次事务内的多次相同查询&#xff0c;读取到了不同的结果。 幻读师不可重复读的特殊场景。一次事务内的多次范围查询得到了不同的结果…

系统测试AC5. AC6. IAR和GCC调试效果,MDK AC6不开优化调试乱跳,甚至倒序执行

首先感谢大家对上一个视频的点评回复&#xff0c;非常有意义的讨论&#xff0c;这次AC6的表现更新惊呆&#xff0c;不开优化都可以乱跳。 【实验目的】 同样的程序代码&#xff0c;目的是测试C环境的调试现象。 【实验版本】 IAR版本 &#xff1a;9.3x MDK版本&#xff1a;5…

开赛啦!第六届“中国法研杯”司法人工智能挑战赛精彩启幕

9月9日&#xff0c;第六届“中国法研杯”司法人工智能挑战赛&#xff08;简写为“LAIC2023”&#xff09;在福建厦门正式拉开帷幕&#xff0c;主办方中国司法大数据研究院&#xff08;以下简称“中国法研”&#xff09;以及厦门市思明区政府、厦门海丝办有关领导共同参加了启动…

No1.详解【2023年全国大学生数学建模竞赛】C题——蔬菜类商品的自动定价与补货决策(代码 + 详细输出 + 数据集代码 下载)

时间告诉你什么叫衰老,回忆告诉你什么叫幼稚。不要总在过去的回忆里纠缠,昨天的太阳,晒不干今天的衣裳。 🎯作者主页: 追光者♂🔥 🌸个人简介: 💖[1] 计算机专业硕士研究生💖 🌿[2] 2023年城市之星领跑者TOP1(哈尔滨)🌿 🌟[3] 2022年度博客…

Java低代码核心引擎:jvs-list(列表)按钮配置与数据权限配置

一、列表页按钮配置 按钮类型 A、表级按钮&#xff1b;B、行级按钮&#xff0c;表级按钮往往用作新增或者是整体性的操作&#xff0c;行级按钮主要用于对单行数据的操作。行级按钮超过3个自动折叠。 按钮增减 系统内置了 新增、删除、修改、详情、导入、导出、模板下载这几个…

DBeaver Community抢先体验版下载

https://dbeaver.io/files/ea/ 根据自己使用的平台&#xff0c;按需下载。

分享一下鲜花店怎么在小程序上做商城功能

随着互联网的普及和移动支付的便捷性&#xff0c;越来越多的人选择在线上购物。对于鲜花店来说&#xff0c;通过开发小程序上的商城功能&#xff0c;可以拓宽销售渠道、提高用户体验&#xff0c;同时提升品牌知名度。本文将探讨如何为鲜花店在小程序上实现商城功能。 对于鲜花店…

第2次实验:Ethernet

目的&#xff1a; 要探索以太网帧的细节。以太网是一种流行的链路层协议&#xff0c;在你的课文的第4.3节中有介绍&#xff1b;现代计算机连接到以太网交换机&#xff08;第4.3.4节&#xff09;&#xff0c;而不是使用经典的以太网&#xff08;第4.3.2节&#xff09;。在做这个…

细说GNSS模拟器的RTK功能(三)应用实例01——运行和分析模拟

在上期文章中我们介绍了基于RTCM插件来模拟RTCM使用的硬件和软件设置&#xff0c;本期文章我们将继续进行运行和分析模拟。 使用RTCM插件 运行和分析模拟 连接Ublox接收器 虽然采用了Novatel接收器进行模拟来获得更好的位置精度&#xff0c;但也同样适用于Ublox接收器。要将…

管理类联考——数学——汇总篇——知识点突破——应用题——植树

⛲️ 一、考点讲解 开放型植树 植树数量 总长 间隔 1 植树数量\frac{总长}{间隔}1 植树数量间隔总长​1封闭型植树 植树数量 总长 间距 植树数量\frac{总长}{间距} 植树数量间距总长​ 二、考试解读 本考点要注意开放型与封闭型植树公式的区别。本考点的难点在于变间距…

为什么要反复讲EasyAVFilter这个东西,真能替代ffmpeg吗?

最近我写了不少关于EasyAVfilter的东西&#xff0c;有rtsp转mp4、MP4转HLS、rtsp转rtmp&#xff0c;就简简单单几行代码&#xff0c;就能解决很多技术上的问题&#xff0c;而且就算是音视频开发的小白&#xff0c;也可以用EasyAVfilter开发出一个音视频后端出来&#xff0c;他既…