LinkedBlockingQueue详解,深入探究LinkedBlockingQueue源码

news2024/11/18 16:41:13

目录

1、LinkedBlokingQueue是一个有界队列

2、LinkedBlokingQueue是一个单向队列

3、LinkedBlokingQueue中的非阻塞方法

4、LinkedBlokingQueue中的阻塞方法


LinkedBlockingQueue是通过ReentrantLock实现的(有界/无界)阻塞队列,在线程池TheadPoolExecutor中的workQueue就是一个LinkedBlockingQueue的实例。

思考:为什么说LinkedBlockingQueue是一个队列?

根据数据结构中队列的特点判断:先进先出(FIFO),队尾进,队头出。

- LinkedBlockingQueue中的插入方法offer()、put()都是在队尾添加元素。

- LinkedBlockingQueue中的获取/删除方法peek()、poll()、take()都是在队头获取/删除元素。

与普通队列相比,线程池使用LinkedBlockingQueue作为缓存队列的好处是:

  • 当队列满了的时候可以阻塞添加任务的线程(放到条件变量ConditionObject的条件队列notFull里),而不用丢弃当前线程
  • 当队列为空时,会阻塞获取任务的线程(放到条件变量ConditionObject的条件队列notEmpty里),而不用丢弃当前线程

在这篇文章中,会详细介绍LinkedBlockingQueue的底层实现原理。

在此之前,你需要了解ReentrantLock、ConditionObject以及LockSupport几个并发相关的API

为了方便快速了解其结构,简单画了一下的LinkedBlockingQueue类图

通过上面类图可以了解到,LinkedBlokingQueue中依赖了ReentrantLock来保证入队(putLock)和出队(takeLock)的线程安全,同时通过Condition(条件变量)来保存take()方法因队列为空而阻塞的线程(对应条件变量为notEmpty)和put()方法因队列已满而阻塞的线程(对应条件变量为notFull)。

    /** Lock held by take, poll, etc */    
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

1、LinkedBlokingQueue是一个有界队列

LinkedBlokingQueue是一个有界队列,因为它内部通过int类型的capacity属性来保存当前队列的长度,可以通过实例化时传入int类型参数指定,当通过无参构造方法实例化时,队列长度为Integer.MAX_VALUE,所以这依然是一个无界队列。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }

        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

2、LinkedBlokingQueue是一个单向队列

LinkedBlokingQueue是一个单向队列,因为其内部定义的Node是一个单向的链表,并且LinkedBlokingQueue只通过head和last保存了队头和队尾节点。

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

3、LinkedBlokingQueue中的非阻塞方法

public boolean offer(E e):往队尾添加元素,如果队列已满,则直接返回false,不会阻塞线程。

    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }

        // 获取队列长度
        final AtomicInteger count = this.count;

        // 队列已满,返回false,添加失败
        if (count.get() == capacity) {
            return false;
        }

        // 创建一个变量保存队列的大小(长度)
        int c = -1;
        // 根据数据创建Node节点
        Node<E> node = new Node<E>(e);

        // 加锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();

        try {
            if (count.get() < capacity) {
                // 入队
                enqueue(node);

                // 队列长度自增1
                c = count.getAndIncrement();

                // 如果队列还没有满,唤醒notFull中因为添加失败被阻塞的一个线程
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }

        // 如果入队之前队列为空,则入队之后队列中有一个元素
        // 唤醒一个因为调用take()方法被阻塞的线程
        if (c == 0) {
            signalNotEmpty();
        }

        return c >= 0;
    }

入队操作 

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;

        last = last.next = node;
    }

public E poll():在队头获取并删除一个元素,如果队列为空,直接返回null,不会阻塞线程。

    public E poll() {
        final AtomicInteger count = this.count;
        
        // 队列没有元素,返回null
        if (count.get() == 0) {
            return null;
        }

        E x = null;
        int c = -1;

        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();

        try {
            if (count.get() > 0) {
                // 出队操作
                x = dequeue();
                // 队列长度自减1
                c = count.getAndDecrement();
                
                // 队列不为空
                // 唤醒notEmpty中因为队列为空,即通过take()获取元素失败而被阻塞的一个线程
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }

        // 如果出队之前队列是满的,则出队之后队列中还有一个可用的位置
        // 唤醒一个因为调用put()方法被阻塞的线程
        if (c == capacity) {
            signalNotFull();
        }

        return x;
    }

出队操作

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

public E peek():从队头获取一个元素,但是不删除元素。

这个方法非常简单,加锁获取队列的头结点,如果队列为空返回null。

    public E peek() {
        if (count.get() == 0)
            return null;

        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();

        try {
            Node<E> first = head.next;

            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

4、LinkedBlokingQueue中的阻塞方法

public E take() throws InterruptedException:获取队头的元素,如果队列为空,则阻塞当前线程。

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;

        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();

        try {
            // 队列为空,当前线程(获取元素的线程)被阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }

            // 出队
            x = dequeue();
            // 队列长度自增
            c = count.getAndDecrement();

            // 队列不为空,唤醒notEmpty中的一个线程
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }

        if (c == capacity) {
            signalNotFull();
        }

        return x;
    }

notEmpty.await();这行代码完成了阻塞当前线程,我们看一下他的实现

因为notEmpty是调用ReentrantLock的newCondition()方法得到的,所以用的是AQS的内部Condition实现类ConditionObject。

        public final void await() throws InterruptedException {
            // 如果当前线程被中断了,清除中断状态,抛出中断异常返回
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            // 把当前线程放到条件队列中
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;

            // node节点已经在条件队列中
            while (!isOnSyncQueue(node)) {
                // 中断线程
                LockSupport.park(this);

                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

所以,最终是通过LockSupport.part()方法来中断线程的,对应的signal()和signalAll()方法也是通过LockSupport.unpark()方法来唤醒线程。

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            
            if (first != null)
                doSignal(first);
        }

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                
                first.nextWaiter = null;
            } while (!transferForSignal(first) && (first = firstWaiter) != null);
        }

        final boolean transferForSignal(Node node) {
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;

            Node p = enq(node);
            int ws = p.waitStatus;

            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread); // 唤醒线程
            
            return true;
        }

