java并发编程:LinkedBlockingQueue详解

news2024/11/19 23:32:21

文章目录

  • 简介
  • 源码分析
    • 属性
    • 构造函数
    • 入队方法
      • put(E e)
      • offer(E e)
      • offer(E e, long timeout, TimeUnit unit)
    • 出队方法
      • take()
      • poll()
      • 获取元素方法
      • 删除元素方法
  • 问题
  • 总结


简介

在集合框架里,想必大家都用过ArrayList和LinkedList,也经常在面试中问到他们之间的区别。ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。

LinkedBlockingQueue实现了BlockingQueue接口,这里放一张类的继承关系图:
在这里插入图片描述
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默认为Integer.MAX_VALUE,也就是无界队列。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

源码分析

属性

/**
 * 节点类,用于存储数据
 */
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;

/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 阻塞队列的头结点
 */
transient Node<E> head;

/**
 * 阻塞队列的尾节点
 */
private transient Node<E> last;

/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();

/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();

从上面的属性我们知道,每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。

这里如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。

另外,LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程。

构造函数

public LinkedBlockingQueue() {
    // 默认大小为Integer.MAX_VALUE
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默认的构造函数和最后一个构造函数创建的队列大小都为Integer.MAX_VALUE,只有第二个构造函数用户可以指定队列的大小。第二个构造函数最后初始化了last和head节点,让它们都指向了一个元素为null的节点。
在这里插入图片描述
最后一个构造函数使用了putLock来进行加锁,但是这里并不是为了多线程的竞争而加锁,只是为了放入的元素能立即对其他线程可见。

同样,LinkedBlockingQueue也有着和ArrayBlockingQueue一样的方法,我们先来看看入队列的方法。

入队方法

LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:

  • void put(E e);
  • boolean offer(E e);
  • boolean offer(E e, long timeout, TimeUnit unit)。

put(E e)

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁中断
    putLock.lockInterruptibly();
    try {
        //判断队列是否已满,如果已满阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 把node放入队列中
        enqueue(node);
        c = count.getAndIncrement();
        // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一条数据,唤醒消费线程进行消费
    if (c == 0)
        signalNotEmpty();
}

小结put方法来看,它总共做了以下情况的考虑:

  • 队列已满,阻塞等待。
  • 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。

很清晰明了是不是?

我们来看看该方法中用到的几个其他方法,先来看看enqueue(Node node)方法:

private void enqueue(Node<E> node) {
    last = last.next = node;
}

该方法可能有些同学看不太懂,我们用一张图来看看往队列里依次放入元素A和元素B,毕竟无图无真相:
在这里插入图片描述
接下来我们看看signalNotEmpty,顺带着看signalNotFull方法。

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

为什么要这么写?因为signal的时候要获取到该signal对应的Condition对象的锁才行。

offer(E e)

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,
        // 如果有,唤醒下一个添加线程进行添加操作。
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

可以看到offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false。

offer(E e, long timeout, TimeUnit unit)

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 等待超时时间nanos,超时时间到了返回false
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

该方法只是对offer方法进行了阻塞超时处理,使用了Condition的awaitNanos来进行超时等待,这里为什么要用while循环?因为awaitNanos方法是可中断的,为了防止在等待过程中线程被中断,这里使用while循环进行等待过程中中断的处理,继续等待剩下需等待的时间。

出队方法

入队列的方法说完后,我们来说说出队列的方法。LinkedBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);

take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列为空,阻塞等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        // 队列中还有元素,唤醒下一个消费线程进行消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 移除元素之前队列是满的,唤醒生产线程进行添加元素
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法看起来就是put方法的逆向操作,它总共做了以下情况的考虑:

  • 队列为空,阻塞等待。
  • 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果放之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素。

我们来看看dequeue方法

