【并发专题】阻塞队列BlockingQueue实战及其原理分析

news2025/1/9 6:46:30

目录

  • 前置知识
    • 队列
      • 有界队列、无界队列
      • Queue——队列在JAVA里面的接口
    • 阻塞队列介绍
      • BlockingQueue——阻塞队列在JAVA里面的接口
      • 阻塞队列的应用场景
      • JUC包下的阻塞队列
  • 课程内容
    • *一、ArrayBlockingQueue
      • 基本介绍
      • 应用场景
      • 使用示例
      • 基本原理
      • 数据结构
      • 核心源码解析
        • 双指针与环形数组
    • *二、LinkedBlockingQueue
      • 基本介绍
      • 应用场景
      • 使用示例
      • 基本原理
      • 数据结构
      • 核心源码解析
      • LinkedBlockingQueue与ArrayBlockingQueue对比
    • *三、DelayQueue
      • 基本介绍
      • 应用场景
      • 使用示例
      • 基本原理
      • 数据结构
      • 核心源码分析
    • 四、SynchronousQueue
      • 基本介绍
      • 应用场景
      • 使用示例
      • 基本原理
      • 数据结构
    • 五、如何选择阻塞队列
      • 1.选择策略
      • 2.线程池对于阻塞队列的选择
  • 学习总结

前置知识

队列

大家还记得什么是队列吗?队列有什么特征?
队列是一种特殊的线性表,遵循先进先出、后进后出的基本原则。一般来说,它只允许在表的前端(队头)进行删除操作,而在表的后端(队尾)进行插入操作。总结下来,大概就是下面三个特征:

  • 是限定在一端进行插入,另一端进行删除的特殊线性表;
  • 先进先出(FIFO)线性表;
  • 允许出队的一端称为队头,允许入队的一端称为队尾。在这里插入图片描述

有界队列、无界队列

  • 有界队列,即有【固定容量大小】的队列;
  • 无界队列,即【没有固定容量大小】的队列。这种队列可以直接插入,直到OOM。但事实上,在JAVA也不会真的是【无界】,他的默认容量是Integer.MAX_VALUE,42个亿,基本上也不会用到这个体量,所以说是【无界】也不为过。

Queue——队列在JAVA里面的接口

我们在学习一个新的API之前,比较恰当的方式通常是看看相关的接口生命,这是因为接口在JAVA中有一个很重要的作用,拿就是起到一个规范作用。所有实现了我这个接口的子类,通常来说需要按照我的标准来做事,否则一个不被定义的接口实现方法,它的功能是不可预期的,这显然跟接口编程思想相违背。
以下面的Queue接口的 E remove();方法举例子,接口说明上已经明说了//返回并删除队首元素,队列为空则抛出异常,那么后续我们在实现这个接口的这个方法的时候,就得按照这个标准来实现。所以,我们根据这个特性,可以很简单的洞悉到每一个实现子类的功能了。

public interface Queue<E> extends Collection<E> {
    
    // 添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
    boolean add(E e);
    // 添加一个元素,添加成功返回true, 如果队列满了,返回false
    boolean offer(E e);
    // 返回并删除队首元素,队列为空则抛出异常
    E remove();
    // 返回并删除队首元素,队列为空则返回null
    E poll();
    // 返回队首元素,但不移除,队列为空则抛出异常
    E element();
    // 获取队首元素,但不移除,队列为空则返回null
    E peek();
}

阻塞队列介绍

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:

  • 当阻塞队列插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;
  • 从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。

并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue——阻塞队列在JAVA里面的接口

在这里插入图片描述
在这里插入图片描述
BlockingQueue实际上也是继承了Queue接口,所以,它的功能大体上跟Queue一样。只不过,它新增了4个可以阻塞的接口,分别为:

// 添加一个元素,如果队列满了则阻塞等待,直到队列不满或者线程被中断,又或者是等待超时
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

// 添加一个元素,如果队列满了则阻塞等待,直到队列不满或者线程被中断
void put(E e) throws InterruptedException;

// 返回并删除队首元素,队列为空则进入阻塞等待,直到队列不为空或者线程被中断
E take() throws InterruptedException;

