Java 一文掌握全部阻塞队列的使用

news2024/11/29 4:48:18

1、简介

本文主要对Java常用阻塞队列进行介绍和提供相关使用案例

2、 阻塞队列作用

阻塞队列提供了一种线程安全、高效的数据传递和同步机制 , 主要用于缓冲数据、限流、削峰填谷,生产者-消费者模型,线程间的协作等等。

3、 各阻塞队列区别对比

队列有界性锁方式数据结构
ArrayBlockingQueue有锁ReentrantLock数组
LinkedBlockingQueue有界有锁两个锁ReentrantLock + 条件变量Condition双向链表
LinkedTransferQueue无界无锁CAS+原子变量链表
PriorityBlockingQueue无界有锁独占锁(ReentrantLock)优先级队列(DelayWorkQueue)
DelayQueue无界有锁ReentrantLock堆(PriorityQueue)
SynchronousQueue无容量有锁CAS+自旋(无锁),自旋了一定次数后调用 LockSupport.park()进行阻塞链表

4、 阻塞队列常用方法说明

注意

  • 队列添加元素是从队尾添加, 删除元素是从队头删除,有顺序性
  • 虽然有些方法看起来功能很像,但是实际的逻辑可能完全不一样,一定要根据具体场景去使用

添加元素

  • add: 如果队列已满,抛出 IllegalStateException 异常
  • offer:如果队列已满,false
  • put: 如果队列已满,阻塞等待直到队列有空闲位置

删除元素

  • take: 如果队列为空, 阻塞等待直到队列有元素
  • poll: 如果队列为空,返回 null
  • remove: 如果队列为空,抛出NoSuchElementException异常
  • drainTo(Collection): 批量从队列中取出全部元素到集合中
  • drainTo(Collection, int): 批量从队列中取出n个元素到集合中
  • remove(Obejct): 删除指定元素,删除成功返回true, 如果有多个相同元素只会删除一个
  • removeIf(Predicate): 根据断言表达式删除所有符合条件的元素,删除失败返回false
  • removeAll(Collection): 删除队列中所有在集合中存在的的元素,删除失败返回false (差集)
  • retainAll(Collection): 保留队列中所有在集合中存在的的元素。 (交集)

查看元素

  • peek: 查看队头元素, 如果队列为空, 返回null
  • element: 查看队头元素, 如果队列为空,抛出 NoSuchElementException异常

其他:

  • remainingCapacity: 返回队列可用容量大小
  • isEmpty: 队列是否为空

5、使用介绍

5.1、普通阻塞队列

包括ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue 、PriorityBlockingQueue这些队列在用法上无本质区别,只是底层数据结构和加锁方式不一样。

  • 其中PriorityBlockingQueue逻辑有点不同,队列元素支持按优先级排序取出,其实就是阻塞队列里对于优先级队列的实现,支持排序。

简单的生产者-消费者模型(1P-3C)使用

在这里插入图片描述

    @Test
    public void test4() throws InterruptedException {
        // 编写1个生产者-3个消费者的模型
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // 1个生产者
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    // 生产元素如果满了阻塞等待
                    queue.put("data_"+i);
                    System.out.println("生产者生产元素: " + i);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        // 3个消费者
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread(() -> {
                while (true){
                    try {
                        // 消费元素,如果队列为空阻塞等待
                        System.out.println("消费者"+index+"消费元素: " + queue.take());
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                }
            }).start();
        }

        Thread.sleep(300000);
    }

5.2 SynchronousQueue队列

同步阻塞队列

  • 是一个无法存储元素的阻塞队列,队列的容量是0,底层并不会缓存数据。 如果你直接队列里add或者offer添加元素会失败。
  • 支持指定为公平队列模式, 即的等待的线程支持按FIFO(先进先出)的顺序去生产和消费。 默认是非公平。
    • 公平队列具体实现是: TransferQueue, 先进性出
    • 非公平队列具体实现是 TransferStack, 先进后出

既然不会存储元素那它能干什么呢? 还是生产者-消费者模型, 与一般阻塞队列区别是每次生产者线程只能生产一份数据, 只有这份数据被消费者线程消费了,生产者才能继续生产。 同理,消费者线会阻塞等待生产者线程提供数据后才能进行处理。

  • ps: 它容量不是 1 而是 0,因为它不需要去持有元素,它所做的就是直接传递而已

它适合的逻辑执行链路是 生产-->消费--> 生产--> 消费-->生产--> 消费.