private E dequeue() {
    // 获取到head节点
    Node<E> h = head;
    // 获取到head节点指向的下一个节点
    Node<E> first = h.next;
    // head节点原来指向的节点的next指向自己,等待下次gc回收
    h.next = h; // help GC
    // head节点指向新的节点
    head = first;
    // 获取到新的head节点的item值
    E x = first.item;
    // 新head节点的item值设置为null
    first.item = null;
    return x;
}

可能有些童鞋链表算法不是很熟悉,我们可以结合注释和图来看就清晰很多了。
在这里插入图片描述
其实这个写法看起来很绕,我们其实也可以这么写:

private E dequeue() {
    // 获取到head节点
    Node<E> h = head;
    // 获取到head节点指向的下一个节点,也就是节点A
    Node<E> first = h.next;
    // 获取到下下个节点,也就是节点B
    Node<E> next = first.next;
    // head的next指向下下个节点,也就是图中的B节点
    h.next = next;
    // 得到节点A的值
    E x = first.item;
    first.item = null; // help GC
    first.next = first; // help GC
    return x;
}

poll()

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

poll方法去除了take方法中元素为空后阻塞等待这一步骤,这里也就不详细说了。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一样,利用了Condition的awaitNanos方法来进行阻塞等待直至超时。这里就不列出来说了。

获取元素方法

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

加锁后,获取到head节点的next节点,如果为空返回null,如果不为空,返回next节点的item值。

删除元素方法

