JUC阻塞队列(二):LinkedBlockingQueue

news2024/12/25 13:29:08

1、LinkedBlockingQueue 介绍

      LinkedBlockingQueue 也是接口BlockingQueue的一个实现类,与 ArrrayBlockingQueue基于

       数组实现不同的是,LinkedBlockingQueue是基于单项链表实现的,在LinkedBlockingQueue

       内部维护了一个单向链表来存储数据;链表原则上是无边界的,但LinkedBlockingQueue维护

       了一个常量 capacity 表示队列的容量,new 创建 LinkedBlockingQueue 时若不指定 capacity

       的值,capacity 默认是 Integer.MAX_VALUE

          LinkedBlockingQueue 结构如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;


    //存放数据的节点
    //从这里可以发现,LinkedBlockingQueue采用单向链表来存储数据
    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

    private final int capacity;//队列容量

    /**
     * 使用 AtomicInteger 来记录 数据的个数
     * todo 问题:在ArrayBlockingQueue采用int 类型来记录数据个数,但在
     *           该类中为什么使用 AtomicInteger 来记录数据个数?
     *        因为 ArrayBlockingQueue 是通过一个锁来保证数据的 入队/出队,可以通过锁来
     *        保证int数据的原子性;而LinkedBlockingQueue 的 入队/出队 采用不同的锁,但
     *        count 在 入队/出队 都需要操作,所以要想保证 count需要CAS来保证原子性
     *     
     */
    private final AtomicInteger count = new AtomicInteger();

    transient Node<E> head;//队列头节点

    private transient Node<E> last;//队列尾节点

    private final ReentrantLock takeLock = new ReentrantLock();//取队列数据的锁

    private final Condition notEmpty = takeLock.newCondition(); //取数据的Condition

    private final ReentrantLock putLock = new ReentrantLock();//向队列添加数据的锁

    private final Condition notFull = putLock.newCondition();//入队列的Condition

    
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * 初始化时向队列中添加数据
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); 
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
}

2、LinkedBlockingQueue 使用示例

      LinkedBlockingQueue常用方法也是 BlockingQueue定义的那几个方法,使用方式与

      ArrayBlockingQueue差不多,只是每个方法的具体实现不同而已,、;

               LinkedBlockingQueue 使用示例如下:

public class LinkedBlockingQueueDemo01 {

    public static void main(String[] args) throws InterruptedException {

        //若创建队列时不指定队列容量大小,则默认大小是 Integer.MAX_VALUE
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

        //add 添加失败抛出异常
        queue.add("1");
        queue.add("2");
        queue.add("3");
        queue.add("4");
        //put 添加失败,则一直阻塞,直到队列有空位数据添加成功
        queue.put("4");
        //添加成功返回true,添加失败返回false
        boolean b = queue.offer("5");
        System.out.println(b);
        //带超时时间的添加
        b = queue.offer("6",5, TimeUnit.SECONDS);
        System.out.println(b);

        //从队列中取数据
        //remove 若队列中没有数据,则抛出异常
        String s = queue.remove();
        //poll 若队列为空,则返回null
        s = queue.poll();
        //带超时时间的取数据,若队列为空,则线程阻塞,若阻塞超过超时时间之后队列中还没有数据,则返回null
        s= queue.poll(5,TimeUnit.SECONDS);
        //take 从队列中取数据,若队列为空,则一直阻塞,直到队列中有数据
        s = queue.take();


    }
}

3、LinkedBlockingQueue常用方法解析

3.1、add(E e) 方法

         该方法作用是向队列中添加数据,若添加失败,则直接抛出异常;

          方法代码如下:

                 

3.2、offer(E e) 方法

         该方法作用也是向队列中添加数据,若添加成则返回true,添加失败返回false

          offer方法代码如下:

               

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //引用成员变量,获取队列数量
        final AtomicInteger count = this.count;
        //队列数据个数是否等于队列限制长度(队列容量)
        if (count.get() == capacity)
            return false;
        //作为标记存在
        //todo 使用数值类型作为标识的特点
        int c = -1;
        //将存储的数据封装成Node
        Node<E> node = new Node<E>(e);
        //生产者锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //再次判断,查看队列是否还有空间
            //双重检查
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                //判断队列是否满了
                if (c + 1 < capacity)
                    //通知其他阻塞的生产者线程
                    //这里生产者和消费者不是互斥的,但消费者之间是互斥的
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        //如果c==0,表示添加数据之前队列元素个数为0,这时可能会出现消费者全在阻塞状态
        //所以,添加数据之后需要唤醒消费者
        if (c == 0)
            //唤醒消费者
            signalNotEmpty();
        //c>=0表示添加成功
        return c >= 0;
    }


//添加数据
private void enqueue(Node<E> node) {
        
        last = last.next = node;
    }

