LinkedBlockingQueue源码
LinkedBlockingQueue介绍
【1】LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在自定义线程池使用到LinkedBlockingQueue队列时会根据业务需求定义合适的队列容量值capacity。
【2】LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,分别是写锁putLock和读锁takeLock。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
LinkedBlockingQueue类的一些关键特性和方法
- 无界队列:如果没有指定容量,队列的容量默认为 Integer.MAX_VALUE。
- 线程安全:通过内部锁机制确保了线程安全。
- FIFO 顺序:元素按照先进先出的方式进行排列。
- 条件对象:使用 ReentrantLock 和 Condition 对象来控制队列的入队和出队操作。
- 动态创建节点:每次插入操作时,除非这会使队列超出其容量,否则都会动态创建新的链表节点。
- 序列化:该类是可序列化的,这意味着它可以被写入到一个输出流中,并从输入流中恢复。
- 迭代器:提供了弱一致性迭代器,用于按顺序访问队列中的元素。
- Spliterator:提供了一个 Spliterator,支持在并行操作中使用 Stream API。
LinkedBlockingQueue使用
//创建有界队列,指定队列的大小为100
BlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(100);
//无界队列
BlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue源码介绍
1.属性值
// 表示队列的容量上限,如果不指定容量值,则为Integer.MAX_VALUE,无限大
private final int capacity;
// 当前队列的元素数量 使用AtomicInteger来实现线程安全的计数器
private final AtomicInteger count = new AtomicInteger();
// 链表的头节点,其item属性总是null,用于标识队列的开始
transient Node<E> head;
// 链表的尾节点,总是指向最后一个节点,其next属性为null
private transient Node<E> last;
// 读锁 控制从队列中取出元素操作
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty:takeLock的条件对象
// 当拿到takeLock锁并且notEmpty条件对象条件成立也就是队列至少有一条可取数据的时候,才会从队列取出一个元素。
// 拿到takeLock锁,但是队列无元素时,线程会调用notEmpty条件对象的wait方法,这将导致线程释放锁并进入WAITING状态,等待其他线程的唤醒。(注意,offer()方法是非阻塞的,从队列中获取不到数据会返回false)
private final Condition notEmpty = takeLock.newCondition();
// 写锁 控制往队列中添加元素操作
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
// 当拿到putLock锁并且notFull条件对象条件成立也就是队列未满(通过AutomicInteger记录的队列状态来判断当前队列元素数量是否大于队列容量阈值,小于则表示未满),才会往队列中添加元素。
private final Condition notFull = putLock.newCondition();
// 典型的单链表结构
// Node类是一个泛型类,可以存储任何类型的元素(由类型参数E指定)。
static class Node<E> {
E item; // 存储节点中的数据项
Node<E> next; // 单链表结构 指向链表中的下一个节点,有三种可能的值:
// 1.真实的后继节点。
// 2.this Node,这通常意味着后继是head.next,这可能是在初始化时或者在某些特殊情况下使用。
// 3.null,表示没有后继节点,即当前节点是链表的最后一个节点。
Node(E x) { item = x; } // 有参构造函数
}
2.构造器
public LinkedBlockingQueue() {
// 如果没传容量,就使用最大int值初始化其容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化head和last指针为空值节点
last = head = new Node<E>(null);
}
// TaskQueue队列才会使用到这个有参够咱
// 这是构造器的声明,它接受一个Collection类型的参数c,该参数中的元素类型是E的子类型。
public LinkedBlockingQueue(Collection<? extends E> c) {
// 调用LinkedBlockingQueue的另一个有参构造器,将队列的容量设置为Integer.MAX_VALUE,这意味着队列可以无限大。
this(Integer.MAX_VALUE);
// 创建一个ReentrantLock对象putLock,它用于控制对队列的写入操作。
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 获取putLock的锁,以确保在向队列添加元素时,其他线程不能同时修改队列。
try {
// 初始化一个计数器n,用于记录添加到队列中的元素数量。
int n = 0;
for (E e : c) {
// 如果集合中的某个元素为null,则抛出NullPointerException。
if (e == null)
throw new NullPointerException();
// 如果队列已满(即添加的元素数量达到队列容量),则抛出IllegalStateException。
if (n == capacity)
throw new IllegalStateException("Queue full");
// 将元素e封装成Node对象,并使用enqueue方法将其添加到队列中。
enqueue(new Node<E>(e));
// 每次成功添加一个元素后,增加计数器n。
++n;
}
// 更新队列中的元素数量。
count.set(n);
} finally {
putLock.unlock();
}
}
3.入队put方法-阻塞
/**
* 队列添加元素
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 检查传入的元素是否为 null,如果是,则抛出 NullPointerException。
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
// 声明了一个整型变量 c 并初始化为 -1,这通常用作一个标志,表示操作尚未成功完成。
int c = -1;
// 创建了一个新的节点 node 来存储传入的元素 e。
Node<E> node = new Node<E>(e);
// 获取用于控制对队列进行入队操作的 ReentrantLock。
final ReentrantLock putLock = this.putLock;
// 获取用于记录队列中元素数量的
final AtomicInteger count = this.count;
// 获取 putLock 锁,如果线程在获取锁的过程中被中断,则会抛出 InterruptedException。
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果队列已满(当前计数等于容量),则调用 notFull 条件对象的 await 方法,使当前线程等待,直到被另一个线程唤醒。
while (count.get() == capacity) {
notFull.await();
}
// 调用 enqueue 方法将新节点添加到队列尾部。
enqueue(node);
// 原子地读取当前的计数并将其增一,c 变量存储了增一前的计数。
c = count.getAndIncrement();
// 如果队列中还可以添加更多元素(当前计数加一小于容量),则使用 notFull 条件对象的 signal 方法唤醒等待的线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放 putLock 锁。
putLock.unlock();
}
// 如果入队前队列为空(计数为0),调用 signalNotEmpty 方法,以通知可能等待的线程队列现在不为空。
if (c == 0)
signalNotEmpty();
}
4.入队offer方法-非阻塞
/**
* 队列添加元素
* 这段代码是LinkedBlockingQueue类中的offer方法的实现。offer方法用于向队列尾部添加一个元素,如果队 * 列未满,则添加成功并返回true,否则返回false。与add方法不同,offer方法在无法添加元素时不会抛出异 * 常,而是返回一个布尔值来告知操作结果。
*
* @throws NullPointerException 元素为空抛出空指针异常
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 队列当前元素计数器
final AtomicInteger count = this.count;
// 队列已满,则返回false
if (count.get() == capacity)
return false;
// 队列当前元素计数器初始化为-1
int c = -1;
// Node节点封装元素,包含节点数据和指向下一节点的指针
Node<E> node = new Node<E>(e);
// 写锁获取
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 如果队列当前元素计数小于队列最大容量,则执行元素入队操作
if (count.get() < capacity) {
// 入队
enqueue(node);
// 这个原子操作表示,c=count(原值),count(新值)=count+1。注意:这个c的值是count+1之前的值,所以才会用 if (c + 1 < capacity)判断
c = count.getAndIncrement();
// 如果队列未满,唤醒等待队列不满的线程(如果有的话)来往队列添加任务。
if (c + 1 < capacity)
// signal底层:notFull条件对象实现了AQS接口,通知线程的底层是AQS,唤醒的方法是 LockSupport.unpark(node.thread);
// 这个通知不需要重新获取putLock锁,因为使用的是ReentrantLock->重入锁
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
// 如果队列当前元素为空,说明队列已经添加了元素了,唤醒等待队列非空的线程(如果有的话)来往队列获取任务。
// notEmpty条件对象实现了AQS接口,通知线程的底层是AQS,唤醒的方法是 LockSupport.unpark(node.thread);
// 这个通知需要获取takeLock锁,获取到锁才会通知。
if (c == 0)
signalNotEmpty();
// 任务添加成功返回true
return c >= 0;
}
/**
* 队列添加元素,并且支持单位时间内添加成功
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 如果传入的元素为null,则抛出NullPointerException。
if (e == null) throw new NullPointerException();
// 将超时时间转化为纳秒
long nanos = unit.toNanos(timeout);
// 初始化一个计数器c,用于记录添加操作前的队列元素数量。
int c = -1;
// 获取用于控制队列尾部添加操作的锁。
final ReentrantLock putLock = this.putLock;
// 获取队列中的元素计数器。
final AtomicInteger count = this.count;
// 注意,获取写锁,这里是支持中断锁:如果当前线程在尝试获取锁的过程中被中断,抛出 InterruptedException
putLock.lockInterruptibly();
try {
// 如果队列当前元素数量等于队列容量最大值,则进入while循环
while (count.get() == capacity) {
// 如果队列已满,但是超时时间还没结束,则一直循环,直到有效时间过期,返回false
if (nanos <= 0)
return false;
// 更新剩余的超时时间,等待直到队列有空间或超时时间结束。
// notFull是一个Condition对象,调用它的awaitNanos方法将导致当前线程释放锁并等待,直到被唤醒或超时
nanos = notFull.awaitNanos(nanos);
}
// 如果队列有空间,将新节点添加到队列尾部
enqueue(new Node<E>(e));
// 原子性地获取当前的元素数量并将其增加1
c = count.getAndIncrement();
// 如果队列未满,唤醒一个等待队列不满的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 如果队列当前元素为空,说明队列已经添加了元素了,唤醒等待队列非空的线程(如果有的话)来往队列获取任务。
if (c == 0)
signalNotEmpty();
return true;
}
5.出队take方法-阻塞
// 从队列中取出一个元素。这个方法会抛出 InterruptedException,如果线程在等待队列非空时被中断。
// 阻塞获取
public E take() throws InterruptedException {
// 声明了元素变量 x,稍后将用于存储从队列中取出的元素。
E x;
// 声明了一个整型变量 c,用于当前记录队列中的元素数量。
int c = -1;
// 获取用于记录队列中元素数量的 AtomicInteger 引用。
final AtomicInteger count = this.count;
// 获取用于控制对队列进行出队操作的 ReentrantLock 引用。
final ReentrantLock takeLock = this.takeLock;
// 调用 lockInterruptibly 方法获取锁,如果当前线程在获取锁的过程中被中断,会抛出 InterruptedException。
takeLock.lockInterruptibly();
try {
// 如果队列为空(计数为0),则调用 notEmpty 条件对象的 await 方法,使当前线程等待,直到被另一个线程唤醒。
while (count.get() == 0) {
notEmpty.await();
}
// 调用 dequeue 方法从队列头部移除并返回一个元素,存储在变量 x 中。
x = dequeue();
// 原子地读取当前的计数并将其减一,c 变量存储了减一前的计数。
c = count.getAndDecrement();
// 如果队列中还有至少一个元素(c > 1),则使用 notEmpty 条件对象的 signal 方法唤醒等待的线程。
if (c > 1)
notEmpty.signal();
} finally {
// 释放 takeLock 锁。
takeLock.unlock();
}
// 如果取出元素后的计数等于队列的容量,调用 signalNotFull 方法,以通知可能等待的线程队列现在不是满的。
if (c == capacity)
signalNotFull();
return x;
}
6.出队poll方法-非阻塞
// 从队列获取元素,返回类型是E,E是队列中元素的类型,获取不到直接返回null
public E poll() {
// 创建了一个AtomicInteger对象count,用于记录队列中的元素数量。
final AtomicInteger count = this.count;
// 检查队列是否为空,如果是,则返回null
if (count.get() == 0)
return null;
// 声明了一个元素变量x,用于存储从队列中取出的元素。
E x = null;
// 声明了一个整型变量c,用于稍后记录取出元素前队列中的元素数量。
int c = -1;
// 声明了一个ReentrantLock对象takeLock,用于同步线程对队列的操作。
final ReentrantLock takeLock = this.takeLock;
// 获取takeLock的锁,以确保线程安全。
takeLock.lock();
try {
// 再次检查队列是否非空。
if (count.get() > 0) {
// 如果队列非空,从队列中移除并返回一个元素。
x = dequeue();
// 原子的读取当前的计数并将其减一,c变量存储了减一前的计数。
c = count.getAndDecrement();
// 如果队列中还有至少一个元素,发送一个信号通知可能等待的线程队列非空。
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果取出元素后c的计数等于队列的容量,发送一个信号通知可能等待的线程队列现在不是满的了。
if (c == capacity)
signalNotFull();
// 返回从队列中取出的元素。
return x;
}
// 有效时间内获取队列元素,获取不到直接返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 声明了元素变量 x,并初始化为 null。
E x = null;
// 声明了一个整型变量 c,并初始化为 -1,用作状态标志。
int c = -1;
// 将超时时间转换为纳秒数。
long nanos = unit.toNanos(timeout);
// 获取用于记录队列中元素数量的 AtomicInteger。
final AtomicInteger count = this.count;
// 获取用于控制对队列进行出队操作的 ReentrantLock。
final ReentrantLock takeLock = this.takeLock;
// 获取 takeLock 锁,如果线程在获取锁的过程中被中断,则会抛出 InterruptedException。
takeLock.lockInterruptibly();
try {
// 如果队列为空,则循环等待,直到队列非空或超时。
while (count.get() == 0) {
// 如果超时时间已经过去,则立即返回 null。
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 如果队列非空,则调用 dequeue 方法移除并返回队列头部的元素。
x = dequeue();
// 原子地读取当前的计数并将其减一,c 变量存储了减一前的计数。
c = count.getAndDecrement();
// 如果队列中还有至少一个元素,则使用 notEmpty 条件对象的 signal 方法唤醒等待的线程。
if (c > 1)
notEmpty.signal();
} finally {
// 释放 takeLock 锁。
takeLock.unlock();
}
// 如果当前队列元素计数等于队列的容量,证明队列至少有一个空位,调用 signalNotFull 方法,以通知可能等待的线程队列现在不是满的。
if (c == capacity)
signalNotFull();
return x;
}
队列的出队公共方法
// 队列移除元素
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 获取头节点,头节点的item为null,next指向第一个节点,也就是指向最先入队的节点
Node<E> h = head;
// 获取第一个节点
Node<E> first = h.next;
// h.next表示h下一个节点指向,现在指向h,也就是指向本身,证明没有其他对象引用,方便JVM垃圾收集器回收
h.next = h; // help GC
// 移动头节点next指针:将 head 引用移动到 first,即队列的第一个元素,因为原来的头节点已经被移除了。那么,first的next节点就是第一个元素了。
head = first;
// 获取第一个节点的数据
E x = first.item;
// 将 first 节点中的元素置为 null,这样做可以减少对对象的引用,有助于垃圾收集。
// 头节点head的item也置为空,只维护next指针,指向就是第一个元素,也就是没移除第一个元素之前的第二个元素或者null
first.item = null;
// 返回第一个节点数据
return x;
}
队列的入队公共方法
// 队列添加元素
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
LinkBlockingQueue单向列表结构
在 LinkedBlockingQueue 中,队列的头(head)和尾(tail)是两个非常重要的概念:
- 头节点(Head):队列的头部,即队列中的第一个元素。在 LinkedBlockingQueue 中,头节点是队列中最老的元素,也就是最先进入队列的元素。当我们从队列中取元素时,总是从头节点开始取,头节点的next指针指向的元素就是我们的第一个元素,头节点的item永远为空。
- 尾节点(Tail):队列的尾部,即队列中的最后一个元素,尾节点的item就是我们的最后一个元素,尾节点的next永远为空。在 LinkedBlockingQueue 中,尾节点是队列中最新的元素,也就是最后进入队列的元素。当我们向队列中添加元素时,总是从尾节点开始添加。
LinkedBlockingQueue 中的头节点和尾节点是通过两个指针 head 和 tail 来维护的。当队列不为空时,head 总是指向队列的第一个元素,tail 总是指向队列的最后一个元素。
LinkedBlockingQueue总结
【1】无界阻塞队列,可以指定容量,默认为 Integer.MAX_VALUE,先进先出,存取互不干扰
【2】数据结构:链表(可以指定容量,默认为 Integer.MAX_VALUE,内部类Node存储元素)
【3】锁分离:读写锁分离,存取互不干扰,存取操作的是不同的Node对象【这是最大的亮点】
【4】阻塞对象(notEmpty【出队:队列count=0,无元素可取时,阻塞在该对象上】,notFull【入队:队列count=capacity,放不进元素时,阻塞在该对象上】)
【5】入队,从队尾入队,由last指针记录。
【6】出队,从队首出队,由head指针记录。
【7】线程池中采用LinkedBlockingQueue而不采用ArrayBlockingQueue的原因便是因为锁分离带来了性能的提升,大大提高队列的吞吐量。
小结:
LinkedBlockingQueue使用锁和条件对象来控制对链表的并发访问,保证线程安全。