public boolean remove(Object o) {
    if (o == null) return false;
    // 两个lock全部上锁
    fullyLock();
    try {
        // 从head开始遍历元素,直到最后一个元素
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果找到相等的元素,调用unlink方法删除元素
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 两个lock全部解锁
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

因为remove方法使用两个锁全部上锁,所以其他操作都需要等待它完成,而该方法需要从head节点遍历到尾节点,所以时间复杂度为O(n)。我们来看看unlink方法。

void unlink(Node<E> p, Node<E> trail) {
    // p的元素置为null
    p.item = null;
    // p的前一个节点的next指向p的next,也就是把p从链表中去除了
    trail.next = p.next;
    // 如果last指向p,删除p后让last指向trail
    if (last == p)
        last = trail;
    // 如果删除之前元素是满的,删除之后就有空间了,唤醒生产线程放入元素
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

问题

看源码的时候,我给自己抛出了一个问题。

  • 为什么dequeue里的h.next不指向null,而指向h?
  • 为什么unlink里没有p.next = null或者p.next = p这样的操作?
    这个疑问一直困扰着我,直到我看了迭代器的部分源码后才豁然开朗,下面放出部分迭代器的源码:
private Node<E> current;
private Node<E> lastRet;
private E currentElement;

Itr() {
    fullyLock();
    try {
        current = head.next;
        if (current != null)
            currentElement = current.item;
    } finally {
        fullyUnlock();
    }
}

private Node<E> nextNode(Node<E> p) {
    for (;;) {
        // 解决了问题1
        Node<E> s = p.next;
        if (s == p)
            return head.next;
        if (s == null || s.item != null)
            return s;
        p = s;
    }
}

迭代器的遍历分为两步,第一步加双锁把元素放入临时变量中,第二部遍历临时变量的元素。也就是说remove可能和迭代元素同时进行,很有可能remove的时候,有线程在进行迭代操作,而如果unlink中改变了p的next,很有可能在迭代的时候会造成错误,造成不一致问题。这个解决了问题2。

而问题1其实在nextNode方法中也能找到,为了正确遍历,nextNode使用了 s == p的判断,当下一个元素是自己本身时,返回head的下一个节点。

总结

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/633314.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

光线追踪RayTracing,基本原理,判断物体与光线相交

光线的三点假设&#xff1a; 光线按直线传播光线之间不会发生碰撞光线会经过一系列折射反射进入摄像机 可以从摄像机发出光线&#xff0c;推出可逆的光路 上图中&#xff0c;透明球在与相机直连的线条处&#xff0c;需要将折射和反射的着色点结果相加&#xff0c;如果有光源直…

Neuroimage | LMDA-Net第一作者亲自讲述其设计思想

近期&#xff0c; 天津大学精仪学院和医工院联合&#xff0c;在神经科学和神经成像顶刊 Neuroimage中发表题为《LMDA-Net: 一种具有通用性和可解释性的轻量级EEG解码网络》的学术论文, 为解决人工神经网络模型在EEG解码时面临的跨数据集泛化性差、预测波动性高和模型可解释性差…

【2023】Redis cluster集群模式搭建

目录 1.cluster集群介绍2.搭建cluster集群2.1.架构图2.2.搭建集群2.2.1.创建所需配置文件2.2.2.创建集群所需容器2.2.3.创建集群&#xff1a;master1节点连接其他节点2.2.4.配置从节点&#xff0c;完成三主三从 3.在cluster集群内读写数据 1.cluster集群介绍 Redis Cluster是R…

每个程序员都必须知道的8种通用数据结构

8种常用数据结构 数据结构是一种特殊的组织和存储数据的方式&#xff0c;可以使我们可以更高效地对存储的数据执行操作。数据结构在计算机科学和软件工程领域具有广泛而多样的用途。 几乎所有已开发的程序或软件系统都使用数据结构。此外&#xff0c;数据结构属于计算机科学和…

GitOps多环境部署问题及解决方案

大型组织应用GitOps难免会遇到在多环境中部署的问题&#xff0c;本文分析了应用环境分支策略会遇到到问题&#xff0c;介绍了应用文件夹策略解决这些问题的方案。原文&#xff1a;Stop Using Branches for Deploying to Different GitOps Environments[1], How to Model Your G…

STC8比较器功能案例介绍

STC8比较器功能案例介绍 &#x1f4cc;相关篇《STC8功能脚切换相关寄存器P_SW1和P_SW2以及MCLKOCR介绍》 &#x1f4cb;基于内部参考电压源&#xff0c;使用外部端口P3.7作为比较器正极输入源&#xff0c;比较器结果输出到P3.4 或者P4.1&#xff08;由P_SW2 中的CMPO_S 进行设定…

Oracle的最高认证并不是OCM,而是......

什么是Oracle数据库的最高认证呢&#xff1f;大家注意&#xff0c;不是Oracle OCM&#xff0c;而是Oracle高可用大师认证&#xff0c;全称叫&#xff1a;Maxium Availability Certified Master&#xff0c;要获得这一张证书&#xff0c;你需要同时有5个认证。 关于博主&#x…

Prompt 用法大全!让 ChatGPT 更智能的六种策略(中)

如果遵循以下六种策略来构建 Prompt 提示词&#xff0c;在和 ChatGPT 对话中我们将获得更好、更符合我们要求的回答。 这些策略&#xff0c;后几种更适合在编程调用 ChatGPT API 时使用&#xff0c;不过也适用直接和 ChatGPT 对话&#xff0c;让它更好的理解我们的意图。 1、写…

8.DIY可视化-拖拽设计1天搞定主流小程序-小程序首页幻灯片显示

小程序首页幻灯片显示 本教程均在第一节中项目启动下操作 小程序首页幻灯片显示前言一、添加组件: 图片轮播公告图文菜单二. 绑定幻灯片数据接口三:首页绑定接口数据:1.绑定字段 四.导出源码,解压后,导入hbulider,运行查看效果1.导出源码:2.解压,导入hbuilder 五.运行查看效果对…

Ae:蒙版插值面板

Ae菜单&#xff1a;窗口/蒙版插值 Mask Interpolation 蒙版插值Mask Interpolation面板可用于自动创建蒙版路径 Mask Path属性的关键帧&#xff0c;从而实现更平滑逼真的路径动画。 至少选择 2 个连续的蒙版路径关键帧后&#xff0c;智能蒙版插值将基于面板上的选项设置创建中间…

Qt5.12.6配置Android Arm开发环境(Linux)

1.安装jdk 2.安装android studio 3.安装sdk 与ndk 设置代理 安装SDK工具 安装SDK Platform 安装QT 选择JDK 1.8安装路径,SDK与NDK路径 如出现Platform SDK或者Platform Build Tools未安装,点击Update Installed 配置成功后可看到ARMv7与AMD-v8a kits Qt Version也可看到ARM…

既然有了HTTP,为什么还要RPC?

文章目录 HTTP和RPC区别RPC&#xff08;Remote Procedure Call&#xff09;服务流行的RPC框架 HTTP服务Restful RPC接口和HTTP接口的区别与联系传输协议传输效率性能消耗负载均衡服务治理&#xff08;下游服务新增&#xff0c;重启&#xff0c;下线时如何不影响上游调用者&…

Java012——Java引用数据类型String的简单学习

回顾Java数据类型 本次要学习的是Java引用数据类型String 一、对String类简单说明 说明&#xff1a;String是Java中的一个类 二、String类的作用 作用&#xff1a;主要用来创建和操作字符串。 三、使用String类 3.1、创建字符串 注意&#xff1a; 1、字符串使用双引号&qu…

基于Springboot+vue+协同过滤+前后端分离+鲜花商城推荐系统(用户,多商户,管理员)+全套视频教程

基于Springbootvue协同过滤前后端分离鲜花商城推荐系统(用户,多商户,管理员)(毕业论文11000字以上,共33页,程序代码,MySQL数据库) 代码下载: 链接&#xff1a;https://pan.baidu.com/s/1mf2rsB_g1DutFEXH0bPCdA 提取码&#xff1a;8888 【运行环境】Idea JDK1.8 Maven MySQL…

【网络原理】TCP/IP协议五层模型

&#x1f94a;作者&#xff1a;一只爱打拳的程序猿&#xff0c;Java领域新星创作者&#xff0c;CSDN、阿里云社区优质创作者。 &#x1f93c;专栏收录于&#xff1a;计算机网络原理 本期讲解协议、OSI七层模型、TCP/IP五层模型、网络设备所在的分层、数据的封装和分佣。 目录 …

Linux基础内容(22)—— 信号

Linux基础内容&#xff08;21&#xff09;—— 进程消息队列和信号量_哈里沃克的博客-CSDN博客https://blog.csdn.net/m0_63488627/article/details/130770830?spm1001.2014.3001.5501 目录 1.定义 1.介绍 2.解释 例子 操作系统信号 实现的大致思路 2.信号的产生方式 …

【C语言】计算含多种运算符的表达式

计算含多种运算符的表达式 引入运算符的优先级算术表达式的运算规则引例例题 引入 诸如下面这些表达式&#xff0c;它的计算过程&#xff0c;计算顺序是怎样的&#xff1f;计算结果为多少&#xff1f; 今天我们就一起来解决一下这个问题吧&#xff01; 运算符的优先级 解决…

6.数据结构期末复习之查找和排序1

概念 静态查找: 无插入和删除 动态查找: 边插入删除边查找静态和动态查找的实现方式 1.线性表: 静态查 2.树表(二叉排序树)动态查 3.散列表 静态动态都可以查找: 集合中查找满足条件的数据关键码 1.主关键码:可以表标识数据唯一性 2.次关键码: 不能标识查找效率: 比较次数决定的…

八、进程程序替换

文章目录 一、进程程序替换&#xff08;一&#xff09;概念&#xff08;二&#xff09;为什么程序替换&#xff08;三&#xff09;程序替换的原理&#xff08;四&#xff09;如何进行程序替换1. execl2. 引入进程创建——子进程执行程序替换&#xff0c;会不会影响父进程呢? &…

PDF 内容替换器软件工具 PDF Replacer Pro Crack

PDF 内容替换器软件工具 批量查找和替换 PDF 中的指定文本 PDF Replacer 是一款 Windows 软件程序&#xff0c;可在 PDF 文件中查找指定的单词或短语文本并替换为新文本&#xff0c;并保持 PDF 布局不变。 Windows 7/Win 8/Win 10 或更高版本&#xff08;32/64 位&#xff09; …