文章目录
- 1、BlockingQueue
- 1)接口方法
- 2)阻塞队列分类
- 2、ArrayBlockingQueue
- 1)构造函数
- 2)put()入队
- 3)take()出队
- 3、LinkedBlockingQueue
- 1)构造函数
- 2)put()入队
- 3)take()出队
1、BlockingQueue
BlockingQueue是JUC包下提供的一个阻塞队列 接口;
1)接口方法
队列操作
- 抛出异常:add(e)、remove()、element()
- 返回特定值:offer()队尾入队/poll()删除队头元素/peek()
- 一直阻塞:put(e)/take()
- 超时退出:offer(e,time,unit)/poll(time,unit)
其中:BlockingQueue 不接受 null 元素。试图 add 、 put 或 offer ⼀个 null 元素时,某些实现会抛出 NullPointerException 。
2)阻塞队列分类
-
ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
-
LinkedBlockingQueue:由链表组成的有界阻塞队列。
-
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列(默认升序排序)。
-
DelayQueue:支持延时获取元素的无界阻塞队列。
队列使用PriorityQueue来实现。
-
SynchronousQueue:一个不存储元素的阻塞队列。
-
LinkedTransferQueue:由链表组成的无界阻塞队列。
其多了tryTransfer()方法和transfer()方法。
- transfer():可以把生产者传入的元素立刻传输给消费者。如果没有consumer在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
- tryTransfer():用来试探生产者传入的元素是否能直接传给消费者。不管是否有consumer正在等待接收元素,都立刻返回。
2、ArrayBlockingQueue
ArrayBlockingQueue是由数组结构组成的有界阻塞队列。通过ReentrantLock保证线程安全、并实现 Producer-Consumer模式。
1)构造函数
从构造函数可知,默认采用非公平锁;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 底层使用对象数组保存元素
this.items = new Object[capacity];
// 初始化需要加锁使用的ReentrantLock实例,默认采用非公平锁
lock = new ReentrantLock(fair);
/**
* 判断队列是 空 or 满
* notEmpty⽤于执⾏take时进⾏await()等待操作,put时进⾏signal()唤醒操作
* notFull⽤于执⾏take时进⾏signal()唤醒操作,put时进⾏await()等待操作
*/
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
方法释义:
- 构造函数的入参capacity指定了底层存储元素数组⻓度的⼤⼩;
- 初始化需要加锁使⽤的ReentrantLock实例,默认采⽤的是⾮公平锁;
- 基于Lock的Condition判断队列是 空 or 满
- notEmpty⽤于执⾏take时进⾏await()等待操作,put时进⾏signal()唤醒操作;
- notFull⽤于执⾏take时进⾏signal()唤醒操作,put时进⾏await()等待操作;
2)put()入队
public void put(E e) throws InterruptedException {
// 入参不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加可中断锁
lock.lockInterruptibly();
try {
while (count == items.length)
// 队列满了,则阻塞等待signal唤醒,同时释放次有的锁。
notFull.await();
// 入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 如果数据已经插入到数组末尾,则重置putIndex为0,从0开始继续插入。
if (++putIndex == items.length)
putIndex = 0;
count++;
// 通知take线程解除阻塞
notEmpty.signal();
}
方法释义:
- ⾸先尝试获得可中断锁,即:lock.lockInterruptibly(),当执⾏interrupt操作时,该锁可以被中断。
- 如果数组中元素的个数(count)等于数组的⻓度了,说明队列已经满了,在该线程上执⾏等待操作:
notFull.await();
,等待signal唤醒。- 如果队列没有满,则调⽤enqueue(e)⽅法执⾏⼊列操作;
- ⼊列操作⾸先会将待插⼊值x放⼊数组下标为putIndex的位置上,然后再将putIndex加1,来指向下⼀次插⼊的下标位置。
- 如果加1后的putIndex等于了数组的⻓度,那么说明已经越界了(因为putIndex是从0开始的);则
做循环式插⼊
,重置putIndex为0,从0开始继续插入。- 最后,执⾏count++来计算元素总个数;并调⽤notEmpty.signal()⽅法来解除阻塞;
- 当队列为空的时候,执⾏take⽅法会被notEmpty.await()阻塞;
- 此处就是通知take线程解除阻塞。
3)take()出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果队列为空,则线程阻塞 等待signal唤醒,释放持有的锁
notEmpty.await();
// 执行出队操作
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取takeIndex下标的元素
E x = (E) items[takeIndex];
// 将takeIndex下标下的袁术置为null,便于后面GC回收
items[takeIndex] = null;
// 如果出队操作的到了数组末尾,则重置takeIndex,从0开始继续取出
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 默认itrs为null,不会走进if代码段。
if (itrs != null)
itrs.elementDequeued();
// 通知put线程解除阻塞
notFull.signal();
return x;
}
方法释义:
take⽅法和put⽅法类似,区别是出队的指针是takeIndex;
⾸先尝试获得可中断锁,即:lock.lockInterruptibly(),当执⾏interrupt操作时,该锁可以被中断。
如果队列中为空;
执⾏notEmpty.await()
将线程阻塞 等待signal唤醒,释放持有的锁。
- 当调⽤put⽅法向队列中放⼊元素之后 ,会调⽤notEmpty.signal⽅法对等待的线程执⾏唤醒操作;
如果出队操作的到了数组末尾,则重置takeIndex,从0开始继续取出;
出队执⾏完毕后,调⽤
notFull.signal
⽅法来唤醒在notFull上⾯await的线程。 通知put线程解除阻塞。
3、LinkedBlockingQueue
LinkedBlockingQueue是由链表结构组成的有界阻塞队列。通过ReentrantLock保证线程安全、并实现 Producer-Consumer模式。
1)构造函数
如果不指定容量,则默认LinkedBlockingQueue是无界阻塞队列(capacity = Integer.MAX_VALUE)
构造函数中会创建⼀个空的节点,作为整个链表的头节点。
2)put()入队
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
整个put()流程和ArrayBlockingQueue基本一致,对于链表容量的统计会额外采用一个AtomiceInteger类型的变量count维护。
-
- 最后唤醒put()线程的代码段上有一个
c == 0
的判断,这里的c是入队操作之前的数量。
- 最后唤醒put()线程的代码段上有一个
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
这里有个比较有意思的点:
- 入队操作添加完元素之后,如果发现当前队列的元素数量还没到最大容量,则尝试唤醒其他put()操作阻塞的线程;
if (c + 1 < capacity)
notFull.signal();
3)take()出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
整个take()流程和ArrayBlockingQueue基本一致,稍微看一下即可。
- 最后唤醒take()线程的代码段上有一个
c == capacity
的判断,这里的c是出队操作之前的数量。
和LinkedBlockingQueue的put()操作一样:
- 出队操作移除完元素之后,如果发现当前队列的元素数量 > 1,则尝试唤醒其他take()操作阻塞的线程;
if (c > 1)
notEmpty.signal();