// 返回并删除队首元素,队列为空则进入阻塞等待,直到队列不为空或者线程被中断,又或者是等待超时
E poll(long timeout, TimeUnit unit) throws InterruptedException;
方法抛出异常返回特定的值阻塞阻塞特定时间
入队add(e)offer(e) put(e) offer(e, time, unit)
出队remove()poll() take() poll(time, unit)
获取队首元素element()peek()不支持不支持

阻塞队列的应用场景

阻塞队列在实际应用中有很多场景,通常都是在【生产者-消费者模型】下,它可以帮助我们解决并发问题,提高程序的性能和可靠性。比如下面的这些常见的应用场景:

  1. 线程池
    线程池中的任务队列通常是一个阻塞队列。当任务数超过线程池的容量时,新提交的任务将被放入任务队列中等待执行。线程池中的工作线程从任务队列中取出任务进行处理,如果队列为空,则工作线程会被阻塞,直到队列中有新的任务被提交。
  2. 消息队列
    消息队列使用阻塞队列来存储消息,生产者将消息放入队列中,消费者从队列中取出消息进行处理。消息队列可以实现异步通信,提高系统的吞吐量和响应性能,同时还可以将不同的组件解耦,提高系统的可维护性和可扩展性。
  3. 缓存系统
    缓存系统使用阻塞队列来存储缓存数据,当缓存数据被更新时,它会被放入队列中,其他线程可以从队列中取出最新的数据进行使用。使用阻塞队列可以避免并发更新缓存数据时的竞争和冲突。
  4. 并发任务处理
    在并发任务处理中,可以将待处理的任务放入阻塞队列中,多个工作线程可以从队列中取出任务进行处理。使用阻塞队列可以避免多个线程同时处理同一个任务的问题,并且可以将任务的提交和执行解耦,提高系统的可维护性和可扩展性。

JUC包下的阻塞队列

BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理却是类似的。

队列描述
ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列
PriorityBlockingQueue支持按优先级排序的无界阻塞队列
DelayQueue基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列
SynchronousQueue不存储元素的阻塞队列
LinkedTransferQueue基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque基于链表结构实现的一个双端阻塞队列

课程内容

*一、ArrayBlockingQueue

基本介绍

ArrayBlockingQueue是一个典型的有界队列,它是基于数组结构来存储元素。初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。

应用场景

ArrayBlockingQueue可以用于实现数据缓存、限流、生产者-消费者模式等各种应用。

使用示例

BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");   //向队列中添加元素
Object object = queue.take();   //从队列中取出元素

基本原理

在这里插入图片描述
ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

数据结构

利用Lock+Condition的等待通知机制进行阻塞控制。核心:一把锁,两个条件

// 数据元素数组
final Object[] items;
// 下一个待取出元素索引
int takeIndex;
// 下一个待添加元素索引
int putIndex;
// 元素个数
int count;
// 内部锁
final ReentrantLock lock;
// 消费者等待条件
private final Condition notEmpty;
// 生产者等待条件
private final Condition notFull;  

// @param capacity 初始化容量
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    ...
    lock = new ReentrantLock(fair); //公平,非公平
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

核心源码解析

  • 入队put方法:
// 添加元素方法
public void put(E e) throws InterruptedException {
    // 检查是否为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁,如果线程中断抛出异常 
    lock.lockInterruptibly();
    try {
       // 阻塞队列已满,则将生产者挂起,等待消费者唤醒
       // 设计注意点: 用while不用if是为了防止虚假唤醒
        while (count == items.length)
            notFull.await(); //队列满了,使用notFull等待(生产者阻塞)
        // 入队
        enqueue(e);
    } finally {
        lock.unlock(); // 唤醒消费者线程
    }
}
    
// 入队方法
private void enqueue(E x) {
    final Object[] items = this.items;
    // 入队   使用的putIndex
    items[putIndex] = x;
    if (++putIndex == items.length) 
        putIndex = 0;  // 设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部
    count++;
    // notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
    notEmpty.signal();
}

该方法实现其实挺简单的,基本意思已经注释了

  • 出队take方法:
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁,如果线程中断抛出异常 
    lock.lockInterruptibly();
    try {
       // 如果队列为空,则消费者挂起
        while (count == 0)
            notEmpty.await();
        // 出队
        return dequeue();
    } finally {
        lock.unlock();// 唤醒生产者线程
    }
}

