Java多线程篇(10)——BlockingQueue(数组,链表,同步阻塞队列)

news2024/11/14 11:31:20

文章目录

  • 1、ArrayBlockingQueue
  • 2、LinkedBlockingQueue
  • 3、SynchronousQueue
    • 3.1、transfer 公平实现(队列)
    • 3.2、transfer 非公平实现(栈)

在这里插入图片描述

1、ArrayBlockingQueue

put

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //队列已满,不满足notFull条件,notFull.await
            while (count == items.length)
                notFull.await();
            //队列未满,入队
            enqueue(e);
        } finally {
       	    //解锁 
            lock.unlock();
        }
    }
    
    private void enqueue(E e) {
    	//入队
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        //满足notEmpty条件,notEmpty.signal
        notEmpty.signal();
    }

take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //解锁
        lock.lockInterruptibly();
        try {
        	//队列为空,不满足notEmpty条件,notEmpty.await
            while (count == 0)
                notEmpty.await();
            //出队
            return dequeue();
        } finally {
        	//解锁
            lock.unlock();
        }
    }
    
    private E dequeue() {
    	//出队
        final Object[] items = this.items;
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //满足 notFull 条件,notFull.signal
        notFull.signal();
        return e;
    }

2、LinkedBlockingQueue

put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        //注意:与数组队列不同的是,这里put和take各用一把锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //put加锁
        putLock.lockInterruptibly();
        try {
        	//队列已满,不满足notFull条件,notFull.await
            while (count.get() == capacity) {
                notFull.await();
            }
            //入队
            enqueue(node);
            //如果入队后,队列仍未满,notFull.signal
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
        	//put释放锁
            putLock.unlock();
        }
        //队列 空->不空,notEmpty.signal
        if (c == 0)
            signalNotEmpty();
    }

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

take

    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //take加锁
        takeLock.lockInterruptibly();
        try {
        	//队列为空,不满足notEmpty条件,notEmpty.await
            while (count.get() == 0) {
                notEmpty.await();
            }
            //出队
            x = dequeue();
            c = count.getAndDecrement();
            //如果出队后,队列仍不空,notEmpty.signal
            if (c > 1)
                notEmpty.signal();
        } finally {
        	//take释放锁
            takeLock.unlock();
        }
        //队列 满->不满,notFull.signal
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

总得来说,ArrayBlockingQueue和LinkedBlockingQueue的实现原理差不多,都是通过 ReentrantLock + 条件队列去实现的,如果队列满了(不满足条件)put就阻塞,同理如果队列为空(不满足条件)take就阻塞。
他们之间的区别在于:
ArrayBlockingQueue数组结构,put和take用一把锁。
LinkedBlockingQueue链表结构,put和take各用一把锁。

3、SynchronousQueue

这个是同步队列,与上面队列不同的是,这个队列的put和take是一对一交替执行的(文章开头的案例可以证明)。所以这个队列比较特殊,不需要容量的概念,或者说容量为0。实现原理大概就是:如果队列为空,或者当前操作和队列中的操作是一样的就自旋一定次数后阻塞并入队。否则就匹配队列中的第一个元素后将匹配到的元素出队。

值得一提的是,这个队列并没有用到 ReentrantLock,而是用 cas自旋 + LockSupport.park 来代替 ReentrantLock 。

put

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

take

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

无论是put还是take主要逻辑都在transferer.transfer。
而这个方法有公平,非公平两个实现,对应的数据结构分别是队列和栈,默认为栈结构。

在这里插入图片描述
在这里插入图片描述

3.1、transfer 公平实现(队列)

