一 概括
简介
LinkedBlockingQueue(链接阻塞队列)类是BlockingQueue(阻塞队列)接口的主要实现类之一,也是Executor(执行器)框架最常搭配使用的实现之一,采用链表的方式实现。相比基于数组的实现,基于链表的实现有着更高的并发量,但是在绝大多数的并发应用中其性能的可预见性较差(即难以预测性能可以达到什么程度)。由于需要额外创建节点(节点是元素的容器,也是队列的组成单位)用于容纳元素的原因,在需要长时间高效并发地处理大批量数据时,对于GC可能存在较大影响(多出了分配内存创建节点以及回收节点的消耗)。LinkedBlockingQueue(链接阻塞队列)类是一个标准的FIFO队列,新元素会从队列的尾部插入/放置(即尾插法),而元素的移除/拿取则会在队列的头部进行。
LinkedBlockingQueue(链接阻塞队列)类不允许存null值,或者说BlockingQueue(阻塞队列)接口的所有的实现类都不允许存null值。null被作为poll()及peek()方法(两个方法由Queue(队列)接口定义)作为队列不存在元素的标记值,因此所有的BlockingQueue(阻塞队列)接口实现类都不允许存null值。
LinkedBlockingQueue(链接阻塞队列)类不同于我们常规使用的Collection(集)接口的实现类,没有扩容的说法,即其容量会在创建时确定,之后不会再发生改变。如此一来,其内部的实现结构便相对来说简洁的多。但这就使得我们必须根据业务的实际情况及资源的硬性限制来选择具体的容量…可一个很现实的问题是早期选定的容量又往往很难支持后期的发展(不论是业务还是资源)…这也算是个令人很头疼的问题了。
LinkedBlockingQueue(链接阻塞队列)类可同时作为有界及无界队列使用。如果在创建LinkedBlockingQueue(链接阻塞队列)类对象(下文简称队列)时没有指定具体的容量,那其就是一个无界队列。话虽如此,但实际上不存在真正意义上的无界队列。所谓的无界队列其实是在未指定具体容量时默认赋予一个最大容量,在LinkedBlockingQueue(链接阻塞队列)类的实现中这个值是int类型的最大值,即Integer.MAX_VALUE(2 ^ 31 - 1)。我们并不推荐使用无界队列,因为它的容量实在太大了,一旦元素的移除/拿取速率低于元素的插入/放置速率,则很容易导致元素堆积从而造成OOM。因此在实际的开发中,还是推荐指定容量的有界队列用法,并根据业务的实际场景及资源的硬性限制选择合适的容量大小。
LinkedBlockingQueue(链接阻塞队列)类是线程安全的,或者说BlockingQueue(阻塞队列)接口的所有的实现类都是线程安全的(接口的定义中强制要求实现类必须线程安全)。LinkedBlockingQueue(链接阻塞队列)类采用一种被称为双锁的线程安全机制(下文简称双锁机制)来保证同步,通俗的讲就是LinkedBlockingQueue(链接阻塞队列)类在内部创建了放置与拿取两把ReentrantLock(可重入)类对象锁分别用于管理插入/放置及移除/拿取两类方法的同步(即插入/放置方法只会受放置锁的保护,而移除/拿取方法只会受拿取锁的保护),从而在保证线程安全的同时有效的提高并发,该知识点的详细内容会在下文详述。
LinkedBlockingQueue(链接阻塞队列)类的迭代器是弱一致性(即可能迭代到已经队列移除的元素)。弱一致性迭代器可以有效兼容并发的使用环境,使得插入/放置及移除/拿取方法的执行不会导致迭代的中断,该知识点的详细内容会在下文详述。
LinkedBlockingQueue(链接阻塞队列)类虽然与BlockingQueue(阻塞队列)接口一样都被纳入Executor(执行器)框架的范畴,但同时是Collection(集)框架的成员。
二 创建
有界队列
通过构造方法LinkedBlockingQueue(int capacity),我们可以得到一个有界队列。有界队列是现实中使用最多,也是最推荐使用的LinkedBlockingQueue(链接阻塞队列)类对象。其优点在于其容量由开发者预先指定,最大程度的贴合了业务的实际场景及资源的硬性限制。具体源码及注释如下:
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
* 创建一个指定容量的链接阻塞队列
*
* @param capacity the capacity of this queue 队列的容量
* @throws IllegalArgumentException if {@code capacity} is not greater than zero 如果容量不比0大(即小于等于0)
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
// 设置队列的容量。
this.capacity = capacity;
// 创建一个空节点作为头节点和尾节点。
last = head = new Node<E>(null);
}
无界队列
通过无参构造方法LinkedBlockingQueue(),我们可以得到一个无界队列。上文中我们已经提及,不存在真正意义上的无界队列。所谓的无界队列其实是在未指定具体容量时默认赋予一个最大容量,在LinkedBlockingQueue(链接阻塞队列)类的实现中这个值是int类型的最大值,即Integer.MAX_VALUE(2 ^ 31 - 1)。我们并不推荐使用无界队列(这是阿里巴巴Java规范里的限制,而不是Java的限制),因为它的容量实在太大了,一旦元素的移除/拿取速率低于元素的插入/放置速率,则很容易导致元素堆积从而造成OOM。因此在实际的开发中,还是推荐指定容量的有界队列用法,并根据业务的实际场景及资源的硬性限制选择合适的容量大小。具体源码及注释如下:
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}.
* 创建一个容量为int类型最大值的对象(也就是所谓的无限队列,实际上是一个长度非常大的队列罢了)。
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
我们可以发现,无参构造方法LinkedBlockingQueue()的底层是调用LinkedBlockingQueue(int)构造方法实现的,而传入的容量值为int类型的最大值,即Integer.MAX_VALUE(2 ^ 31 - 1)。
无界队列(包含初始元素)
通过构造方法LinkedBlockingQueue(Collection<? extends E> c),我们可以得到一个包含了指定集中所有元素的无界队列。既然是无界队列,那自然也是不推荐使用的,但该方法在实现上也有值得一说的地方。具体源码及注释如下:
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}, initially containing the elements of the given collection, added
* in traversal order of the collection's iterator.
* 创建一个容量为int最大值的链接阻塞队列,最初包含指定集的元素,按集的迭代器顺序新增(即按集的迭代器的顺序将元素添加到队列中)。
*
* @param c the collection of elements to initially contain 为了最初包含的元素集
* @throws NullPointerException if the specified collection or any of its elements are null 如果指定元素集中有任何元素为null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
// 容量为int类型的最大值。
this(Integer.MAX_VALUE);
// 加放置锁,将指定集中的所有元素都添加到队列中。
final ReentrantLock putLock = this.putLock;
putLock.lock();
// Never contended, but necessary for visibility
// 没有竞争,但为了可见性有必要(加锁)。
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
// 如果队列满了,也要抛出异常。
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
// 设置总数。
count.set(n);
} finally {
putLock.unlock();
}
}
我们可以在上述的源码中发现将指定集中的元素存入队列的整个过程是加了锁的,而源码给出的英文注释是“没有竞争,但为了可见性有必要(加锁)”。其中本人对可见性是可以理解的(如果理解没有错的话):如果在过程中没有加锁,就会出现获取到节点但获取不到元素的情况(即节点中不包含元素)。为什么会出现这样的问题呢?这是因为在Java中对象的创建不是原子操作,整个过程按顺序大致可分为以下三部分:
- 实例化(分配内存);
- 初始化(执行构造方法);
- 引用赋值(令变量持有对象的引用)。
但现实中由于指令重排序的原因,具体在执行中顺序就可能变成:
- 实例化(分配内存);
- 引用赋值(令变量持有对象的引用);
- 初始化(执行构造方法)。
这就意味着加入到队列中的节点可能是个空节点(即还未完成初始化的过程)。如此一来,如果有线程在队列构造方法执行结束后获取元素,则其获取到的节点中元素完全可能是空的,因此整个过程必须在锁的保护下执行。锁的存在虽然无法避免同步块中的指令重排序,但可以避免其重排序到同步块外(这就使得节点的初始化一定会在同步块中完成),因此使得队列创建完成后进行获取的线程获取到的节点一定是完整的(存在元素的)。
如果上述我的理解都是对的,那就有一个讲不通的地方就是既然对象的创建不是原子操作,那完全可能存在有线程向一个未执行初始化(或未执行完初始化)的队列保存元素的可能,如此应该是可能存在竞争的,而不是英文注释中说的“没有竞争”,这也是我目前尚未完全理解的地方…当然,也可能是我对可见性的理解本就是错的,希望有懂的同学能在评论区不吝赐教,本人万分感谢。
三 插入/放置
插入/放置是LinkedBlockingQueue(链接阻塞队列)类两大最核心也是最常用的方法之一,用于将指定的元素加入队列的尾部。插入/放置方法受双锁中放置锁的保护以确保同步,因此在同一时刻内最多只会存在一个执行插入/放置方法的线程(下文简称放置者)。由于场景的多样性需求,LinkedBlockingQueue(链接阻塞队列)类定义了该类方法四种形式的实现以供使用(更准确的说,这四种形式是由BlockingQueue(阻塞队列)接口定义的,包括后续的移除/拿取及检查)。
异常
add(E e)方法是插入/放置方法中“异常”形式的实现,其特点是当队列存在剩余容量时正常插入并返回true;否则直接抛出IllegalStateException(非法状态异常)。add(E e)方法由Collection(集)接口进行定义,并最终由AbstractQueue(抽象队列)抽象类实现。在继承关系中虽也有其它类进行实现,但最终会被其重写。因此LinkedBlockingQueue(链接阻塞队列)类是直接通过继承获取该方法的,其本身并没有实现这个方法。具体源码及注释如下:
/**
* Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning <tt>true</tt>
* upon success and throwing an <tt>IllegalStateException</tt> if no space is currently available.
* 再没有违背容量限制的情况下并指定元素插入至队列,并在成功之后返回true。如果当前没有可用空间则抛出非法状态异常。
* <p>
* This implementation returns <tt>true</tt> if <tt>offer</tt> succeeds, else throws an <tt>IllegalStateException</tt>.
* 该实现如果offer()方法成功则返回true,否则抛出一个非法状态异常(该方法底层是通过调用offer()方法实现的,而offer()方法则交由子类进行
* 实现,故而子类可以无需实现add()方法,这是一种模板模式的使用)。
*
* @param e the element to add 新增的元素
* @return <tt>true</tt> (as specified by {@link Collection#add}) true(具体看Collection#add方法文档)
* @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions
* 非法状态异常:如果元素当前由于容量限制无法新增
* @throws ClassCastException if the class of the specified element prevents it from being added to this queue
* 类型转换异常:如果指定元素的类妨碍其新增至队列
* @throws NullPointerException if the specified element is null and this queue does not permit null elements
* 空指针异常:如果元素为null并且该队列不允许null元素
* @throws IllegalArgumentException if some property of this element prevents it from being added to this queue
* 非法参数异常:如果元素的特性妨碍其新增至队列
* @Description: 新增(true:成功):新增元素。底层调用offer()方法实现,成功返回true,失败抛出非法状态异常。
*/
public boolean add(E e) {
// 调用offer(E)方法。
if (offer(e))
// 如果调用成功,直接返回true。
return true;
else
// 如果调用失败,抛出一个非法状态异常。
throw new IllegalStateException("Queue full");
}
可以看到add(E e)方法底层是调用offer(E e)方法实现的,而offer(E e)方法并没有默认实现,即会交由子类负责实现。这种做法被称为模板模式(常用设计模式的一种),在部分方法中定义流程(就像add(E e)方法这样),而具体的操作细节(就像offer(E e)方法这样)则交由子类去实现。如此一来,就可以避免在多个实现类中重复的编写作用相似的流程代码。
特殊值
offer(E e)方法是插入/放置方法中“特殊值”形式的实现,其特点是当队列存在剩余容量时正常插入并返回true;否则返回false。offer(E e)方法虽由Queue(队列)接口定义,但却是由LinkedBlockingQueue(链接阻塞队列)类自身实现的。具体源码及注释如下:
/**
* Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning {@code true}
* upon success and {@code false} if this queue is full. When using a capacity-restricted queue, this method is generally preferable to method {@link BlockingQueue#add add},
* which can fail to insert an element only by throwing an exception.
* 如果可以在不超出队列容量的情况下立即执行,则在队列的尾部插入指定的元素,成功之后返回true,如果队列满了(插入前)则返回false。当只有一个容量
* 受限的队列,该方法相比add()方法更好,因为其(指add()方法)会在插入一个元素失败时抛出一个异常。
*
* @throws NullPointerException if the specified element is null
* 空执行异常:如果指定元素为null
*/
@Override
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 在锁外进行一次判断,如果容量已满则直接返回false。
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 如果有空余容量则直接进行插入/放置,并在还有空余容量的前提下唤醒一个挂起的放置者。
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果计数(快照)为0,意味着队列不再是空队列了,可以唤醒被阻塞的拿取者了。之所以只在为0的时候唤醒是因为其它数字意味着队列不为空,这
// 种情况下拿取者是不会被阻塞的的。
if (c == 0)
signalNotEmpty();
// 如果成功放置则返回true。
return c >= 0;
}
阻塞(无限阻塞)
put(E e)方法是插入/放置方法中“阻塞”形式的实现,其特点是当队列不存在剩余容量时阻塞,直至存在剩余容量后插入/放置,且该方法没有返回值。put(E e)方法由BlockingQueue(阻塞队列)接口定义,并由LinkedBlockingQueue(链接阻塞队列)类实现。放置者的阻塞是通过放置锁创建的Condition(条件)接口对象的await()方法实现的,其作用于synchronized机制的wait()方法相似,都是负责将当前调用方法的线程挂起。具体源码及注释如下:
/**
* Inserts the specified element at the tail of this queue, waiting if necessary for space to become available.
* 在队列的尾部插入指定的元素(尾插法),如果必要等待空间变的可用(即如果容量满了就先将自身加入条件队列中挂起)
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
@Override
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var holding count negative to indicate failure unless set.
// 注意:惯例是所有put/take等操作会预备/事先准备本地变量var保持计数为负以表示操作失败,除非设置。
// c是操作是否成功的标志位,负数表示失败...但从逻辑上看似乎没有什么作用。
int c = -1;
// 声明一个新的节点。
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// lockInterruptibly方法与lock方法的差别在于其被中断(准确的说是其在中断后会抛出中断异常)。
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is not protected by lock. This works because count can only decrease at this point (all other
* puts are shut out by lock), and we (or some other waiting put) are signalled if it ever changes from capacity. Similarly for all other uses of count
* in other wait guards.
* 注意count(变量)在等待/延缓保护(不是很懂等待/延缓保护的意思,但翻译过来是这样,猜想应该是一个可以存在并发但并发情况不是很严重的
* 环境。因为count的类型是AtomicInteger,该类型本身就是用了乐观锁保证数据安全性,因此可以接受一定程度的并发)中使用(当前情景受放置锁
* 的保护,但不受拿取锁的保护,因此依然可能存在并发,符合猜想中存在并发但并发情况不是很严重的环境),即使不受锁的保护。这是有效的,因
* 为count(变量)在该位置只能减少(所有其它放置者(即执行put操作的线程)通过锁被排除在外)(因为受放置锁的保护,所有的put操作都被阻塞,
* 不会出现元素总数增大的情况),并且如果它从容量处变化(意思应该是队列中的元素被拿取...及元素总数减少...这TM都什么句子...),我们(或其
* 它等待中的放置者)会收到信号。相似地,count(变量)的所有其它使用(方式)也要在等待/延缓保护(环境)中(真TM服了这段注释...我这么差
* 的英文能翻译出这么一段话也真是佩服自己)。
*/
// 判断当前元素的总量是否等于容量。相等意味着已经没有空间可以存放元素了,当前线程必须进行等待。并且该操作是循环进行的,即使被唤醒的
// 第一件事也是判断,因为无法保证自己是第一个获取到锁的(可能有一次唤醒全部放置者的情况)。
while (count.get() == capacity) {
notFull.await();
}
// 执行尾部入队操作。
enqueue(node);
// 获取并自增元素总数,如果还有剩余空间,则继续唤醒一个后继放置者进行放置。
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果计数(快照)为0,意味着队列不再是空队列了,可以唤醒被阻塞的拿取者了。之所以只在为0的时候唤醒是因为其它数字意味着队列不为空,这
// 种情况下拿取者是不会被阻塞的的。
if (c == 0)
signalNotEmpty();
}
超时(有限阻塞)
offer(E e, long timeout, TimeUnit unit)方法是插入/放置方法中“超时”形式的实现,其特点是当队列不存在剩余容量时阻塞,但只会阻塞指定的一定时间。如果在指定的时间内队列空出了剩余容量则进行插入/放置后返回true;否则返回false。offer(E e, long timeout, TimeUnit unit)方法由BlockingQueue(阻塞队列)接口定义,并由LinkedBlockingQueue(链接阻塞队列)类实现。放置者的阻塞同样是通过放置锁创建的Condition(条件)接口对象实现的,只不过其调用的是awaitNanos(long nanosTimeout)方法,其作用是令线程挂起至指定的时间。具体源码及注释如下:
/**
* Inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available.
* 在队列的尾部插入指定的元素,如果必要则等待指定等待时间以致空间变得可用。
*
* @return {@code true} if successful, or {@code false} if the specified waiting time elapses before space is available
* 如果成功则返回true,如果在空间变得可用之前指定等待时间过去则返回false
* @throws InterruptedException {@inheritDoc} 中断异常
* @throws NullPointerException {@inheritDoc} 空指针异常
*/
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 检查元素是否合法
if (e == null) throw new NullPointerException();
// 计算过期时间。
long nanos = unit.toNanos(timeout);
// c是操作是否成功的标志位,负数表示失败...但从逻辑上看似乎没有什么作用。
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 相比put()方法其差异点就在次数,每次限时挂起之前都会先检查是否已经超时,超时则直接退出并返回false(offer()方法在定义上是不会抛出
// 异常的)。
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果计数(快照)为0,意味着队列不再是空队列了,可以唤醒被阻塞的拿取者了。之所以只在为0的时候唤醒是因为其它数字意味着队列不为空,这
// 种情况下拿取者是不会被阻塞的的。
if (c == 0)
signalNotEmpty();
return true;
}
四 移除/拿取
移除/拿取是LinkedBlockingQueue(链接阻塞队列)类两大最核心也是最常用的方法之一,用于从队列的头部移除并返回元素。移除/拿取方法受双锁中拿取锁的保护以确保同步,因此在同一时刻内最多只会存在一个执行移除/拿取方法的线程(下文简称拿取者)。移除/拿取方法同样有四种形式的实现。
异常
remove()方法是移除/拿取方法中“异常”形式的实现,其特点是当队列存在元素时将头部元素移除并返回;否则直接抛出NoSuchElementException(无元素异常)。remove()方法由Queue(集)接口进行定义,并最终由AbstractQueue(抽象队列)抽象类实现。因此LinkedBlockingQueue(链接阻塞队列)类是直接通过继承获取该方法的,其本身并没有实现这个方法。具体源码及注释如下:
/**
* Retrieves and removes the head of this queue. This method differs from {@link #poll poll} only in that it throws an exception if this queue is empty.
* 检索并移除队列的头。该方法不同于poll()方法,如果队列为空时其会抛出一个异常。
* <p>
* This implementation returns the result of <tt>poll</tt> unless the queue is empty.
* 除非队列为空,否则该实现返回poll()的结果。
*
* @return the head of this queue 队列的头(元素)
* @throws NoSuchElementException if this queue is empty
* 无元素异常:如果队列为空
*/
public E remove() {
// 调用poll()方法获取元素。
E x = poll();
if (x != null)
// 如果元素存在,直接返回。
return x;
else
// 如果元素不存在,抛出无元素异常。
throw new NoSuchElementException();
}
可以看到remove()方法底层是调用poll()方法实现的,这意味着其与offer(E e)方法一样,都采用了模板模式。
特殊值
poll()方法是移除/拿取方法中“特殊值”形式的实现,其特点是当队列存在元素时将头部元素移除并返回;否则返回null。由于被null被作为队列不存在元素的标志值,因此LinkedBlockingQueue(链接阻塞队列)类是不允许存null的。poll()方法由Queue(队列)接口定义,并由LinkedBlockingQueue(链接阻塞队列)类自身实现。具体源码及注释如下:
/**
* @Description: 轮询:直接进行获取,没有元素的情况下直接返回null。
*/
@Override
public E poll() {
// 进行一次判断,如果没有则直接返回null,这是双重检查锁的第一次非同步检查,有助于提高并发。
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
// 获取拿取锁。
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 在持有锁的情况下再进行一次判断,在满足条件的情况下获取头元素的条目。
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
// 如果还有元素,则唤醒其它的拿取者。
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果此次拿取之前队列是满溢的状态,则拿取后需要唤醒被挂起的放置者。只在该条件下唤醒是因为放置者在队列未满溢的情况下是不会挂起的。
if (c == capacity)
signalNotFull();
return x;
}
阻塞(无限阻塞)
take()方法是移除/拿取方法中“阻塞”形式的实现,其特点是当队列不存在元素时阻塞,直至存在元素后移除/拿取,且该方法没有返回值。take()方法由BlockingQueue(阻塞队列)接口定义,并由LinkedBlockingQueue(链接阻塞队列)类实现。拿取者的阻塞实现与put()方法相同,但Condition(条件)接口对象是通过拿取锁创建的。具体源码及注释如下:
/**
* @Description: 拿取:如果不存在可用元素则无限等待至有元素插入为止。
*/
@Override
public E take() throws InterruptedException {
E x;
int c = -1;
// 获取拿取锁。
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果没有可用的元素,则将自身等待(本质是将自身加入相应的条件队列),直至被唤醒,并再次重复判断。
while (count.get() == 0) {
notEmpty.await();
}
// 获取队列的头元素中保存的条目(将旧头元素抛弃,并返回其后继元素保存的条目。将后继元素作为新的头元素保留在队列中)。
x = dequeue();
c = count.getAndDecrement();
// 如果还有元素,则唤醒处于等待状态的其他拿取者。
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果此次拿取之前队列是满溢的状态,则拿取后需要唤醒被挂起的放置者。只在该条件下唤醒是因为放置者在队列未满溢的情况下是不会挂起的。
if (c == capacity)
signalNotFull();
return x;
}
超时(有限阻塞)
poll(long timeout, TimeUnit unit)方法是移除/拿取方法中“阻塞”形式的实现,其特点是当队列不存在元素时阻塞,但只会阻塞指定的时间。在指定时间内一旦存在元素就会移除/拿取并返回,否则返回false。poll(long timeout, TimeUnit unit)方法由BlockingQueue(阻塞队列)接口定义,并由LinkedBlockingQueue(链接阻塞队列)类实现。拿取者的阻塞实现与offer(E e, long timeout, TimeUnit unit)方法相同,但Condition(条件)接口对象是通过拿取锁创建的。具体源码及注释如下:
/**
* @Description: 轮询:如果元素不存在则等待限定的时间,在指定时间内获取则返回,否则返回null。
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 计算过期时间并获取拿取锁。
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 在没有元素的情况下将自身限时挂起,挂起前先判断是否超时,超时则直接返回null。
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 获取头元素的内容。
x = dequeue();
c = count.getAndDecrement();
// 如果还有元素,则唤醒其它挂起的拿取者。
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果此次拿取之前队列是满溢的状态,则拿取后需要唤醒被挂起的放置者。只在该条件下唤醒是因为放置者在队列未满溢的情况下是不会挂起的。
if (c == capacity)
signalNotFull();
return x;
}
五 检查
检查也是LinkedBlockingQueue(链接阻塞队列)类的常用方法之一,用于获取队列的头元素,并不会将元素从队列中移除/拿取。该类方法本质上属于移除/拿取方法阉割版,因此也属于拿取锁的保护范围。检查方法同样具备多形式的实现,但只有“异常”与“特殊值”两种。
异常
element()方法是检查方法中“异常”形式的实现,其特点是当队列存在元素时将头部元素返回(但不移除);否则抛出NoSuchElementException(无元素异常)。element()方法由Queue(集)接口进行定义,并最终由AbstractQueue(抽象队列)抽象类实现。因此LinkedBlockingQueue(链接阻塞队列)类是直接通过继承获取该方法的,其本身并没有实现这个方法。具体源码及注释如下:
/**
* Retrieves, but does not remove, the head of this queue. This method differs from {@link #peek peek} only in that it throws an exception if this
* queue is empty.
* 检索,但不移除队列的头(元素)。该方法不同于peek()方法,如果队列为空时其会抛出一个异常。
* <p>
* This implementation returns the result of <tt>peek</tt> unless the queue is empty.
* 除非队列为空,否则该实现返回peek()的结果。
*
* @return the head of this queue 队列的头(元素)
* @throws NoSuchElementException if this queue is empty
* 无元素异常:如果队列为空
* @Description: 元素:用于返回队列的头元素(但不移除)。当队列中不存在元素时抛出无元素异常。
*/
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
可以看到element()方法底层是调用peek()方法实现的,这意味着其与offer(E e)及remove()方法一样,都采用了模板模式。
特殊值
peek()方法是检查方法中“特殊值”形式的实现,其特点是当队列存在元素时将头部元素返回(但不移除);否则返回null。由于被null被作为队列不存在元素的标志值,因此LinkedBlockingQueue(链接阻塞队列)类是不允许存null的。peek()方法由Queue(队列)接口定义,并由LinkedBlockingQueue(链接阻塞队列)类自身实现。具体源码及注释如下:
/**
* @Description: 窥视:获取元素,但不会将之从元素中取出。
*/
@Override
public E peek() {
// 进行双重检查锁的第一次判断,如果没有元素则直接返回null。
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 如果存在头元素(准确的说应该是头元素的后继元素),则直接返回其条目,否则返回null。但注意,该操作不会将条目从条目中移除。
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
// 因为不存在元素数量的变化,因此也就不会进行唤醒操作。
}
六 其它
除了上述提及的主要方法之外,LinkedBlockingQueue(链接阻塞队列)类还有一些常用方法如下所示:
- size() —— 大小 —— 用于获取队列当前的元素总数;
- remainingCapacity() —— 剩余容量 —— 用于获取队列当前的剩余容量(剩余容量 = 容量 - 大小);
- drainTo() —— 流失 —— 将队列中的元素迁移到指定的集中;
- drainTo(Collection<? super E> c, int maxElements) —— 流失 —— 从队列迁移指定数量的元素到指定的集中。
事实上,由于LinkedBlockingQueue(链接阻塞队列)类是Collection(集)接口的实现类,因为其也实现了其定义的所有方法,例如contains(Object o)、remove(Object o)及 toArray()等。但由于这些方法的执行效率不高,并且与LinkedBlockingQueue(链接阻塞队列)类的主流使用方式并不兼容,因此一般情况下是不推荐使用的,有兴趣的可以去查看源码实现。
七 双锁机制
LinkedBlockingQueue(链接阻塞队列)类采用双锁机制来保证线程安全。所谓双锁机制,是指LinkedBlockingQueue(链接阻塞队列)类在内部创建了放置与拿取两把ReentrantLock(可重入)类对象锁,分别用于管理插入/放置及移除/拿取两类方法的同步(即插入/放置方法只会受放置锁的保护,而移除/拿取方法只会受拿取锁的保护)。如此设计的原因是由于调用最为频繁的插入/放置及移除/拿取方法分别在队列的两端发生,因此二者基本不存在资源的竞争问题,完全可以在大多数情况下实现两类方法的并发(基本不存在,但还是已存在的,下文有详述)。故而使用双锁机制不仅可以保证单类方法的同步执行,还允许两类方法的并发执行,有效的提高了队列的并发性能。
双锁机制虽然优秀,但也并非尽善尽美:首先,由于双锁机制是专为插入/放置及移除/拿取这两类方法的执行特性而设计,因此也只适用于这两类或操作本质与之相同(例如drainTo()方法)的方法。而对于类的其它方法而言为了保证线程安全性就必须加双锁,这增加了线程同步的成本。例如实现自Collection(集)接口的remove(Object o)方法的作用是从队列中移除指定元素,由于指定元素可能存在于队列的任意位置,因此虽然也是移除,但并不符合移除/拿取方法的执行特性(从队列头部移除),因此必须在双锁的保护下执行…不过由于这些方法的共性是性能较低,并且也只是为了偶尔的使用而保留,因此一般较少也不推荐使用,故而整体影响不大;其次想要正确的使用双锁机制至少有两个问题需要解决:
- 维护“总数”的正确性;
- 单元素时的安全并发。
维护“总数”的正确性
所谓总数,即队列中元素的数量,是代码逻辑中一个至关重要的运行数据及判断依据,关系着能否获取到正确的元素总数/剩余容量,挂起的放置者/拿取者是否应该唤醒以及放置者能否插入/放置,拿取者能否移除/拿取等一系列操作能否正确执行,因此总数必须保证绝对的正确,即与队列实际的元素总数保持强一致。
维护总数正确性的最终目的是实现总数快照(总数是保存在主内存中的共享数据,线程从主内存中读取共享数据时会在其工作内存中储存数据的拷贝,也被称为快照)的可靠性,而总数快照可靠性必须在总数正确性的基础上实现。那又为何要实现总数快照的可靠性呢?这是因为总数快照是是否执行插入/放置及移除/拿取两类方法的判断依据(总数快照等于容量,即队列已满时无法进行插入/放置,而总数快照等于0时无法进行移除/拿取),如果总数快照不可靠,那这两类方法必然也是不可靠的。
总数的正确性由于受插入/放置及移除/拿取两类方法的影响,并且两类方法又允许并发,那根据常规做法,应该将对其的修改同时置于两把锁的保护下进行以保证同步。这不仅确保总数的正确性,甚至还确保了总数快照的正确性,因为在双锁的保护下程序是不存在并发的(请注意!!!这里我们形容总数快照时用的是正确性,而不是可靠性。所谓快照的正确性是指其与主数据保持强一致。正确的一定是可靠的,但可靠的并不一定是正确的。在并发环境中,我们并无法保证快照的正确性,因为其终究只是主数据的拷贝,而主数据完全可能已被其它线程所修改,那在这种情况下,快照就是错误的。而由于错误的快照也可能可靠,那此时快照是否可靠就变得难以揣测,除非我们能够得知主数据的变化趋势进行具体的分析)。可如此做法显然破坏了方案的整体基调,双锁机制不但没有带来任何收益,反而成为了额外的负担(即插入/放置时需要额外获取拿取锁,而移除/拿取时又需要额外获取放置锁,这不但造成了两类操作的再次阻塞,还徒增了一个锁的消耗)。为了避免这一点,LinkedBlockingQueue(链接阻塞队列)类采用的方案是将用于记录总数的count(总数)字段设计为AtomicInteger(原子整数)类型以使得对其的操作呈原子性,具体如下:
/**
* Current number of elements
* 当前元素数量
*
* @Description: 总数:记录当前队列中元素的数量。
*/
private final AtomicInteger count = new AtomicInteger();
AtomicInteger(原子整数)类是一个基于乐观锁实现的线程安全的整数类,其使得对count(总数)字段的修改都转变为了原子操作,这就意味着“总数”必然是正确的(都已经原子操作了怎么可能还不正确…除非用的不对),达成了实现总数快照可靠性的先决条件,并且也没有破坏插入/放置及移除/拿取两类方法间的并发,那么接下来要做的就是保证总数快照在两类方法中的可靠性。我们已知的是正确的一定是可靠的,那AtomicInteger(原子整数)类型的count(总数)字段能保证总数快照在两类方法的单锁场景下的正确性吗?不行!因为上文已经强调过无法保证并发环境中快照的正确性…那可靠性呢?答案是可以。
我们来仔细分析一下这个过程,首先直接跳过可能正确的情况,因为正确的一定是可靠的,故而没有分析的必要。在认定总数快照一定错误的情况下(即总数已被其它线程修改),如何判定其是否可靠呢?这就需要根据总数的变化趋势来进行判断,具体的模拟场景如下:
放置者(插入/放置) | 拿取者(移除/拿取) | |
---|---|---|
T01 | 获取放置锁 | |
T02 | 获取总数快照(5)以判断队列是否已满(否) | |
T03 | 获取拿取锁 | |
T04 | 获取总数快照(5)以判断队列是否为空(否) | |
T05 | 从队列头部移除/拿取元素(实际元素总数4) | |
T06 | 原子操作递减总数(5 - 1 = 4) | |
T07 | 从队列尾部插入/放置元素(实际元素总数5) | |
T08 | 原子操作递增总数(4 + 1 = 5) | |
T09 | 解除放置锁 | |
T10 | 解除拿取锁 |
上述的模拟场景仅仅是一个示例,实际上我们可以列举出很多相似的场景。但无一例外我们都会发现,无论在放置者执行插入/放置方法期间,有多少拿取者(串行)执行了移除/拿取方法,都只会使总数在原有5的基础上进一步减少,并不会破坏放置者执行插入/放置的前提,即“总数快照(5)小于容量”;反之亦然,无论在拿取者执行移除/拿取方法期间,有多少放置者(串行)执行了插入/放置方法,都只会使总数在原有5的基础上进一步增加,也不会破坏拿取者执行移除/拿取的前提,即“总数快照(5)大于0”。因此我们可以得到一个结论:判断错误快照是否可靠的关键在于判断并发执行的操作是否会破坏“以快照作为判断依据并成立的判断结果”。而很显然,放置者与拿取者的并发执行并不会对方造成破坏,也因此,其各自的总数快照无论正确错误都是可靠的。
单元素时的安全并发
双锁机制虽然在绝大多数的情况下并不存在元素的竞争问题,但是细心的同学肯定很快就想到了一种场景 —— 队列中仅存在单元素。没错,这的确也是双锁机制中仅有的存在线程安全问题的场景。当放置者和拿取者都判断到队列只剩单个元素时,放置者要在其的基础上进行插入/放置,而拿取者则是要将之移除/拿取,那么就可能发生类似于以下场景的线程安全问题(注:由于队列中只存在一个元素,因此用于持有头节点引用的head(头)字段与用于持有尾节点引用的last(尾/最后)字段指向的都是同一个节点):
放置者(插入/放置) | 拿取者(移除/拿取) | |
---|---|---|
T01 | 获取放置锁 | |
T02 | 获取总数快照(1)以判断队列是否已满(否) | |
T03 | 获取拿取锁 | |
T04 | 获取总数快照(1)以判断队列是否为空(否) | |
T05 | 通过head(头)字段获取头节点,并将之移除/拿取(实际元素总数0) | |
T06 | 由于队列已不存在节点,将head(头)字段赋值为null | |
T07 | 通过last(尾/最后)字段获取尾节点,于其后插入/放置(实际元素总数1) | |
T08 | 将last(尾/最后)字段赋值为新插入/放置的节点 | |
T09 | 由于队列已不存在节点,将last(尾/最后)字段赋值为null | |
T10 | 原子操作递减总数(1 - 1 = 0) | |
T11 | 原子操作递增总数(0 + 1 = 1) | |
T12 | 解除放置锁 | |
T13 | 解除拿取锁 |
虽然只是列举了一个场景,但已足以说明“在单元素场景下,双锁机制依然存在线程安全问题”。如何解决这个问题呢?加双锁?这种不切实际的事情就别想了…事实上,LinkedBlockingQueue(链接阻塞队列)类采用了一个很常规的方案:预留一个空节点作为头节点。
所谓的空节点,即不容纳具体元素的节点。将空节点作为头节点是基于链表实现的API中非常常见的做法,例如AQS就是如此,因此其还有一个专属名词:dummy node(哨兵节点)。一般来说使用空节点的目的是为了避免插入首个节点时所需要进行的特殊判断,从而令首个节点可以按常规流程加入链表。那这和我们上文提及的问题有什么关系呢?有的,因为空节点可以避免资源竞争问题。所谓单线程场景下,双锁机制的线程安全问题,本质其实就是线程在没有同步的情况下竞争资源。那解决思路自然也就存在两种:一是保证同步;而是避免资源竞争。所谓保证同步就是加双锁,显然不可能采用这样的思路,那避免资源竞争恰好就是双锁机制的立身之本(除了单元素场景外,插入/放置与移除/拿取两类操作是不存在资源竞争问题的),因此自然是首选,而具体的方案就是空节点。
由于空节点充当了头节点,因此在单元素场景下当拿取者执行移除/拿取操作时,其虽然会将首个非空节点(即空节点的首个后继节点)中保存的元素用于返回,但真正移除/拿取的实际上是空节点,而被获取了元素的非空节点则会转变空节点成为新的头节点保留在队列中(虽然移除的是空节点,但是由于元素所在的节点成为了新的空节点,因此变相等价于元素所在的节点被移除)。如此以来就避免了移除/拿取操作对last(尾/最后)字段的影响(尾节点依然保留在队列中,只不过于此同是它还成为了头节点/空节点),避免了对资源的竞争,自然也就避免了与放置者的并发时的线程安全问题。两者的运行流程大致如下:
放置者(插入/放置) | 拿取者(移除/拿取) | |
---|---|---|
T01 | 获取放置锁 | |
T02 | 获取总数快照(1)以判断队列是否已满(否) | |
T03 | 获取拿取锁 | |
T04 | 获取总数快照(1)以判断队列是否为空(否) | |
T05 | 通过head(头)字段获取头节点,并将之移除/拿取(实际元素总数0) | |
T06 | 将head(头)字段赋值为原头节点的后继节点 | |
T07 | 通过last(尾/最后)字段获取尾节点,于其后插入/放置(实际元素总数1) | |
T08 | 将last(尾/最后)字段赋值为新插入/放置的节点 | |
T09 | 原子操作递减总数(1 - 1 = 0) | |
T10 | 原子操作递增总数(0 + 1 = 1) | |
T11 | 解除放置锁 | |
T12 | 解除拿取锁 |
八 弱一致性迭代器
LinkedBlockingQueue(链接阻塞队列)类自身实现了弱一致性的迭代器。所谓弱一致性,即数据最终会达成一致,但可能存在延迟。具体表现在迭代器中即是迭代(即调用next()方法)得到一个已从队列中移除的元素。大致表现如下:
迭代者(即执行迭代的线程) | 移除者(执行移除/拿取方法及remove(Object o)方法的线程) | |
---|---|---|
T01 | 实例化迭代器,通过head(头)字段获取头节点,并获取其后继节点/元素A(队列首个非空节点)并保存,以备迭代 | |
T03 | 移除/拿取元素A | |
T07 | 迭代获取元素A,并获取节点B的后继节点/元素B并保存,以备迭代 | |
T08 | 迭代获取元素B,并获取节点B的后继节点/元素C并保存,以备迭代 | |
T09 | remove(Object o)方法移除元素C | |
T10 | 迭代获取元素C,并获取节点B的后继节点/元素D并保存,以备迭代 |
上述表格中重点标红的两句是实现弱一致性的关键。可以发现,无论是移除/拿取还是remove(Object o)方法都不会影响迭代器的正常迭代。这是因为为了实现弱一致性迭代器,哪怕节点已从队列中移除,也不会断开其后继引用。这就使得保存在迭代器中的节点即使已经从队列中移除,依然可以找到其在队列中的后继节点(如果有多个连续的节点都被移除,则会一直向后遍历,直到找到首个处于队列中的后继节点为止…或遍历结束)从而继续向后迭代。如此一来,弱一致性的迭代器便完成了…但与此同时会产生两个问题(并且只能解决一部分):
- 移除节点/元素可能在迭代器中长期保留;
- 可能导致的频繁老年代GC。
移除节点/元素可能在迭代器中长期保留
由于迭代器会保存迭代的节点/元素,因此已移除的节点/元素便会因此而无法被GC回收。问题的关键在于这种保留在时间上是难以预测的,即可能是短期、长期、甚至是永久保存。这完全取决于迭代器的迭代频率,而频率又与具体的业务相关。频率高就是短期,低就是长期,不调用就是永久。
可能导致的频繁老年代GC
由于为了实现迭代器的弱一致性的原因,被移除/拿取的节点不会断开后继引用,那就可能造成这样一种情况:由于拿取者不断的执行移除/拿取操作,使得在队外也形成一条空节点链(下文简称外链),并与最终与头节点相连。示例如下:
这个看起来似乎不是什么太大的事情…甚至可能算不上事情。不就是有一些没用的节点残留么?等着GC慢慢回收不就好了。事实上也确实如此,Java的GC太过智能,以至于我们往往/从来不会考虑这方面的执行过程。但实际上虽然无需亲自执行回收,但开发者仍有义务保证GC准确高效的执行。在上面的情况中存在这样一种场景:外链中存在部分空节点存活于老年代,并与后继存活于新生代的空节点相连,造成跨带引用的现象。示例如下:
跨带引用是非常难处理的。在跨带引用中,为了回收被引用的新生代对象,必须要先执行老年代对象的回收。老年代本身的Major GC就是不频繁的(因为老年代的对象存活周期一般都比较长),并且执行速度也远比新生代的Minor GC慢得多(有资料说至少有10倍以上的差距),因此在实际开发中要时刻注意避免。在当前场景中,为了回收外链中处于新生代的空节点,必须先触发老年代的Minor GC。这本质上不算太过严重的问题,一次跨带引用并不会对程序带来什么影响…但问题的关键在于队列是在不断的使用中的。每时每刻都可能有放置者将新的元素插入/放置进对象,同样也不断会有拿取者执行移除/拿取。这就意味着外链会一次又一次的形成,从而频繁的导致跨带引用。
为了解决这一点(即在保证迭代器弱一致性的基础上避免跨带引用),LinkedBlockingQueue(链接阻塞队列)类采用的方案是将移除/拿取的节点的后继引用设置为自身(即自我引用)。如此操作由两大作用:一是移除/拿取的节点不再具有其它节点的后继引用,那自然也就不会再出现跨带引用的问题;二是其可以作为迭代器的标记位使用。当迭代器迭代是发现保存在迭代器中的节点是自引用时,就说明该节点已经因为正常的移除/拿取出队了,这就意味着其首个存在于队列的后继节点必然是队列的首个非空节点,因此会直接通过head(头)字段跳跃到队列的头部获取。
上述操作虽然精妙,但遗憾的是该操作并不能对调用remove(Object o)方法移除的节点使用(即该类节点依然要保持原本的后继引用)。这是因为该类移除可能在队列的任意位置发生,其首个存在于队列的后继节点并不一定是队列的首个非空节点(或者说一定不是),故而不可以采用该方案,也意味着remove(Object o)方法是存在引发跨带引用的隐患的…但由于remove(Object o)方法本身在开发中就较少且不推荐使用…并且想要连续的删除还是有一定难度的…因此总体来说并不会有太大影响。
在阐述完上述所有的内容后,可知在LinkedBlockingQueue(链接阻塞队列)类的迭代器在实际的迭代中保证弱一致性的场景应该如下图所示:
九 相关系列
- 《Java ~ Executor》
- 《Java ~ Executor ~ LinkedBlockingQueue【总结】》
- 《Java ~ Executor ~ LinkedBlockingQueue【源码】》