// 出队方法
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex]; // 取出takeIndex位置的元素
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0; // 设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
    notFull.signal();
    return x;
}

双指针与环形数组

我们从最上面ArrayBlockingQueue的数据结构也看到了,他有两个成员变量int takeIndex;以及int putIndex;,这就是双指针。然后,他们是通过源码中的以下代码,形成的环形数组:

if (++putIndex == items.length) 
    putIndex = 0;  // 设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部

if (++takeIndex == items.length)
    takeIndex = 0; // 设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部

在这里插入图片描述
使用双指针的好处在于可以避免数组的复制操作。如果使用单指针,每次删除元素时需要将后面的元素全部向前移动,这样会导致时间复杂度为 O(n)。而使用双指针,我们可以直接将 takeIndex 指向下一个元素,而不需要将其前面的元素全部向前移动。同样地,插入新的元素时,我们可以直接将新元素插入到 putIndex 所指向的位置,而不需要将其后面的元素全部向后移动。这样可以使得插入和删除的时间复杂度都是 O(1) 级别,提高了队列的性能。

*二、LinkedBlockingQueue

基本介绍

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小

应用场景

他跟ArrayBlockingQueue使用上场景基本没有什么区别,但是因为底层存储的数据结构不同,所以在设计上基于链表的特性(链表不需要连续空间,而数组需要。所以链表,只需要把用完的元素置空,再下一次GC的时候就会被回收掉),做了读写分离锁,这使得它的性能比ArrayBlockingQueue高很多。这也是为什么在线程池里,用的是LinkedBlockingQueue,而不是ArrayBlockingQueue。

使用示例

//指定队列的大小创建有界队列
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);

//无界队列
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

boundedQueue.put("1");   //向队列中添加元素
Object object = boundedQueue.take();   //从队列中取出元素

基本原理

在这里插入图片描述
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

数据结构

// 容量,指定容量就是有界队列
private final int capacity;
// 元素数量
private final AtomicInteger count = new AtomicInteger();
// 链表头  本身是不存储任何元素的,初始化时item指向null
transient Node<E> head;
// 链表尾
private transient Node<E> last;
// take锁   锁分离,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty条件
// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
// put锁
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();

// 典型的单链表结构
static class Node<E> {
    E item;  // 存储元素
    Node<E> next;  // 后继节点    单链表结构
    Node(E x) { item = x; }
}

核心源码解析

  • 构造方法
在这里插入代码片public LinkedBlockingQueue() {
    // 如果没传容量,就使用最大int值初始化其容量
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化head和last指针为空值节点
    last = head = new Node<E>(null);
}
  • 入队put方法
public void put(E e) throws InterruptedException {    
    // 不允许null元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 新建一个节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 使用put锁加锁
    putLock.lockInterruptibly();
    try {
        // 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
        while (count.get() == capacity) {
            notFull.await();
        }  
        // 队列不满,就入队
        enqueue(node);
        c = count.getAndIncrement();// 队列长度加1,返回原值
        // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队) 
        // 这里为啥要唤醒一下呢?
        // 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock(); // 真正唤醒生产者线程
    }  
    // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) { 
    // 直接加到last后面,last指向入队元素
    last = last.next = node;
}    
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock; 
    takeLock.lock();// 加take锁
    try {  
        notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
    } finally {
        takeLock.unlock();  // 真正唤醒消费者线程
    }
}
  • 出队take方法
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 使用takeLock加锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 否则,出队
        x = dequeue();
        c = count.getAndDecrement();// 长度-1,返回原值
        if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // 真正唤醒消费者线程
    }
    // 为什么队列是满的才唤醒阻塞在notFull上的线程呢?
    // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,
    // 这也是锁分离带来的代价
    // 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
     // head节点本身是不存储任何元素的
    // 这里把head删除,并把head下一个节点作为新的值
    // 并把其值置空,返回原来的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 方便GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
    } finally {
        putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
    }
}

LinkedBlockingQueue与ArrayBlockingQueue对比

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:

  1. 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  2. 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  3. 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象(浮动垃圾对象)。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  4. 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

*三、DelayQueue

基本介绍

DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:

public interface Delayed extends Comparable<Delayed> {
    //getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
    //如果返回 0 或者负数则代表任务已过期。
    //元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
    long getDelay(TimeUnit unit);
}

