BlockingQueue

news2024/11/24 8:25:19

网上看了好多文章将线程池的但是似乎都没的多少人会详细讲解里面的任务队列,所以只有自己动手学习其中的任务队列

BlockingQueue

在这里插入图片描述
要学习其中的任务队列就需要先学习BlockingQueue,Blocking是一个接口,其中主要的方法为

	// 尝试往队尾添加元素,添加成功返回true,添加失败返回false
	boolean add(E e);
	// 尝试往队尾添加元素,添加成功返回true,添加失败返回false
	boolean offer(E e);
	// 尝试往队尾添加元素,如果队列满了,则阻塞当前线程,直到其能够添加成功为止
	void put(E e) throws InterruptedException;
	// 尝试往队尾添加元素,如果队列满了,则阻塞当前线程,直到超时
	boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
	// 从队头取出元素,如果队列为空则一直等待
	E take() throws InterruptedException;
	// 从队头取出元素,如果队列为空则等待一段时间
	E poll(long timeout, TimeUnit unit) throws InterruptedException;
	int remainingCapacity();
	//从队列中移除指定对象
	boolean remove(Object o);
	//判断队列是否存在指定对象
	public boolean contains(Object o);
	//将队列中元素转移到指定集合
	int drainTo(Collection<? super E> c);
	//将最多MAX个元素转移到指定集合
	int drainTo(Collection<? super E> c, int maxElements);

ArrayBlockingQueue

ArrayBlockingQueue的底层是基于数组实现,当指定容量后数组就确定了不会发生扩容

参数

	// 元素
    final Object[] items;
    //可以被取到的元素下标
    int takeIndex;
    //可以放入元素的下标
    int putIndex;
    //元素个数
    int count;
    //锁
    final ReentrantLock lock;
    //等待条件,用于队列为空的时候阻塞当前线程获取
    private final Condition notEmpty;
    //等待条件,用于队列满的时候阻塞当前线程加入元素
    private final Condition notFull;
    transient Itrs itrs = null;

通过上述数据结构可以看出,ArrayBlockingQueue是通过一个循环数组的方式来实现存储元素的,这里takeIndex记录当前可以取元素的索引位置,而putIndex则记录了下一个元素可以放入的位置,如果队列满了则是takeIndex == putIndex,这里可以通过判断count字段来判断当前是处于满状态还是空置状态,通过一个全局锁lock来实现控制
对于其中的方法比较重要的是出队与入队方法,enqueue与dequeue

重要方法

enqueue与dequeue

其中入队与出队就是将对应位置的putIndex与takeIndex放入其中位置即可,然后加一,但是加一要判断是否超过了当前数组最大位置,如果是则设置为0,同时需要唤醒对应条件的等待队列

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

这是其中内层调用的方法,而外部方法我们提供方法为

put与take

put与take实现了其阻塞队列满足条件的方法

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();   //通过while循环以防止当前线程被意外唤醒,如果当前循环被打破则代表没有满了
            enqueue(e); // 放入元素
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //与上面类似
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

从这里可以看出ArrayBlockingQueue实现的是先进先出

LinkedBlockingQueue

LinkedBlockingQueue,其底层是通过一个单项链表实现的,由于单项链表需要有一个指向下一个节点的指针,因而其必须使用一个对象这里是Node来存储当前元素的值和下一个节点索引

Node节点

    static class Node<E> {
        //当前元素的值
        E item;
        //下一个元素
        Node<E> next;
        Node(E x) { item = x; }
    }

参数

	//容量
    private final int capacity;
    //当前队列已经存储个数
    private final AtomicInteger count = new AtomicInteger();
    //头指针
    transient Node<E> head;
    //尾指针
    private transient Node<E> last;
    //从队列取出元素的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //等待如果队列为空
    private final Condition notEmpty = takeLock.newCondition();
    //放入元素的锁
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

这里与ArrayBBlockingQueue存在着一些差异,其中head与last与takeIndex与putIndex都是类似的,但是LinkedBlockingQueue使用了两把锁,而上面只使用了一把锁

重要方法

enqueue与dequeue

private void enqueue(Node<E> node) {
		//将队列尾部节点的下一个节点指向新的节点,并更新尾部节点为最新的节点
        last = last.next = node;
    }
	
	//返回头节点的下一个节点并更新头节点
	//因为头节点存储不是第一个元素
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

可以看到对于链表的入队与出队操作是非常简单的,所以我们需要看其中的take与put方法