public void put(E e) throws InterruptedException:往队尾添加元素,如果队列已满,则阻塞当前线程。

    public void put(E e) throws InterruptedException {
        // 添加的元素不能为空
        if (e == null) 
            throw new NullPointerException();

        int c = -1;
        Node<E> node = new Node<E>(e);

        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();

        try {
            // 队列已满,阻塞线程
            while (count.get() == capacity) {
                notFull.await();
            }

            // 入队
            enqueue(node);
            // 队列长度自增
            c = count.getAndIncrement();

            // 如果队列还没有满,唤醒notFull中因为添加失败被阻塞的一个线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }

        if (c == 0)
            signalNotEmpty();
    }

好了,这篇文章就分享到这里了,看完不要忘了点赞+收藏哦~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/898847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP8的字符串操作3-PHP8知识详解

今天继续分享字符串的操作&#xff0c;前面说到了字符串的去除空格和特殊字符&#xff0c;获取字符串的长度&#xff0c;截取字符串、检索字符串。 今天继续分享字符串的其他操作。如&#xff1a;替换字符串、分割和合成字符串。 5、替换字符串 替换字符串就是对指定字符串中…

SUMO traci接口控制电动车前往充电站充电

首先需要创建带有停车位的充电站(停车场和充电站二合一)&#xff0c;具体参考我的专栏中其他文章。如果在仿真的某个时刻&#xff0c;希望能够控制电动车前往指定的充电站充电&#xff0c;并且在完成充电后继续前往车辆原来的目的地&#xff0c;那么可以使用以下API&#xff1a…

STM32 F103C8T6学习笔记8:0.96寸单色OLED显示屏显示字符

使用STM32F103 C8T6 驱动0.96寸单色OLED显示屏: OLED显示屏的驱动&#xff0c;在设计开发中OLED显示屏十分常见&#xff0c;因此今日学习一下。一篇文章从程序到显示都讲通。 文章提供源码、原理解释、测试工程下载&#xff0c;测试效果图展示。 目录 OLED驱动原理—IIC通信…

【论文解读】Hybrid-SORT: Weak Cues Matter for Online Multi-Object Tracking

因为Hybrid-SORT的baseline是基于OCSORT进行改进的&#xff0c;在这之前建议先了解byteTrack和【】的相关知识 1.介绍 1.1 基本框架 多目标跟踪(MOT)将问题分为两个子任务。第一个任务是检测每个帧中的对象。第二个任务是将它们在不同的框架中联系起来。关联任务主要通过显式…

搜狗拼音暂用了VSCode及微信小程序开发者工具快捷键Ctrl + Shit + K 搜狗拼音截图快捷键

修改搜狗拼音的快捷键 右键--更多设置--属性设置--按键--系统功能快捷键--系统功能快捷键设置--取消Ctrl Shit K的勾选--勾选截屏并设置为Ctrl Shit A 微信开发者工具设置快捷键 右键--Command Palette--删除行 微信开发者工具快捷键 删除行&#xff1a;Ctrl Shit K 或…

n5173b是德科技keysight N5173B信号发生器

产品概述 是德科技/安捷伦N5173B EXG模拟信号发生器 当您需要平衡预算和性能时&#xff0c;是德科技N5173B EXG微波模拟信号发生器是经济高效的选择。它提供解决宽带滤波器、放大器、接收机等参数测试的基本信号。执行基本LO上变频或CW阻塞&#xff0c;低成本覆盖13、20、31.…

FPGA应用学习笔记----I2S和总结

时序一致在慢时序方便得多 增加了时序分布和分析的复杂性 使用fifo会开销大量资源

SqlServer的with(nolock)关键字的用法介绍

举个例子 下面就来演示这个情况。 为了演示两个事务死锁的情况&#xff0c;我们下面的测试都需要在SQL Server Management Studio中打开两个查询窗口。保证事务不被干扰。 --1、 没有提交的事务&#xff0c;NOLOCK 和 READPAST处理的策略&#xff1a; --查询窗口一请执行如下…

python中可以处理word文档的模块:docx模块

前言 大家早好、午好、晚好吖 ❤ ~欢迎光临本文章 话不多说&#xff0c;直接开搞&#xff0c;如果有什么疑惑/资料需要的可以点击文章末尾名片领取源码 一.docx模块 Python可以利用python-docx模块处理word文档&#xff0c;处理方式是面向对象的。 也就是说python-docx模块…

设备文件和设备绑定

实验目的&#xff1a;使用函数让设备文件和设备绑定&#xff0c;完成对LED的简单控制 在test.c中完成硬件逻辑控制 test.c #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #inclu…

openGauss学习笔记-44 openGauss 高级数据管理-存储过程

文章目录 openGauss学习笔记-44 openGauss 高级数据管理-存储过程44.1 语法格式44.2 参数说明44.3 示例 openGauss学习笔记-44 openGauss 高级数据管理-存储过程 存储过程是能够完成特定功能的SQL语句集。用户可以进行反复调用&#xff0c;从而减少SQL语句的重复编写数量&…

eUICC 识别号 (EIN)

GSMA 是业界指定的一级 EID&#xff08;eUICC 标识符&#xff09;分配机构&#xff0c;负责协调 eUICC 标识号的发行和使用。每个 eSIM 都需要具有唯一、持久且安全的 EID&#xff0c;以识别嵌入式或可移动 eUICC&#xff0c;如SGP.29中所定义。 GSMA eUICC 身份方案为每个 eUI…

“智能查单轻松实现批量快递查询,高效掌握快递物流信息!“

亲爱的用户&#xff0c;你是否常常为了查询大量快递单号而感到烦恼&#xff1f;不用担心&#xff0c;我们已经为你提供了一种高效、智能的解决方案&#xff01;现在&#xff0c;只需一键操作&#xff0c;即可实现批量快递查询&#xff0c;迅速了解每个单号的详细物流信息。 首…

网络编程(基础)

一、OSI体系结构 ISO&#xff08;国际标准化组织&#xff09;制定了一个国际标准OSI&#xff08;开放式通讯系统互联参考模型&#xff09;&#xff0c;对通讯系统进行了标准化。 定义了7层模型&#xff1a; 二、TCP/IP协议介绍 OSI模型是一个理想化的模型已经很少使用&#x…

git merge规则

参考文档&#xff1a;https://juejin.cn/post/7129333439299321887 丹尼尔&#xff1a;Hi&#xff0c;蛋兄&#xff0c;周杰伦都出新专辑了&#xff0c;你咋还不更新啊&#xff0c;真的打算半年一更啊&#xff1f; 蛋先生&#xff1a;好像确实是这样&#xff0c;要不&#xff0…

【机器学习】— 2 图神经网络GNN

一、说明 在本文中&#xff0c;我们探讨了图神经网络&#xff08;GNN&#xff09;在推荐系统中的潜力&#xff0c;强调了它们相对于传统矩阵完成方法的优势。GNN为利用图论来改进推荐系统提供了一个强大的框架。在本文中&#xff0c;我们将在推荐系统的背景下概述图论和图神经网…

在“听得懂”之后“看得见、动起来”,实在智能首发“你说PC做”的大模型Agent

大洋彼岸种下了一颗AI的种子&#xff0c;拥有“算力魔法”的ChatGPT在海内外掀起一场“大”爆发——大型语言模型爆发&#xff0c;带动了AI大模型技术的新热潮。 “你问我答”的不仅是ChatGPT上的交互形态&#xff0c;更是一张名为“大模型”的问卷&#xff0c;答的是全球人工…

Python可视化在量化交易中的应用(13)_Seaborn直方图

Seaborn中带核密度的直方图的绘制方法 seaborn中绘制直方图使用的是sns.histlot()函数&#xff1a; sns.histplot(data,x,y,hue,weights,stat‘count’,bins‘auto’,binwidth,binrange,discrete,cumulative,common_bins,common_norm,multiple‘layer’,element‘bars’,fill,…

如何解决使用npm出现Cannot find module ‘XXX\node_modules\npm\bin\npm-cli.js’错误

遇到问题&#xff1a;用npm下载组件时出现Cannot find module ‘D&#xff1a;software\node_modules\npm\bin\npm-cli.js’ 问题&#xff0c;导致下载组件不能完成。 解决方法&#xff1a;下载缺少的npm文件即可解决放到指定node_modules目录下即可解决。 分析问题&#xff1…

KubeSphere 社区双周报 | Java functions framework 支持 SkyWalking | 2023.8.4-8.17

KubeSphere 社区双周报主要整理展示新增的贡献者名单和证书、新增的讲师证书以及两周内提交过 commit 的贡献者&#xff0c;并对近期重要的 PR 进行解析&#xff0c;同时还包含了线上/线下活动和布道推广等一系列社区动态。 本次双周报涵盖时间为&#xff1a;2023.08.04-2023.…