1.简介
ArrayBlockingQueue
是 BlockingQueue
接口的一个实现类,它基于数组实现了一个有界阻塞队列。创建 ArrayBlockingQueue
实例时需要指定队列的容量,队列的大小是固定的,无法动态增长。
主要特点包括:
-
有界性:
ArrayBlockingQueue
的容量是固定的,在创建队列时需要指定容量大小,一旦队列达到最大容量,再尝试向队列中添加元素将会导致阻塞,直到队列中有空间。 -
线程安全性: 提供了线程安全的队列操作,多个线程可以安全地在队列中进行添加和移除元素的操作,无需额外的同步措施。
-
FIFO 排序:基于数组实现,采用先进先出(FIFO)的顺序,保证了队列中元素的顺序性。
-
阻塞操作:当尝试向队列中添加元素时,如果队列已满,生产者线程将会被阻塞,直到队列中有空间可用;当尝试从队列中取出元素时,如果队列为空,消费者线程将会被阻塞,直到队列中有新的元素可用。
-
性能稳定:基于数组实现的,在添加和移除元素时具有稳定的性能表现,与队列大小无关。
2.重要的方法
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
插入 | remove() | poll() | take() | poll(timeout, timeunit) |
检查 | element() | peek() |
四组不同的行为方式解释:
-
抛异常: 如果试图的操作无法立即执行,抛一个异常。
-
特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
-
阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
-
超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是
true
/false
)。
无法向一个 ArrayBlockingQueue
中插入 null
。如果你试图插入null
,ArrayBlockingQueue
将会抛出一个 NullPointerException
。
3.重要属性
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
-
items
数组: 用于存储队列元素的数组。 -
count
: 用于记录当前队列中的元素数量。 -
takeIndex
和putIndex
: 分别表示队列头部和尾部元素在数组中的索引位置。 -
lock
: 使用ReentrantLock
来实现对队列操作的线程安全控制。 -
notEmpty
和notFull
: 条件变量,用于实现队列非空和队列未满的等待通知机制。
这些属性在ArrayBlockingQueue
内部使用,可以帮助实现队列的基本功能和线程安全访问。
4.构造方法
构造方法如下所示:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
-
首先,构造方法接受两个参数:
capacity
表示队列的容量大小,fair
表示是否使用公平性策略。 -
在方法中首先对
capacity
进行检查,如果小于等于 0,则抛出IllegalArgumentException
异常。 -
然后,创建一个
Object
类型的数组items
作为队列的存储结构,其大小为capacity
,即指定了队列的容量。 -
创建一个
ReentrantLock
对象lock
,用于在多线程环境下对队列进行加锁操作。fair
参数用于决定是否使用公平性策略,如果为true
,则表示使用公平性策略,否则为非公平性策略。 -
最后,创建两个
Condition
对象notEmpty
和notFull
,它们分别用于表示队列非空和非满的条件,通过它们可以实现线程的等待和唤醒操作。
5.添加元素方法
5.1 offer(e)方法
offer(e)
方法会在加锁的情况下尝试向队列中添加元素,如果队列已满则返回false
,否则将元素插入队列并返回true
。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
-
首先,对传入的元素e进行空指针检查,如果为空则抛出
NullPointerException
。 -
获取队列的
ReentrantLock
对象lock
,并对其进行加锁操作。 -
在加锁的情况下,判断队列是否已满,如果已满则返回
false
;如果未满,调用enqueue(e)
方法将元素e
插入队列,并返回true
。 -
最后,无论是否成功添加元素,最终都会释放锁。
5.1.1 enqueue(e)方法
enqueue(e)
插入元素,并通过更新putIndex
和count
来维护队列的状态,同时通过notEmpty.signal()
来唤醒可能阻塞在队列非空条件上的消费者线程。
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
-
首先获取
ArrayBlockingQueue
中的items
数组,items
数组用于存放队列中的元素。 -
items[putIndex] = x;
: 将元素x插入到items
数组的putIndex
位置上,putIndex
表示当前要插入元素的位置。 -
if (++putIndex == items.length) putIndex = 0;
: 这一行代码用于更新putIndex
的值,如果putIndex
超出了数组长度,则将其置为0,实现循环利用数组空间的目的。 -
count++;
: 增加队列的元素数量计数器count
。 -
n
otEmpty.signal();
: 发送信号通知处于等待状态的消费者线程,队列中已经有元素可以消费了。
5.2 offer(o, timeout, timeunit)方法源码如下:
offer(o, timeout, timeunit)
方法会在加锁的情况下尝试向队列中添加元素,在指定的超时时间内等待队列有空间可用,如果成功添加则返回true
,超时未成功则返回false
,如果被中断则抛出InterruptedException
异常。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
-
首先,对传入的元素
e
进行空指针检查,如果为空则抛出NullPointerException
。 -
将
timeout
转换为纳秒单位,并获取队列的ReentrantLoc
k对象lock
,使用lockInterruptibly
方法加锁,支持响应中断。 -
在加锁的情况下,如果队列已满,则会进入循环等待,同时根据剩余的超时时间不断进行等待,直到超时或者队列有空间可用。
-
如果在等待过程中成功添加元素,则返回
true
,否则如果超时未成功添加元素则返回false
。 -
最后,无论是否成功添加元素,最终都会释放锁。
5.3 add(e)方法源码如下:
add(e)
方法是调用父类AbstractQueue
的add(e)
方法:
public boolean add(E e) {
return super.add(e);
}
5.3.1 AbstractQueue的add(e)方法,源码如下
add(e)
方法会首先尝试向队列中添加元素,如果成功则返回true
,如果队列已满则抛出异常。
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
-
首先,调用了
offer(e)
方法尝试向队列中添加元素。 -
如果成功添加元素,则返回
true
。 -
如果队列已满,
offer(e)
方法会返回false
,此时会抛出IllegalStateException
异常,提示队列已满。
5.4 put(e)方法
put(e)
方法会在加锁的情况下尝试向队列中添加元素,如果队列已满则会阻塞等待直到有空间可以插入元素,如果被中断则会抛出InterruptedException
异常。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
-
首先,对传入的元素
e
进行空指针检查,如果为空则抛出NullPointerException
。 -
获取队列的
ReentrantLock
对象lock
,并使用lockInterruptibly
方法加锁,支持响应中断。 -
在加锁的情况下,如果队列已满,则使用
notFull
条件变量等待,直到队列有空间可以插入元素。 -
一旦有空间可以插入元素,调用
insert(e)
方法将元素e
插入队列。 -
最后,无论是否成功添加元素,最终都会释放锁。
6.移除元素
6.1 poll()方法
poll()
方法用于从队列中取出并移除头部的元素,如果队列为空,则返回null
。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
-
首先获取队列中的锁对象
lock
,并尝试获取锁。 -
判断队列元素数量
count
是否为0,如果为0则表示队列为空,直接返回null
。 -
如果队列不为空,则调用
dequeue()
方法来取出并返回队列头部的元素。 -
最后释放锁并返回取出的元素。
6.1.1 dequeue()方法
dequeue()
主要完成了从队列中取出元素的操作,并通过更新takeIndex
和count
来维护队列的状态,同时通过notFull.signal()
来唤醒可能阻塞在队列非满条件上的生产者线程。
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
-
final Object[] items = this.items;
: 首先获取items
数组,该数组用于存放队列中的元素。 -
E x = (E) items[takeIndex];
: 将takeIndex
位置上的元素强制转换为泛型类型E
,并赋值给变量x
,表示要取出的元素。 3.items[takeIndex] = null;
: 将takeIndex
位置上的元素置为null
,表示该位置上的元素已经被取出。 -
if (++takeIndex == items.length) takeIndex = 0;
: 更新takeIndex
的值,如果takeIndex
超出了数组长度,则将其置为0,实现循环利用数组空间的目的。 -
count--;
: 减少队列的元素数量计数器count。 -
if (itrs != null) itrs.elementDequeued();
: 如果存在迭代器iterator
,则调用elementDequeued()
方法通知迭代器,表示有元素被取出。 -
notFull.signal();
: 发送信号通知处于等待状态的生产者线程,队列中已经有空间可以生产新元素了。 -
return x;
: 返回被取出的元素。
6.2 l(long timeout, TimeUnit unit)方法
poll(long timeout, TimeUnit unit)
方法用于从队列中取出并移除头部的元素,如果队列为空,则在指定的时间范围内等待元素可用。当超时时间到达后仍然没有可用元素,则返回null
。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
-
将传入的
timeout
参数转换为纳秒单位。 -
获取队列中的锁对象
lock
,并尝试获取锁(支持中断)。 -
当队列为空时,进入循环等待状态。如果
nanos
小于等于0,表示超时时间已到,直接返回null
。 -
使用
notEmpty.awaitNanos(nanos)
来等待非空条件满足,并根据等待时间的剩余纳秒数更新nanos
值。 -
当队列不为空时,调用
dequeue()
方法来取出并返回队列头部的元素。 -
最终释放锁并返回取出的元素。
6.3 remove()方法
remove()
方法用于移除并返回队列的头部元素。如果队列为空,则抛出NoSuchElementException
异常。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
-
调用队列的
poll()
方法来尝试取出并返回队列的头部元素。 -
如果
poll()
方法返回的元素不为null
,表示成功取出了一个元素,则直接返回该元素。 -
如果
poll()
方法返回的元素为null
,表示队列为空,此时抛出NoSuchElementException
异常。
6.4 take()方法
take()
方法用于从队列中取出并移除头部的元素。如果队列为空,该方法将阻塞直到队列中有可用元素为止。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
-
获取队列中的锁对象
lock
,并尝试获取锁(支持中断)。 -
当队列为空时,进入循环等待状态,调用
notEmpty.await()
来等待非空条件满足,即等待队列中有元素可用。 -
当队列不为空时,调用
dequeue()
方法来取出并返回队列头部的元素。 -
最终释放锁并返回取出的元素。
7.检查元素
7.1 peek
在调用peek()
时,会返回队列头部的元素但不将其从队列中移除。如果队列为空,则返回null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
-
获取队列中的锁对象
lock
,并尝试获取锁。 -
调用
itemAt(int i)
获取索引为takeIndex
的元素。 -
最终释放锁。
7.2 itemAt
返回items
中索引为i
的元素
final E itemAt(int i) {
return (E) items[i];
}
7.3 element
element()
方法用于获取但不移除队列的头部元素。如果队列为空,则抛出NoSuchElementException
异常。
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
-
调用队列的
peek()
方法来获取但不移除队列的头部元素。 -
如果
peek()
方法返回的元素不为null
,表示成功获取了一个元素,则直接返回该元素。 -
如果
peek()
方法返回的元素为null
,表示队列为空,此时抛出NoSuchElementException
异常。
关注公众号:小黄学编程 回复:架构师 获取小黄收集的架构师资料