take与put


 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();    // 如果满了则进入等待
            }
            enqueue(node); //放入元素
            c = count.getAndIncrement(); //元素个数++
            if (c + 1 < capacity) //如果添加元素过后还是未满那么则继续唤醒下一个
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0) 
        	//将等待取出的线程唤醒,而唤醒的时候也必须获取take锁才能唤醒
            signalNotEmpty();
    }


 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await(); //同理
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); //继续获取
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();  //同理
        return x;
    }

ArrayBlockingQueue与LinkedBlockingQueue区别

1、两种底层数据结构不同,一个是基于循环数组一个是基于单向链表
2、两种阻塞方式不同,ArrayBlockingQueue使用了一个全局锁来处理所有操作,也就是无论插入还是获取都只能一个线程执行,而LinkedBlockingQueue则是使用两个锁,使得获取与放入无干扰
3、两着初始化不同,ArrayBlockingQueue必须指定一个大小初始化而LinkedBlockingQueue则可以不指定,不指定则为Integer.MAX_VALUE

SynchronousQueue

这个阻塞队列就比上面两种麻烦多了,那就需要一步一步理解
SynchronousQueue也是一个队列来的,但他的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put时候),如果当前没有人想要消费产品此生产线程必须阻塞等待一个消费者调用take操作,take操作将唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称一次配对过程

构造器

其构造器可传入是公平还是非公平的,默认是非公平的

如果是公平的则采用TransferQueue如果是非公平的则采用TransferStack

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

从源码上课其中的pull、take等方法都素调用transfer方法

transfer中有三个参数:
e:要存放的元素
timed:是否超时等待
nanos:超时等待时间

TransferQueue

TransferQueue内部有一个内部类:QNode,TransferQueue是由QNode节点构成的链表结构

QNode

	//下一个节点
    volatile QNode next;
    //存入元素         
    volatile Object item;
    //等待线程     
    volatile Thread waiter; 
    //是否是数据
    final boolean isData;

在这里插入图片描述

TransferQueue初始化

TransferQueue创建时会初始化一个QNode节点,head,tail都会指向这个空节点,在TransferQueue中会以根据传入的参数:e是否为null来将节点分为两类,从TransferQueue队列中获取元素的线程是同一类节点,比如:调用take,poll的线程就是同一类节点;从TransferQueue队列中添加元素的线程是一类节点

TransferQueue队列特殊的地方就在于这个队列中只会存在一种节点:要么是获取元素的线程节点,要么是添加元素的线程节点

在这里插入图片描述
在初始化TransferQueue对象时,会初始化生产一个节点队列的头,尾:head,tail都会指向这个init节点

举个例子:假设当前队列中都是put线程,此时有一个take线程,那么这个take线程就会唤醒队列中的一个put线程

在这里插入图片描述
在唤醒线程时,同时会修改该线程所在节点的item值,在后面分析源码时候会看到,如果只是唤醒线程是没有用的,还需要将item的值修改才能真正唤醒该线程

Transfer

下面就来分析Transfer方法

E transfer(E e, boolean timed, long nanos) {
            QNode s = null; 
            boolean isData = (e != null); // 判断当前是什么类型线程

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)       
                    continue;                    

                if (h == t || t.isData == isData) {  // 如果队列为空 || 新类型线程与队列中线程类型一致
                    QNode tn = t.next;  
                    if (t != tail)   //队列尾节点已经被更新                
                        continue;
                    if (tn != null) {  //有新节点加入到队列      
                        advanceTail(t, tn); //更新尾节点
                        continue;
                    }
                    if (timed && nanos <= 0)        
                        return null;
                    if (s == null)
                        s = new QNode(e, isData); //将线程包装成QNode节点
                    if (!t.casNext(null, s))        //将新节点添加到队列末尾
                        continue;

                    advanceTail(t, s); //添加成功后更新tail      
                    Object x = awaitFulfill(s, e, timed, nanos); //等待被唤醒
                    if (x == s) { //中断标记,带阻塞时间的线程等待了规定时间恢复运行            
                        clean(t, s); //节点从队列中删除
                        return null;
                    }

                    if (!s.isOffList()) {           
                        advanceHead(t, s);         
                        if (x != null)              
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {        //唤醒队列节点         
                	// 取出当前节点            
                    QNode m = h.next;               
                    if (t != tail || m == null || h != head)
                        continue;                 

                    Object x = m.item;
                    if (isData == (x != null) ||    
                        x == m ||                  
                        !m.casItem(x, e)) {  //将被唤醒线程的值修改为当前线程的值  
                        advanceHead(h, m);          
                        continue;
                    }

                    advanceHead(h, m);              
                    LockSupport.unpark(m.waiter); //唤醒线程
                    return (x != null) ? (E)x : e;
                }
            }
        }