应用场景

用于在需要支持延迟消费的场景。如:定时线程池

使用示例

public class DelayQueueExample {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Order> delayQueue = new DelayQueue<>();

        // 添加三个订单,分别延迟 5 秒、2 秒和 3 秒
        delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));
        delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));
        delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));

        // 循环取出订单,直到所有订单都被处理完毕
        while (!delayQueue.isEmpty()) {
            Order order = delayQueue.take();
            System.out.println("处理订单:" + order.getOrderId());
        }
    }

    static class  Order implements Delayed{
        private String orderId;
        private long createTime;
        private long delayTime;

        public Order(String orderId, long createTime, long delayTime) {
            this.orderId = orderId;
            this.createTime = createTime;
            this.delayTime = delayTime;
        }

        public String getOrderId() {
            return orderId;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long diff = createTime + delayTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(diff, 0);
        }
    }
    
//  系统输出:
//  处理订单:order2
//	处理订单:order3
//	处理订单:order1
}

基本原理

在这里插入图片描述

数据结构

//用于保证队列操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存储元素,用于保证延迟低的优先执行
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素   当新元素到达,或新线程可能需要成为leader时被通知
private final Condition available = lock.newCondition();

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

核心源码分析

  • 入队put方法:
public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 入队
        q.offer(e);
        if (q.peek() == e) {
            // 若入队的元素位于队列头部,说明当前元素延迟最小
            // 将 leader 置空
            leader = null;
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        }
        return true;
    } finally {
        lock.unlock(); // 解锁,真正唤醒阻塞的线程
    }
}
  • 出队take方法:
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)   
            if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
                available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
            else {
                long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间             
                if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                    return q.poll();
                
                // 如果delay大于0 ,则下面要阻塞了
                // 将first置为空方便gc
                first = null; 
                // 如果有线程争抢的Leader线程,则进行无限期等待。
                if (leader != null)
                    available.await();
                else {
                    // 如果leader为null,把当前线程赋值给它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待剩余等待时间
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
        if (leader == null && q.peek() != null)
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        // 解锁,真正唤醒阻塞的线程
        lock.unlock();
    }
}

这个方法多少有点难度,这里的难点还是在于理解Thread leader的作用。它的作用为:尽量保持最先申请获取元素的线程,能优先在最早过期元素可消费的时候获取到它
它的流程描述大概如下:

  1. 当获取元素时,先获取到锁对象;
  2. 获取最早过期的元素,但是并不从队列中弹出元素,只是访问获取;
  3. 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象;
  4. 如果最早过期的元素不为空;
  5. 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素;
  6. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间;
  7. 最后将Leader线程设置为空;
  8. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

四、SynchronousQueue

基本介绍

SynchronousQueue,翻译:同步队列。它是一个比较特殊的阻塞队列,特殊在哪?和其他的BlockingQueue不同,它的capacity是0,基于这个原因,不不可能存储任何元素。也就是说SynchronousQueue的每一次insert操作,都只能等待其他线性的remove操作。而每一个remove操作也必须等待其他线程的insert操作。
在这里插入图片描述

应用场景

在线程池的newCacheThreadPool中用的就是这个阻塞队列
在这里插入图片描述

使用示例

public class SynchronousQueueTest {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("生产者开始生产了");
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                }
                System.out.println("生产者生产完了,并且在库房里面等待消费者过来拿");
            }
        });

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("消费者来了");
                try {
                    System.out.println("消费者从生产者手中把东西拿了过来: " + queue.take());
                } catch (InterruptedException e) {
                }
                System.out.println("消费者开开心心地回家了");
            }
        });

        producer.start();
        Thread.sleep(1000);
        consumer.start();
    }
    
//    系统调用:
//    生产者开始生产了
//    消费者来了
//    生产者生产完了,并且在库房里面等待消费者过来拿
//    消费者从生产者手中把东西拿了过来: 1
//    消费者开开心心地回家了
}

基本原理

在这里插入图片描述

数据结构

该阻塞队列内部有两个容器,都是实现自:Transfer抽象类

    abstract static class Transferer<E> {
		// 用来交换数据
		// e:如果不为空,
        abstract E transfer(E e, boolean timed, long nanos);
    }