transfer

        E transfer(E e, boolean timed, long nanos) {
            QNode s = null;
            //传入的e不为null,isDate=true,反正false
            boolean isData = (e != null);
			//自旋
            for (;;) {
                QNode t = tail;
                QNode h = head;
                //如果头或尾节点没有初始化,则自旋  
                if (t == null || h == null)
                    continue;

				//分支一:如果队列为空,或者模式相同就入队
                if (h == t || t.isData == isData) {
                    QNode tn = t.next;
                    //入队要保证 t=tail,tail.next=null。
                    //不满足说明其他线程改了现场,则自旋
                    if (t != tail) 
                        continue;
                    if (tn != null) { 
                        advanceTail(t, tn);
                        continue;
                    }
                    //超时,返回null
                    if (timed && nanos <= 0L)
                        return null;
                    
                    //new QNode节点
                    if (s == null)
                        s = new QNode(e, isData);
                    //cas入队
                    if (!t.casNext(null, s))
                        continue;
					//更新tail尾指针
                    advanceTail(t, s);
                    //自旋一定次数后阻塞(LockSupport.park)
                    Object x = awaitFulfill(s, e, timed, nanos);

					//...唤醒后
					
					//如果m=s,说明取消了,就把它清除掉,并返回null
                    if (x == s) {
                        clean(t, s);
                        return null;
                    }
					//如果s被唤醒后仍在队列中,自己出队+传递数据
                    if (!s.isOffList()) {
                        advanceHead(t, s);
                        if (x != null)
                            s.item = s;
                        s.waiter = null;
                    }
                    //优先返回匹配到的元素,否则返回传入的元素
                    return (x != null) ? (E)x : e;

                }
                //分支二:模式不同,进行匹配
                else {
                    QNode m = h.next;
                    //现场被改变,自旋
                    if (t != tail || m == null || h != head)
                        continue;
                    Object x = m.item;
                    
                    //cas传递数据并出队
                    if (isData == (x != null) ||
                        x == m ||
                        !m.casItem(x, e)) {
                        //非预期情况的自旋
                        advanceHead(h, m);
                        continue;
                    }
                    advanceHead(h, m);
                    //唤醒线程(LockSupport.unpark)
                    LockSupport.unpark(m.waiter);
                    
                    //优先返回匹配到的元素,否则返回传入的元素
                    return (x != null) ? (E)x : e;
                }
            }
        }

3.2、transfer 非公平实现(栈)

transfer

        E transfer(E e, boolean timed, long nanos) {
            SNode s = null;
            //传入的e不为null则为DATA模式,否则为REQUEST模式
            int mode = (e == null) ? REQUEST : DATA;
			//自旋
            for (;;) {
                SNode h = head;
                //分支一:栈为空,或者模式相同就压栈
                if (h == null || h.mode == mode) {
                	//超时情况:如果头结点被取消(超时节点会被取消)则出栈自旋,然后返回null
                    if (timed && nanos <= 0L) {
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);
                        else
                            return null;
                    }
                    //cas压栈new SNode 
                    else if (casHead(h, s = snode(s, e, h, mode))) {
                    	//自旋一定次数后阻塞(LockSupport.park)
                        SNode m = awaitFulfill(s, timed, nanos);

						//唤醒后...

                        //如果m=s,说明取消了,就把它清除掉,并返回null
                        if (m == s) {
                            clean(s);
                            return null;
                        }
                        //将匹配到的元素出栈
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);
                        //返回传递的数据(一定是数据,不是线程也不是节点)
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                }
                //分支二:模式不同,且没有正在匹配,就匹配后出栈 
                else if (!isFulfilling(h.mode)) {
                	//头结点已被取消,出栈
                    if (h.isCancelled())
                        casHead(h, h.next);
                        
	                //先让当前节点入栈,然后再匹配,且此时当前节点状态是匹配中
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) {
                        	//栈为空,没有可匹配的元素,将当前节点出栈后,自旋
                            SNode m = s.next;
                            if (m == null) {
                                casHead(s, null);
                                s = null;
                                break;
                            }
                            SNode mn = m.next;
                            //进行匹配,唤醒下一个节点(LockSupport.unpark)
                            if (m.tryMatch(s)) {
                            	//匹配成功,出栈两个元素
                                casHead(s, mn);
                                //返回传递的数据
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else
                            	//匹配失败,说明m已经先一步被其它线程匹配了,就协助出栈两个元素
                                s.casNext(m, mn);
                        }
                    }
                }
                //分支三:模式不同,且正在匹配,则协助匹配
                //其实就是跟分支二执行一样的内容,只不过是这个分支的线程帮忙执行了
                else {
                    SNode m = h.next;
                    if (m == null)
                        casHead(h, null);
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))
                            casHead(h, mn);
                        else
                            h.casNext(m, mn);
                    }
                }
            }
        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1085220.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小程序:下拉刷新+上拉加载+自定义导航栏