awaitFulfill

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)  
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

特别说明一下变量spins,所有进入阻塞队列的线程都不着急立即阻塞,而是会先自旋一段时间,然后再阻塞,因为阻塞线程再唤醒线程的代价就比让线程自选的大

TransferStack

里面存在一个内部类:SNode,TransferStack是由Snode单链表构建成的堆栈结构,只有一个head指针指向链表的表头;每次添加元素都是在表头处添加,新节点称为新的表头head,唤醒的线程的时候也是唤醒head节点,因此就形成了先进后出的堆栈结构,TransferStack中根据e也就线程分为两类,一类是获取元素:REQUEST,一类的添加元素:DATA,其中也只有一种节点只有被唤醒时候才会短暂出现2种节点

在这里插入图片描述
SNode

	  //下一个节点
      volatile SNode next;        
      volatile SNode match;
      //当前线程      
      volatile Thread waiter;
      //值    
      Object item;
      //模式                
      int mode;

在TransferStack的堆栈中,如果新加入的线程类型与堆栈中的节点类型不同,那么会先将新线程包装成Snode节点加入堆栈中,成为新的header节点并将旧的节点唤醒。然后更新head节点返回DATA类型节点的元素值

在这里插入图片描述

在有不同类型的节点进入堆栈中的时候,新节点添加到堆栈顶端并更新为新的head节点;这个节点的mode = REQUEST | FULFILLING ;FULFILLING 是用来标记,表示这个head节点正在唤醒堆栈中的一个节点线程;最后在新节点唤醒旧的head节点( oldHead节点)之后,更新堆栈的head节点;

TransferStack部分的源码就再不分析了,入队阻塞部分的源码几乎与TransferQ ueue一样;TransferStack唤醒节点的方式与TransferQueue有点差别,TransferStack是将新节点先包装成节点添加到堆栈中,再唤醒节点线程,最后重新设置堆栈的head指针并将这2个节点清除出堆栈。

SynchronousQueue 这位大佬写的SynchronousQueue感觉很好,画图也很好只有自己理解但是想不出这些理解的话,感谢这位大佬我只是资源的整合者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/58419.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

m基于OFDM的OMP压缩感知信道估计算法误码率仿真,对比传统的LS,MMSE以及LMMSE信道估计性能

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 正交频分复用技术(orthogonalfrequencydivisionmultiplexing,ofdm)应用在通信系统中可以有效抵抗码间干扰(inter-symbolinterference,isi)。同时&#xff0c;通过在符号间插入循环前缀(cyclicpre…

格利尔在北交所上市:市值突破9亿元,朱从利夫妇为实控人

12月2日&#xff0c;格利尔数码科技股份有限公司&#xff08;下称“格利尔”&#xff0c;BJ:831641&#xff09;在北京证券交易所上市。本次上市&#xff0c;格利尔公开发行1050万股&#xff0c;发行价格为9.60元/股&#xff0c;发行市盈率为23.32倍。 据了解&#xff0c;格利尔…

压缩文件7-Zip与WinRAR个人免费版在不同压缩等级下的对比

总结&#xff1a; 压缩率越高&#xff0c;压缩及解压时间相对更长&#xff0c;但传输时间越短&#xff0c;消耗流量越少。在各个压缩等级下&#xff0c;7-Zip均比RAR的压缩率更高&#xff0c;更能达到**“压缩”**的目的&#xff1b;而且相同参数下的压缩速度更快&#xff0c;…

基于增强蛇优化算法求解单目标优化问题附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

从源码分析vue3组件的生命周期

概览 借官网一张图充篇幅☺ 这张图展示了一个vue组件从开始渲染到卸载结束一整个生命周期经历的每个环节 但只罗列了选项式api生命周期钩子&#xff0c;没有将组合式api的生命周期钩子放进去 下面这个表格列出了所有选项式api生命周期钩子和组合式api生命周期钩子&#xff0c…

函数的节流和防抖?节流和防抖的区别及实现

一.防抖和节流的本质 本质上就是优化高频率执行代码的一种手段。 比如说&#xff1a;浏览器的scroll,keypress,mousemove,resize等事件在触发时&#xff0c;会不断的调用绑定在事件上的回调函数&#xff0c;会极大地浪费资源&#xff0c;降低前端的性能。 因此为了优化用户体…

【C++模板】非类型模板参数

目录什么是非类型模板参数&#xff1f;非类型的类模板参数非类型的函数模板参数非类型模板参数的局限性限制使用的场景支持使用的场景什么是非类型模板参数&#xff1f; 在函数模板和类模板中&#xff0c;模板参数并不仅仅可以当作类型&#xff0c;还可以当作普通值。当使用普通…