紧接着,内部两个容器分别为双端传输队列TransferQueue和双端传输栈TransferStack。这两个类的区别在于是否选择公平策略。Queue先进先出代表公平,Stack先进后出代表不公平

五、如何选择阻塞队列

1.选择策略

通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:

  • 功能:比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列;
  • 容量:考虑的是容量,或者说是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue;
  • 能否扩容:有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
  • 内存结构:我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
  • 性能:比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。

2.线程池对于阻塞队列的选择

线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
Executors类下的线程池类型:

  • FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
  • CachedThreadPool 选取的是 SynchronousQueue
  • ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列DelayQueue

学习总结

  1. 学习了ArrayBlockingQueue和LinkedBlockingQueue、DelayQueue的源码,了解了他们的基本原理及内部数据结构
  2. 小小的看了下SynchronousQueue的源码,有点晦涩难懂
  3. 附上一张Fox老师的配图总结
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/781596.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式:创建型模式

抽象工厂 abstract factory 例子 考虑一个多风格的界面应用&#xff0c;要求可以切换不同风格类型的组件&#xff08;窗口&#xff0c;滚动条&#xff0c;按钮等&#xff09; 风格/组件pm风格motif风格滚动条pmScrollBarmotifScrollBar窗口pmWindowMotifWindow WidgetFact…

Vue中TodoList案例_静态