3.3、singnalNotEmpty()、signalNotNull()、enqueue()

         signalNotEmpty(): 唤醒消费者线程

         signalNotFull(): 唤醒生产者线程

         enqueue(): 入队列

         

private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //只有持有锁后才能调用 wait/signal
        //所以这里要先获取读锁
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        //要想执行Condition 的方法,必须先获取相关的锁
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

3.4、offer(E e, long timeout, TimeUnit unit)

         该方法功能也是向队列添加数据,若添加失败则会阻塞,若超过了超时时间还没添加成功

         则表示添加失败,返回false

         

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        //将超时时间转换成纳秒
        long nanos = unit.toNanos(timeout);
        //标记位
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //加锁,若被中断,则抛出异常
        putLock.lockInterruptibly();
        try {
            //阻塞,直到超时时间为0
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            //入队
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            //队列未满,通知其他生产者线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //此时所有消费者线程可能都在阻塞,所有生产数据后需要唤醒消费者线程
        if (c == 0)
            signalNotEmpty();
        return true;
    }

3.5、put(E e) 方法

        该方法功能也是向队列中添加数据,若添加失败,则一直阻塞,直到队列中有空位可以

        添加成功;若线程被中断,则直接抛出异常退出

        

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;//作为标记
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //加锁,若线程被中断,则抛出异常
        putLock.lockInterruptibly();
        try {
            /*
             * 若队列已经满了,则阻塞,直到队列有空位
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            //向队列中添加数据
            enqueue(node);
            //更新队列数据个数
            c = count.getAndIncrement();
            //队列还没有满,通知其他生产者线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //此时所有消费者线程可能都在阻塞,所有生产数据后需要唤醒消费者线程
        if (c == 0)
            signalNotEmpty();
    }

3.6、remove() 方法

        该方法是从队列取数据,若队列为空,则抛出异常;

        remove方法代码如下:

              

3.7、poll() 方法

         该方法功能是删除队列头元素(第一个进入队列的元素),若队列为空,则返回null;

         poll方法代码如下:

              

/**
     * 从队列中取数据,若队列为空则返回null
     * @return
     */
    public E poll() {
        final AtomicInteger count = this.count;
        //若队列为空,则返回null
        if (count.get() == 0)
            return null;
        E x = null;
        //标记位
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        //加锁,取数据锁
        takeLock.lock();
        try {
            //判断队列是否为空
            if (count.get() > 0) {
                //从队列取数据
                x = dequeue();
                //CAS队列元素个数减1
                //先获取再减1
                c = count.getAndDecrement();
                //队列元素多余1,当前线程消费后继续唤醒其他消费者线程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //c获取的当前消费者线程消费之前的线程,若 c == capacity 表示队列满了,此刻当前线程
        //消费后可能会出现所有生产者线程都处于“阻塞等待” 状态,所以需要唤醒生产者线程
        if (c == capacity)
            signalNotFull();
        //返回消费的元素
        return x;
    }

3.8、poll(long timeout, TimeUnit unit)

         该方法是带有超时时间的获取队列的第一个元素;若队列为空,则当前线程会阻塞,直到

         超过了超时时间 timeout 后,若队列还是为空,则返回null

         

/**
     * 删除队列第一个元素,并返回
     * 若队列为空,当前线程会阻塞,直到时间超过 timeout,若队列还是为空,则返回null
     * 若线程被中断,则直接抛出中断异常,并退出
     *
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        //标记位
        int c = -1;
        //将时间转换为纳秒
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //加锁,若线程被中断,则直接抛出异常,并退出
        takeLock.lockInterruptibly();
        try {
            //若队列为空,则阻塞等待,直到阻塞时间之后
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                //阻塞,并返回剩余阻塞时间
                nanos = notEmpty.awaitNanos(nanos);
            }
            //取数据
            x = dequeue();
            //CAS,队列元素个数减1
            //先获取再减1
            c = count.getAndDecrement();
            //队列元素有多个,则通知其他消费者线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //此刻,生产者线程可能全处于“阻塞等待” 状态,通知唤醒生产者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

3.9、take() 方法

         该方法功能也是删除并返回队列的第一个元素,若队列为空,则一直阻塞,直到队列

         不为空,或着线程被中断,异常退出

 /**
     * 删除并返回队列的第一个元素,若队列为空,则一直阻塞;
     * 若线程被中断,则直接抛出异常,退出
     * 
     * @return
     * @throws InterruptedException
     */
    public E take() throws InterruptedException {
        E x;
        /**
         * todo 问题:这里c为什么设置为-1?
         *    c=-1 作为标记存在,使用数值类型作为标识的特点
         */
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //可中断的线程锁,若线程被中断则抛出异常
        takeLock.lockInterruptibly();
        try {
            //队列为空,则阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            //取数据
            x = dequeue();
            //更新队列数据
            c = count.getAndDecrement();
            //队列不为空,通知其他消费者线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        /**
         * todo 注意:
         *    需要注意 c == capacity 这个判断,c == capacity 表示之前队列满了,当前消费了一个元素后,
         *    但此时可能存在 生产者线程全是 “阻塞” 状态,所以消费数据之后需要唤醒一个生产者线程
         */
        if (c == capacity)
            signalNotFull();
        return x;
    }

            

3.10、peek() 方法

           该方法功能是查看(返回)队列中的第一个元素,但并不会把该元素从队列中删除。

          

/**
     * 查看队列头元素,但并不会删除头元素
     * @return
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //获取队列第一个节点(头节点后边的节点)
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

3.11、signalNotFull()、dequeue()

          signalNotFull():唤醒阻塞等待中的生产者线程

          dequeue():删除并返回队列的第一个元素

          

/**
     * 唤醒阻塞中的生产者线程
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        //要想执行Condition 的方法,必须先获取相关的锁
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }




/**
     * 删除链表的第一个元素并返回
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head; //头节点(头节点是一个虚拟节点)
        Node<E> first = h.next;//第一个头节点
        //删除头节点,即修改节点的next指向(或 h.next=null也是一样)
        //让下一个节点作为头节点
        h.next = h; // help GC,
        //更新头结点
        head = first;
        E x = first.item;
        //删除节点数据,让其作为虚拟头节点
        first.item = null;
        return x;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2054220.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索深度学习的力量:从人工智能到计算机视觉的未来科技革命

目录 1. 引言 2. 人工智能的历史背景 3. 深度学习的崛起 3.1 深度神经网络的基本原理 4. 计算机视觉的发展现状 4.1 传统计算机视觉与深度学习的结合 5. 深度学习在计算机视觉中的应用 5.1 图像分类 5.2 目标检测 6. 深度学习引领的未来科技创新与变革 7. 结论 引言…

opencv cv.findContours 函数图像轮廓层级(记录)

opencv cv.findContours 函数详解 图像轮廓层级 图像轮廓检索方式 cv.findContours contours, hierarchy cv.findContours( image, mode, method[, contours[, hierarchy[, offset]]] ) 参数1&#xff1a;源图像 参数2&#xff1a;轮廓的检索方式&#xff0c;主要参数 参数3…

专题--自底向上的计算机网络(物理层)

目录 计算机网络概述 物理层 数据链路层 网络层 运输层 应用层 网络安全 详细见http://t.csdnimg.cn/MY5aI http://t.csdnimg.cn/8Ipa4 http://t.csdnimg.cn/uvMxS

康耐视相机与发那科机器人通过Ethernet I/P直连与程序编写

配置TCP/IP&#xff1a;按MENU—SETUP—NEXT—HOSTCOMM&#xff0c;选择TCP/IP—按ENTER或者F3[DETAIL] Port#1 IP addr&#xff1a;输入机器人IP地址&#xff0c;按ENTER后输入&#xff0c;如192.168.1.11&#xff1b;如果控制柜有2个网络端口&#xff0c;则按F3[PORT]进行切换…

科创中心“核”动力|趋动科技:AI算力界的领跑者

近日&#xff0c;趋动科技与深信服正式推出联合解决方案。联合解决方案将深信服EDS的高性能存储与趋动科技OrionX AI算力资源池化软件、以及GeminiAI训练平台有机结合&#xff0c;整合存力与算力资源的同时&#xff0c;帮助用户建好AI平台、管好AI资源、用好AI服务。 双方已完成…

监控zabbix的安装与使用

文章目录 1.zabbix的安装步骤2.zabbix的主动模式和被动模式简介及实现3.zabbix proxy主动及被动4.自定义监控&#xff0c;监控linux和连接状态&#xff0c;创建email进行基础报警5.部署zabbix agent脚本&#xff0c;适配rocky和ubuntu系统6.使用脚本&#xff0c;基于zabbix api…

yolov8旋转框+关键点检测

一、Yolov8obb_kpt -----------------------------------现已在v8官方库上更新旋转框分割算法和旋转框关键点检测算法-------------------------- ------------------------------------------- https://github.com/yzqxy/ultralytics-obb_segment---------------------------…

每天五分钟深度学习框架pytorch:自动求导机制

本文重点 深度学习框架pytorch拥有自动求导的机制,自动求导是 PyTorch 中非常重要的特性,能够让我们避免手动去计算非常复杂的导数,这能够极大地减少了我们构建模型的时间。本文学习的是第10步反向传播,学习路线参考前面一篇文章。 pytorch0.4版本 在pytorch的0.4版本中…

YOLO知识点总结:

分类&#xff1a; 即是将图像结构化为某一类别的信息&#xff0c;用事先确定好的类别(category)或实例ID来描述图片。这一任务是最简单、最基础的图像理解任务&#xff0c;也是深度学习模型最先取得突破和实现大规模应用的任务。其中&#xff0c;ImageNet是最权威的评测集&…

C语言刷题日记(附详解)(1)

一、选择判断部分 第一题&#xff1a; 如下代码是否存在风险&#xff0c;并说明原因和修改方案 #include<stdio.h> int main() {char* str "hello world";*str a;return 0; }思路提示&#xff1a;这种形式的字符串存储在什么区域呢&#xff1f;是否真的有…

【个人笔记公司项目】vue项目配置代理解决跨域问题

前后端分离模式势必会遇到跨域问题&#xff0c;比如我是10.106.46.169:8080要去请求10.114.46.108:9191。下面讲下代理详细步骤。 本文步骤基于本人的项目结构 一般项目结构已支持代理 // 部署时需要将改开关置为false window.isDev trueif (window.isDev) { // Devwindow.l…

计算机网络速成(二)

计算机网络面试&#xff08;二&#xff09;-CSDN博客 OSI七层体系架构 OSI七层模型是什么&#xff1f;每层的功能是什么&#xff1f; OSI七层模型是国际标准化组织&#xff08;ISO&#xff09;制定的一个用于计算机或通信系统间互联的标准体系&#xff0c;它从上到下分别是&am…

揭秘“商业园区综合管理平台”的无代码开发流程!

本文中的素材来自我在某国资投资集团朋友小赵的“有偿”投稿&#xff0c;要知道现在的商业园区也正在经历数字化改造&#xff0c;面对多商场、多店铺的复杂管理需求&#xff0c;各类商管集团纷纷进行线上互联网管理模式的转型。 这份素材有何不同之处呢&#xff1f;因为他们走了…

EthernetIP IO从站设备数据 转IEC61850项目案例

目录 1 案例说明 1 2 VFBOX网关工作原理 1 3 准备工作 2 4 网关采集ETHERNETIP IO数据 2 5 用IEC61850协议转发数据 4 6 网关使用多个逻辑设备和逻辑节点的方法 6 7 从设备的的EDS文件获取参数信息 8 8 案例总结 10 1 案例说明 设置网关采集EthernetIP IO设备数据把采集的数据…

成功解决:el-popconfirm组件来确认删除、修改等操作无效

我 | 在这里 ⭐ 全栈开发攻城狮、全网10W粉丝、2022博客之星后端领域Top1、专家博主。 &#x1f393;擅长 指导毕设 | 论文指导 | 系统开发 | 毕业答辩 | 系统讲解等。已指导60位同学顺利毕业 ✈️个人公众号&#xff1a;热爱技术的小郑。回复 Java全套视频教程 或 前端全套视频…

深入理解java web分层架构的高内聚低耦合

​ 在软件开发中&#xff0c;构建一个高效、可维护且可扩展的应用系统一直是开发者追求的目标。分层架构和依赖注入&#xff08;IOC&#xff09;是实现这一目标的重要策略。本文将深入探讨三层架构的高内聚特性、低耦合的设计原则&#xff0c;以及如何通过IOC&#xff08;控制反…

前端宝典之五:React源码解析之深度剖析Diff算法

本文主要针对React源码进行解析&#xff0c;内容有&#xff1a; 1、Diff算法原理、两次遍历 2、Diff瓶颈及限制 3、Diff更新之单节点和多节点原理 一、Diff源码解析 以下是关于 React Diff 算法的详细解析及实例&#xff1a; 1、React Diff 算法的基本概念和重要性 1.1 概念…

非专业人员该学什么程序语言

编程&#xff0c;一度被认为和驾驶一样是一项现代社会的基本技能&#xff0c;非专业人员也该有所掌握&#xff0c;中小学也在教。但实际上&#xff0c;它的普及程度远比驾驶差&#xff0c;掌握这个技能的人很少&#xff0c;在学校学过的知识&#xff0c;因为工作中用不上也都忘…

一文弄懂评分卡是什么

在最开始的信用审批过程中,客户的信用等级主要由专家进行主观评判。随着数据分析工具的发展和数据收集、存储越来越容易,各大机构逐渐使用统计模型将专家的评判标准量化为评分卡模型。从而更有利于客观评价客户风险,和批量高效对客户进行风险分级。随着技术的发展,机器学习…

力扣经典题目~快乐数~零基础也能看懂哦

202. 快乐数https://leetcode.cn/problems/happy-number/ 题目描述&#xff1a; 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为&#xff1a; 对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。然后重复这个过程直到这个数变为 1&…