5.2.1 使用案例1:

假设有一个场景, 两个客户端端线程A和B, 线程A和B需要通过几次的信号同步才能建立连接成功,下面是三次信号同步的逻辑,

操作时间操作客户端A客户端B
1A向B建立连接queue.put(true)queue.take()
2B向A建立连接queue.take()queue.put(true)
3A向B建立连接queue.put(true)queue.put(true)
    @Test
    public void test22() throws InterruptedException {
        SynchronousQueue<Boolean> queue = new SynchronousQueue<>(true);

        AtomicInteger connectionCount = new AtomicInteger(0);
        Client clientA = new Client(queue,connectionCount);
        Client clientB = new Client(queue,connectionCount);

        clientA.connectTo(clientB);
        System.out.println("第" + connectionCount.get() +"次连接成功");

        clientB.connectTo(clientA);
        System.out.println("第" + connectionCount.get() +"次连接成功");

        clientA.connectTo(clientB);
        System.out.println("第" + connectionCount.get() +"次连接成功");

        System.out.println("结束");
    }



    static class Client {

        private SynchronousQueue<Boolean> queue;
        private AtomicInteger connectionCount;

        public Client(SynchronousQueue<Boolean> queue, AtomicInteger connectionCount) {
            this.queue = queue;
            this.connectionCount = connectionCount;
        }

        public void ack() throws InterruptedException {
            this.queue.take();
        }

        public boolean connectTo(Client b) throws InterruptedException {
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    b.ack();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).start();

            queue.put(true);
            int count = connectionCount.addAndGet(1);

            if (count >= 3){
                return true;
            }else {
                return false;
            }
        }
    }

5.2.2 使用案例2:

假设有一个场景,有两夫妻, 第一天老公A负责卖鱼,赚到的钱给老婆B, 第二天老婆用这笔钱去投资,投资赚到的钱给老公,第三天老公用这笔钱去继续买鱼,如此日复一日, 夫妻两人属于隔天工作赚钱模式, 接下来我们用同步阻塞队列实现这个场景

 @Test
    public void test25() throws InterruptedException {
        SynchronousQueue<AtomicInteger> queue = new SynchronousQueue<>();

        Thread threadA = new Thread(() -> {
            try {
                new A(queue).startWork();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        Thread threadB = new Thread(() -> {
            try {
                new B(queue).startWork();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        threadA.start();
        threadB.start();

        threadA.join();
        threadB.join();
    }

    @Data
    class A {

        private SynchronousQueue<AtomicInteger> queue ;

        // 老公的钱
        AtomicInteger money = new AtomicInteger(1000);

        public A(SynchronousQueue<AtomicInteger> queue) {
            this.queue = queue;
        }

        public void startWork() throws InterruptedException {
            while (true) {
                // 卖鱼赚到的钱
                Thread.sleep(2000);
                int earnMoney = new Random().nextInt(100);
                money.addAndGet(earnMoney);
                System.out.println("老公赚到了" + earnMoney + "元,  当前余额: " + money.get());

                // 把钱全部给老婆
                queue.put(money);

                System.out.println("老公开始休息");

                // 等待老婆的钱
                money = queue.take();
                System.out.println("收到了老婆的" + money + "元, 继续卖鱼");

            }
        }
    }


    @Data
    class B  extends Thread {

        private SynchronousQueue<AtomicInteger> queue ;

        // 老婆的钱
        AtomicInteger money = new AtomicInteger(0);

        public B(SynchronousQueue<AtomicInteger> queue) {
            this.queue = queue;
        }

        public void startWork() throws InterruptedException {
            while (true) {
                // 等待老公的钱
                money = queue.take();
                System.out.println("收到了老公的" + money + "元, 继续投资");

                // 投资赚到的钱
                Thread.sleep(2000);
                int earnMoney = new Random().nextInt(100);
                money.addAndGet(earnMoney);
                System.out.println("老婆赚到了" + earnMoney + "元,  当前余额: " + money.get());

                // 把钱全部给老公
                queue.put(money);

                System.out.println("老婆开始休息");

            }
        }

    }

5.2.3 其他

在指定容量为1的普通阻塞队列和SynchronousQueue有什么区别?

  • 容量为1的普通阻塞队列在put第一个元素并不会阻塞等待,因为还没满,只有put第二个元素后因为队列满了才会阻塞等待。 而SynchronousQueue put就会直接阻塞等待。

    • 那你肯定说那指定容量为0, 但是为0没有意义根本无法使用,因为生产者无法生产数据,消费者也无法消费数据。
  • SynchronousQueue适用于需要精确控制线程之间交换传递元素的场景,而普通阻塞队列适用于需要缓冲多个元素的场景

适合场景:

  • 线程间的数据交换、线程同步
  • 实现任务执行器(Executor)框架, 作为任务提交者和任务执行者之间的交换通道,用于控制任务提交和执行的速率。当任务提交者提交一个任务时,它将被阻塞,直到任务执行者开始执行任务。这对于控制任务的执行顺序和速率非常有用
  • Executors.newCachedThreadPool() 默认用的就是SynchronousQueue, 这样线程池就无法存储任务,来一个任务就直接new一个线程去处理

5.3 LinkedTransferQueue

可以看作是SynchronousQueue和LinkedBlockingQueue的结合体。 即支持SynchronousQueue的直接传递性,减少用锁来同步,也支持普通无界阻塞队列的存储更多元素.

与普通阻塞队列区别就是多了一些以下的方法去添加元素

transfer方法

  • 当此时有消费者线程在阻塞等待时,调用transfer方法的生产者线程不会将元素存入队列,而是直接将元素传递给消费者。
  • 当此时没有正在等待的消费者线程,则会将元素入队,然后会阻塞等待. 当被消费一个后才会唤醒一个等待的生产线程(这个与普通阻塞队列的put方法一致)

tryTransfer方法

  • 与transfer方法不同是, 如果没有正在等待的消费者线程, 不会将元素入队而是返回false。 如果有等待的消费者线程则直接传递给它并返回true

hasWaitingConsumer: 是否有消费者线程在等待
getWaitingConsumerCount: 获取等待的消费者数量的

5.4 DelayQueue 延迟队列

  • 底层数据结构是优先级队列, 属于无界阻塞队列所以put操作不会阻塞等待(底层调用的是offer法) ,
  • 存储的元素是Delayed接口的子类, 会根据Delayed接口的getDelay方法的返回值进行优先级排序,时间越小元素的将被优先取出。
  • take方法取出元素时,只有队头的元素延迟时间到了才会被取出否则一致阻塞等待。 这个跟普通阻塞队列的队列为空就阻塞等待不同。
  • 并不适用于需要高精度的时间控制场景,因为其延迟时间的计算和排序是基于系统时间的,并受系统时间的精度和调整影响。

下面是一个使用案例:

    public static void main(String[] args) {
    	
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

        // 添加延迟任务
        delayQueue.put(new DelayedTask("Task 1", 5, TimeUnit.SECONDS));  // 延迟5秒
        delayQueue.put(new DelayedTask("Task 2", 10, TimeUnit.SECONDS)); // 延迟10秒
        delayQueue.put(new DelayedTask("Task 3", 2, TimeUnit.SECONDS));
        delayQueue.add(new DelayedTask("Task 4", 0, TimeUnit.SECONDS));
        delayQueue.add(new DelayedTask("Task 5", 0, TimeUnit.SECONDS));

        log.info("start");

        // 处理延迟任务
        while (!delayQueue.isEmpty()) {
            try {
                // 队头的元素延迟时间到了才会被取出否则一致阻塞等待
                DelayedTask task = delayQueue.take();
                log.info("处理任务: {}", task.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class DelayedTask implements Delayed {
        private final String name;
        private final long delay;
        private final long expireTime;

        DelayedTask(String name, long delay, TimeUnit unit) {
            this.name = name;
            this.delay = unit.toMillis(delay);
            this.expireTime = System.currentTimeMillis() + this.delay;
        }

        String getName() {
            return name;
        }

        // 返回指定的延迟时间
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(this.expireTime, ((DelayedTask) other).expireTime);
        }
    }

下面是执行结果,从打印时间可以看到,对应的任务都是到了指定的延迟时间才会被取出。
注意由于任务4和5指定的延迟时间为0所以会被马上取出处理

17:29:29.746 [main] INFO com.disruptor.blockQueue.BQTest3_DelayQueue - start
17:29:29.747 [main] INFO com.disruptor.blockQueue.BQTest3_DelayQueue - 处理任务: Task 4
17:29:29.748 [main] INFO com.disruptor.blockQueue.BQTest3_DelayQueue - 处理任务: Task 5
17:29:31.751 [main] INFO com.disruptor.blockQueue.BQTest3_DelayQueue - 处理任务: Task 3
17:29:34.749 [main] INFO com.disruptor.blockQueue.BQTest3_DelayQueue - 处理任务: Task 1
17:29:39.749 [main] INFO com.disruptor.blockQueue.BQTest3_DelayQueue - 处理任务: Task 2

应用场景(主要适用于延迟任务):

  • 订单业务: 下单之后如果三十分钟之内没有付款就自动取消订单。
  • 订餐通知: 下单成功后60s之后给用户发送短信通知。
  • 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
  • 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
  • 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/741627.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能学术顶会——NeurIPS 2022 议题(网络安全方向)清单、摘要与总结

按语&#xff1a;随着大模型的崛起&#xff0c;将AI再次推向一个高峰&#xff0c;受到的关注也越来越大。在网络安全领域&#xff0c;除4大安全顶会外&#xff0c;一些涉及AI的安全话题&#xff0c;包括对AI的攻防研究&#xff0c;以及应用AI做安全的研究方向&#xff0c;也会发…

在vite创建的vue3项目中使用Cesium加载纽约建筑模型、设置样式,划分城市区域并着色

在vite创建的vue3项目中使用Cesium加载纽约建筑模型、设置样式&#xff0c;划分城市区域并着色 使用vite创建vue3项目 npm create vitelatestcd到创建的项目文件夹中 npm install安装Cesium npm i cesium vite-plugin-cesium vite -D配置 vite.config.js文件&#xff1a;添加Ce…

系统架构设计师 8:系统质量属性与架构评估

软件系统属性包括功能属性和质量属性&#xff0c;软件架构重点关注的是质量属性。为了精确、定量地表达系统的质量属性&#xff0c;通常会采用质量属性场景的方式进行描述。 在确定软件系统架构&#xff0c;精确描述质量属性场景后&#xff0c;就需要对系统架构进行评估。软件…

前端|CSS(二)

参考视频&#xff1a;黑马程序员前端CSS3基础教程&#xff0c;前端必备基础 目录 &#x1f4da;CSS 布局的三种机制 &#x1f407;普通流 &#x1f407;浮动 ⭐️浮动介绍 ⭐️浮动(float)的应用 ⭐️浮动(float)的扩展 ⭐️清除浮动 &#x1f407;定位 ⭐️定位 ⭐️…

shell? 变量!

目录 ​编辑 &#x1f428;什么是shell &#x1f428;编译型语言和解释型语言 &#x1f428;解释型语言 &#x1f428;变量 &#x1f428;1.局部变量&#xff1a; &#x1f428;2.环境变量通常又称“全局变量” &#x1f428;3.设置环境变量&#xff1a; &#x1f4…

NZ系列工具:NZ11:VBA光标跟随策略

【分享成果&#xff0c;随喜正能量】生活就像是一杯苦茶&#xff0c;而情感是茉莉花&#xff0c;调兑在一起&#xff0c;才会馥郁芬芳。人活在世上&#xff0c;有诸多苦楚萦心&#xff0c;若不懂得自我调解&#xff0c;终究会被纷呈的世相掩埋。所以&#xff0c;更多的时候&…

Ubuntu18.04修改file descriptors(文件描述符限制),解决elasticsearch启动报错问题

最近在学习elasticsearch&#xff0c;使用的平台是Ubuntu18.04&#xff0c;在部署过程中的坑记录一下。 下载安装的过程就不说了&#xff0c;在启动es的时候报错 1 max file descriptors [4096] for elasticsearch process is too low, increase to at least [65535] 看了下…

TIOBE 7月编程语言榜出炉,这个语言强势突围,前三发生重大变化

TIOBE 公布了 2023 年 7 月的编程指数信息&#xff0c;在这个月&#xff0c;语言榜有什么新变化&#xff0c;让我们一起去看看吧&#xff01; JavaScript 创历史新高 几个月前&#xff0c;编程语言 C 占据了 TIOBE 指数的第 3 位&#xff08;超过了 Java&#xff09;。 但C的…

Golang 中的流程控制

1、Golang 中的流程控制 Go 语言中最常用的流程控制有 if 和 for &#xff0c;而 switch 和 goto 主要是为了简化代码、降低重复 代码而生的结构&#xff0c;属于扩展类的流程控制。 2、if else(分支结构) if 语句 if 语句 由一个布尔表达式后紧跟一个或多个语句组成。…

JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)

目录 SQL注入 JDBC批量添加数据 JDBC事务处理 Blob类型的使用 插入Blob类型数据 其他查询方式 动态条件查询 SQL注入 什么是SQL注入 所谓 SQL 注入&#xff0c;就是通过把含有 SQL 语句片段的参数插入到需要 执行的 SQL 语句中&#xff0c; 最终达到欺骗数据库服务器执行…

如何将各个阶段的数据进行对比?Sugar BI 教你快速搞定

折线图可以将当前和某个时间段的数据进行对比&#xff0c;比如前一天、上周、去年。 数据对比开启条件 当折线图 X 轴有且只有一个日期或时间字段&#xff0c;并且聚合方式为年-xx&#xff0c;Y 轴有且只有一个度量字段时&#xff0c;可以开启并配置折线图数据对比。 支持数据…

VMware使用ubuntu虚拟机的一些使用技巧

VMware安装Ubuntu虚拟机一般相对比较容易&#xff0c;本文记录一些VMware使用ubuntu虚拟机的其他使用技巧。 一、Ubuntu共享文件夹 1.1、 挂载镜像文件&#xff1a; 虚拟机->设置->硬件->CD/DVD.右边“连接”下面选择“使用IOS镜像文件”&#xff0c;浏览选择虚拟机…

【独家揭秘】微信居然可以自动通过好友申请并自动打招呼啦!

最近有客户来咨询&#xff0c;说是因为做内容引流到微信&#xff0c;所以每天很多人加她&#xff0c;微信都快被加爆了&#xff0c;每天手动通过好友申请和打招呼&#xff0c;回答了很多一模一样的问题&#xff0c;就一个小时已经让她很疲惫了&#xff0c;很机械的重复这些事。…

caj文件怎么转成pdf文件格式?分享两个免费方法!

在数字化的世界中&#xff0c;文件格式转换是我们日常生活和工作中常见的需求。CAJ文件是中国学术文献网络出版总库使用的一种文件格式&#xff0c;而PDF是全球广泛接受的文件格式&#xff0c;具有良好的兼容性和稳定性。本文将介绍两种免费的方法&#xff0c;帮助你将CAJ文件转…

细节:双花括号({{ ... }})在Vue.js中的用法

问题&#xff1a; 为什么后端返回的是数字类型时&#xff0c; {{ form.orderPrice }}可以拿到值展示&#xff0c; {{ form.orderPrice || "-" }} 不可以&#xff1f; 接口返回数据&#xff1a; <el-form-item label"订单金额&#xff1a;" prop"…

2.0 熟悉CheatEngine修改器

Cheat Engine 一般简称为CE&#xff0c;它是一款功能强大的开源内存修改工具&#xff0c;其主要功能包括、内存扫描、十六进制编辑器、动态调试功能于一体&#xff0c;且该工具自身附带了脚本工具&#xff0c;可以用它很方便的生成自己的脚本窗体&#xff0c;CE工具可以帮助用户…

shader学习记录——彩色光圈

参考连接 https://blog.csdn.net/stalendp/article/details/21993227 Shader "Custom/ColorRingShader" {Properties{_MainTex ("Texture", 2D) "white" {}}SubShader{Tags { "RenderType""Opaque" }LOD 100Pass{CGPROGRA…

fastapi docs打开为空白解决办法

空白的原因 使用的cdn为国外cdn 解决办法 使用国内cdn 解决步骤 1.打开此文件D:\Program Files\Python\Lib\site-packages\fastapi\openapi\docs.py 2.修改cdn地址 国内cdn不好找呀 &#xff08;1&#xff09;.七牛云存储 开放静态文件CDN&#xff0c;地址&#xff1a;h…

ubuntu20离线安装nodejs、GO、go.rice及yarn

虽然是离线安装&#xff0c;但该有的安装包还是需要的… 目录 1、安装nodejs1.1查看本地是否存在nodejs1.2创建nodejs文件夹1.3下载nodejs二进制文件1.4解压并改名1.5添加软连接 2安装GO2.1创建go文件夹2.2下载go二进制文件2.3解压文件2.4添加环境变量2.5设置sudo可执行go命令…

C++11 function包装器

前言 在C中&#xff0c;有三种可调用对象&#xff1a;函数指针&#xff0c;仿函数&#xff0c;lambda表达式。 三者有相似的作用和效果&#xff0c;但使用形式有很大的差异。 为了进行统一&#xff0c;C11引进了function包装器 文章目录 前言一. function的使用二. function对…