MyHeader.vue <template><div class"todo-header"><input type"text" placeholder"请输入你的任务名称&#xff0c;按回车键确认"></div> </template><script> export default {name: "MyHeader"…

16. python从入门到精通——Python网络爬虫

目录 什么是爬虫 优点 网络爬虫的常用技术 网络请求&#xff1a;有三个常用网络请求模块 Urllib模块&#xff1a;python原生系统中标准库模块 urllib中的子模块 urllib.parse.urlencode() 常用于进行 URL 的 get 请求参数拼接 Urllib3模块&#xff1a;Urllib模块的升级版…

paramiko模块使用(2)

远程查看服务器资源使用情况 单机实现 import paramiko# 定义远程服务器的连接信息 hostname 192.168.2.198 username root password 123456# 创建SSH客户端对象 client paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy())try:# 连接到…

力扣刷题SQL-197. 上升的温度---分步解题

表&#xff1a; Weather ------------------------ | Column Name | Type | ------------------------ | id | int | | recordDate | date | | temperature | int | ------------------------ id 是这个表的主键 该表包含特定日期的温度信息编…

qemu搭建arm环境以及文件共享

几乎完全参照该文章 使用QEMU搭建ARM64实验环境 - 简书 ubuntu 14.04&#xff0c;linux3.16&#xff0c; busybox-1.31.0 arm-linux-gnueabi-gcc -v linux3.16以及busybox下载安装可参考链接 Ubuntu14.04安装qemu&#xff0c;运行linux-3.16gdb调试_qemu 安装 ubuntu 14_这个我…

项目开启启动命令整合

启动RabbitMQ管理插件 1.启动 RabbitMQ 管理插件。 rabbitmq-plugins enable rabbitmq_management rabbitmq-server # 直接启动&#xff0c;如果关闭窗⼝或需要在该窗⼝使⽤其他命令时应⽤就会停⽌ rabbitmq-server -detached # 后台启动 rabbitmq-server start # 启⽤服务 rab…

【亲测可用】安装Qt提示“无法下载存档 http://download.qt.io/online/qtsdkrepository...“

下载Qt安装程序exe之后&#xff0c;一般直接双击运行然后&#xff0c;注册登录后&#xff0c;到了第三步【安装程序】时&#xff0c;进行远程检索文件总会卡在这里&#xff0c;无法进行到下一步。报错如下&#xff1a; 解决办法&#xff1a; 关闭安装程序&#xff0c;然后&…

一百三十二、ClickHouse——ClickHouse建表时默认字段非空导致数据问题

一、ClickHouse建表问题 由于ClickHouse建表时默认字段非空 &#xff08;一&#xff09;建表语句 &#xff08;二&#xff09;查看字段属性 ClickHouse建表时一般情况下直接默认字段非空 &#xff08;三&#xff09;导致问题 所以这就导致一般情况下&#xff0c;一些字段的…

iphone新机官网验机流程

苹果官网验机流程 进入苹果官网&#xff0c;找到技术支持&#xff0c;进入“查看保障服务和支持期限“页面&#xff0c;输入要查询的机器的序列号&#xff0c;就可以查询了。 苹果官网验机入口&#xff1a;https://checkcoverage.apple.com/ 输入iphone序列号进行验机&#xff…

小程序体验版上线注意事项

1.接口域名必须是https&#xff0c;有ssh证书。不能用ip地址。 2.需要在微信公众平台进行配置 微信公众平台->开发-> 开发管理->开发设置 对服务器域名和业务域名进行配置对业务域名进行配置时&#xff0c;需要下载校验文件&#xff0c;放在域名根目录下

力扣刷题27.移除元素(Accept03)

力扣刷题 代码随想录数组 3.移除元素 力扣27. 移除元素 方法一&#xff1a;暴力解决法 1. 思路 两层嵌套循环遍历数组&#xff0c;内层循环主要是当第一层循环遍历到的元素等于要移除的元素的值的时候&#xff0c;其后的元素依次向前挪动一个位置&#xff08;覆盖要删除的…

计科web常见错误排错【HTTP状态404、导航栏无法点开、字符乱码及前后端数据传输呈现、jsp填写的数据传到数据库显示null、HTTP状态500】

web排错记录 在使用javaweb的过程中会出现的一些错误请在下方目录查找。 目录 错误1&#xff1a;HTTP状态404——未找到 错误2&#xff1a;导航栏下拉菜单无法点开的问题 错误3&#xff1a;字符乱码问题 错误4&#xff1a;jsp网页全部都是&#xff1f;&#xff1f;&#x…

科技云报道:边缘云赛道开启,谁能成为首个“出线”厂商?

科技云报道原创。 每一轮底层技术变革&#xff0c;都会带来全新的商业机遇。随着万物智联时代到来&#xff0c;大量数据产生的源头由传统的中心化向分散数据源变革&#xff0c;越来越多云边协同场景的出现&#xff0c;使得边缘云成为计算领域数据处理的新范式之一。 自2020年…

CHI协议保序之Compack保序

一致性系统中&#xff0c;使用三种保序方式&#xff1b; Completion ack response ⭕Completion acknowledgment&#xff1a; □ 该域段主要是用来&#xff0c; □ 决定 RN 发送的 trans&#xff0c;与其他 RN 发送的命令产生的 SNP 之间的顺序&#xff1b; …

[VUE]Element_UI 实现TreeSelect 树形选择器

文章目录 前言1、安装2、引用3、使用 前言 最近在做一个人员管理系统&#xff0c;在增改用户信息时&#xff0c;可能会设置用户所在的部门&#xff0c;因为部门是多级的&#xff0c;于是想到用Element_UI的TreeSelect组件实现 效果&#xff1a; 1、安装 npm install --save…

蓝桥杯专题-真题版含答案-【牌型种数】【煤球数目】【寒假作业】【奖券数目】

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…

如何动态修改 spring aop 切面信息?让自动日志输出框架更好用

业务背景 很久以前开源了一款 auto-log 自动日志打印框架。 其中对于 spring 项目&#xff0c;默认实现了基于 aop 切面的日志输出。 但是发现一个问题&#xff0c;如果切面定义为全切范围过大&#xff0c;于是 v0.2 版本就是基于注解 AutoLog 实现的。 只有指定注解的类或…

pytest常用执行参数详解

1. 查看pytest所有可用参数 我们可以通过pytest -h来查看所有可用参数。 从图中可以看出&#xff0c;pytest的参数有很多&#xff0c;下面是归纳一些常用的参数&#xff1a; -s&#xff1a;输出调试信息&#xff0c;包括print打印的信息。-v&#xff1a;显示更详细的信息。…

GAN在图像超分辨领域的应用

本篇博客介绍了对抗生成网络GAN在图像超分辨领域的应用&#xff0c;包括(SRGAN, ESRGAN, BSRGAN, Real-ESRGAN),详细介绍了论文内容&#xff0c;方法&#xff0c;网络结构并对其做了相关总结。相关GAN原理的介绍大家可以查看我之前的几篇博客&#xff0c;链接如下&#xff1a;生…