下拉刷新 &#xff1a; <scroll-view scroll-y"true" 允许纵向滚动 refresher-enabled"true" 开启自定义下拉刷新 默认为false :refresher-triggered&quo…

从读不完一篇文章,到啃下20万字巨著,大模型公司卷起“长文本”

点击关注 文丨郝 鑫 编丨刘雨琦 4000到40万token&#xff0c;大模型正在以“肉眼可见”的速度越变越“长”。 长文本能力似乎成为象征着大模型厂商出手的又一新“标配”。 国外&#xff0c;OpenAI经过三次升级&#xff0c;GPT-3.5上下文输入长度从4千增长至1.6万token&…

MySQL常用命令01

今天开始&#xff0c;每天总结一点MySQL相关的命令&#xff0c;方便大家后期熟悉。 1.命令行登录数据库 mysql -H IP地址 -P 端口号 -u 用户名 -p 密码 数据库名称 -h 主机IP地址 登录本机 localhost或127.0.0.1 -P 数据库端口号 Mysql默认是3306 -u 用户名 -p 密码 …

nodejs+vue+elementui医院挂号预约管理系统4n9w0

前端技术&#xff1a;nodejsvueelementui 前端&#xff1a;HTML5,CSS3、JavaScript、VUE 1、 node_modules文件夹(有npn install Express 框架于Node运行环境的Web框架, 开发语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;mysql 数据库工具&#xff…

公司寄件管理教程

不少企业为了规范因公寄件的管理&#xff0c;节约企业的快递成本&#xff0c;最终简化企业内部办公流程&#xff0c;提升企业整体办公效率&#xff0c;在因公寄件达到一定量的时候&#xff0c;都会推出或繁或简的“公司寄件管理制度”。 所谓的“或繁或简”。是根据企业的寄件场…

前端练习项目(附带页面psd图片及react源代码)

一、前言 相信很多学完前端的小伙伴都想找个前端项目练练手&#xff0c;检测自己的学习成果。但是现在很多项目市面上都烂大街了。今天给大家推荐一个全新的项目——电子校园 项目位置&#xff1a;https://github.com/v5201314/eSchool 二、项目介绍(部分页面展示)&#xff…

C++QT-day6

/*定义一个基类 Animal&#xff0c;其中有一个虛函数perform&#xff08;)&#xff0c;用于在子类中实现不同动物的表演行为。*/ #include <iostream> using namespace std; class Animal //封装Animal类&#xff08;基类&#xff09; { private:string person; public:A…

力扣:130. 被围绕的区域(Python3)

题目&#xff1a; 给你一个 m x n 的矩阵 board &#xff0c;由若干字符 X 和 O &#xff0c;找到所有被 X 围绕的区域&#xff0c;并将这些区域里所有的 O 用 X 填充。 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;力扣&#xff08;LeetCode&#…

在线免费AI绘画工具

体验地址 点我进行AI绘画 使用 选择以文搜图进行绘画 提问 介绍 首先&#xff0c;我们来了解一下ChatGPT。作为一个人工智能语言模型&#xff0c;它可以自动回答你的问题、提供信息&#xff0c;并与你进行流畅的对话。它通过大量的训练数据和机器学习算法&#xff0c;学…

react–antd 实现TreeSelect树形选择组件,实现点开一层调一次接口

效果图: 注意: 当选择“否”&#xff0c;开始调接口&#xff0c;不要把点击调接口写在TreeSelect组件上&#xff0c;这样会导致问题出现&#xff0c;没有层级了 部分代码:

01Maven的工作机制: Maven作为依赖管理工具以及Maven作为构建管理工具

Maven的特点及其应用 Maven 是 Apache 软件基金会组织维护的一款专门为Java项目提供构建和依赖管理支持的工具 Maven 作为依赖管理工具 管理jar包的规模: 随着我们使用的框架数量越来越多以及框架的封装程度也越来越高&#xff0c;项目中使用的jar包也就越来越多 配置工程依…

自动化测试 —— Pytest fixture及conftest详解!

前言 fixture是在测试函数运行前后&#xff0c;由pytest执行的外壳函数。fixture中的代码可以定制&#xff0c;满足多变的测试需求&#xff0c;包括定义传入测试中的数据集、配置测试前系统的初始状态、为批量测试提供数据源等等。fixture是pytest的精髓所在&#xff0c;类似u…

单值二叉树的判断——递归

如果二叉树每个节点都具有相同的值&#xff0c;那么该二叉树就是单值二叉树。 只有给定的树是单值二叉树时&#xff0c;才返回 true&#xff1b;否则返回 false。 根左右 ——递归 代码&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int v…

RobotFramework自动化测试框架的基础关键字

1.1.1 如何搜索RobotFramework的关键字 有两种方式可以快速的打开RIDE的关键字搜索对话框 1、选择菜单栏Tools->Search Keywords&#xff0c;然后会出现如下的关键字搜索对话框&#xff0c;这个对话框就类似提供了一个关键字的API的功能&#xff0c;提供了关键字的…

K8S:HPA pod水平自动伸缩

文章目录 一.HPA概念1.什么是HPA2.HPA原理 二.部署 metrics-server1.node节点上传镜像包2.master节点安装metrics-server 三.部署 HPA1.所有节点安装镜像2.master创建测试的 Pod 资源3.创建 HPA 控制器4.创建测试客户端容器5.弹性缩容 四.扩展1.资源限制 - Pod①资源限制的原理…

快速排序详解(递归实现与非递归实现)

目录 一、快速排序的基本思想 二、将序列划分成左右区间的常见方法 2.1hoare版本&#xff08;动图解释代码实现&#xff09; 2.2挖坑法 2.3前后指针法 三、快速排序的初步实现 四、快速排序的优化实现 4.1快排的特殊情况 4.2对区间划分代码的优化 4.3小区间优化 五、…

邮政编码,格式校验:@ZipCode(自定义注解)

目标 自定义一个用于校验邮政编码格式的注解ZipCode&#xff0c;能够和现有的 Validation 兼容&#xff0c;使用方式和其他校验注解保持一致&#xff08;使用 Valid 注解接口参数&#xff09;。 校验逻辑 有效格式 不能包含空格&#xff1b;应为6位数字&#xff1b; 不校验…

5G安卓核心板-MT6833/MT6853核心板规格参数

随着智能手机的不断发展&#xff0c;芯片技术在推动手机性能和功能方面发挥着关键作用。MT6833和MT6853安卓核心板是两款高度集成的基带平台&#xff0c;为LTE/5G/NR和C2K智能手机应用提供强大的处理能力和多样化的接口。 这两款安卓核心板都集成了蓝牙、FM、WLAN和GPS模块&…

CSS网页标题图案和LOGO SEO优化

favicon图标 将网页的头名字旁边放入一个图案 想将想要的图案切成png图片 然后把png图片转换成ico图案可以借助进行访问 将语法引用到head里面 SEO译为搜索引擎优化。是一种利用搜索引擎的规则提高网站有关搜索引擎的自然排名的方式 SEO的目的是对网站进行深度的优化&…

SQL Server 创建表

切换数据库&#xff0c;判断是否存在 --切换数据库 use DBTEST--判断表是否存在 --创建的所有表都可以在sys.boject中找到&#xff0c;所以这里在sys.objects中查找是否有名字为department的表并且type为U 即用户生成的表 if exists(select * from sys.objects where namedepa…