Numpy解决找出二维随机矩阵中每行数据中最接近某个数字的数字

解决思路&#xff1a; 利用np.random.rand()函数生成随机的矩阵。abs函数实现对矩阵中每一个元素和指定元素相减np.argsort()函数实现找到排序后新元素在原来矩阵中的下标利用mask函数提取矩阵中第一列的元素最后利用for循环遍历所有的二维坐标&#xff0c;找到矩阵中每行中满…

微信小程序-读取数据

在开发微信小程序的时候,我们经常都会用到一些配置数据,或者当做“单向数据库(只读)”使用。 我们新建一个新的项目工程,JS版本就可以。 免于麻烦,我们新建一个page(showdata)来显示数据。 为了方便管理,我们在项目工程新建一个目录(data),用于存数据。另外我们再新…

面向对象-05-06-构造方法,标准的 javabean 类

实例化的本质就是调用构造方法 package com.luo.demo01;public class StudenTest {public static void main(String[] args) {// 创建对象// 本质&#xff1a;调用构造器Student s new Student();Student student new Student("luo",18);System.out.println(studen…

Git系列,自定义 git 命令,用 shell 脚本帮助你更好的实现 git 版本控制

一、问题引出 在实际的生产当中&#xff0c;无论是 git、小乌龟 git 、idea git 插件&#xff0c;都满足不了我们生产中遇到的一些常见的问题&#xff0c;例如&#xff1a; 工作任务重的时候&#xff0c;手头上可能有若干个分支&#xff0c;每个分支对应着不同的业务&#xf…

Mysql面试题汇总

Mysql面试题 文章目录Mysql面试题一 Mysql索引001 Mysql如何实现的索引机制&#xff1f;002 InnoDB索引与MyISAM索引实现的区别是什么&#xff1f;003 一个表中如果没有创建索引&#xff0c;那么还会创建B树吗&#xff1f;004 说一下B树索引实现原理&#xff08;数据结构&#…

妙啊,Python 管道 Pipe 编写代码如此优雅

大家好&#xff0c;今天这篇文章我将详细讲解 Pipe 如何让你的代码更加简洁的方法&#xff0c;喜欢本文点赞支持&#xff0c;欢迎收藏学习&#xff0c;文末提供技术交流群&#xff0c;欢迎畅聊&#xff01; 我们知道 map 和 filter 是两种有效的 Python 方法来处理可迭代对象。…

如何基于YAML设计接口自动化测试框架?看完秒会!

在设计自动化测试框架的时候&#xff0c;我们会经常将测试数据保存在外部的文件&#xff08;如Excel、YAML、CSV&#xff09;或者数据库中&#xff0c;实现脚本与数据解耦&#xff0c;方便后期维护。目前非常多的自动化测试框架采用通过Excel或者YAML文件直接编写测试用例&…

[附源码]计算机毕业设计springboot招聘系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

电子学会2021年6月青少年软件编程(图形化)等级考试试卷(四级)答案解析

青少年软件编程&#xff08;图形化&#xff09;等级考试试卷&#xff08;四级&#xff09; 分数&#xff1a;100.00 题数&#xff1a;24 一、单选题&#xff08;共10题&#xff0c;每题3分&#xff0c;共30分&#xff09; 1. 执行下列程序&#xff0c;输出的结果为…

bert 环境搭建之PytorchTransformer 安装

这两天跑以前的bert项目发现突然跑不了&#xff0c;报错信息如下&#xff1a; Step1 transformer 安装 RuntimeError: Failed to import transformers.models.bert.modeling_bert because of the following error (look up to see its traceback): module signal has no att…

IOT物联网系统架构

主要由 IotCloodServer物网联服务平台&#xff0c; IotAdminWeb物联网管理平台&#xff0c; IotDataProcessing物联网数据平台&#xff0c; IotDeviceGateway物联网边缘网关&#xff0c; IotDeviceToolHepler物联网边缘网关 控制 设备的工具类&#xff0c; IotApp物联网应…

SpringBoot_整合Mybatis

一 导入依赖 <!--整合mybatis--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.3</version></dependency><dependency><group…

ubuntu20.04屏幕亮度无法调节的解决方法->安装 brightness-controller-simple 软件

文章目录一、问题描述二、解决方法参考链接一、问题描述 安装ubunt20.04.5 之后&#xff0c;调节Ubuntu上方的亮度控制条、按快捷键(FnF5、FnF6) 都不能实现调节亮度的功能。 二、解决方法 安装 brightness-controller-simple 软件&#xff0c;利用软件调节亮度。 sudo add…