一、ArrayBlockingQueue源码剖析
ArrayBlockingQueue底层是采用数组实现的一个队列。因为底层是数据,一般被成为有界队列、其阻塞模式是基于ReentrantLock来实现的。
// 存数据操作 add(E),offer(E),put(E),offer(E,time,unit) // add(E):添加数据到队列,如果满了,扔异常。 // offer(E):添加数据到队列,如果满了,返回false // put(E):添加数据到队列,如果满了,线程挂起 // offer(E,time,unit):添加数据到队列,如果满了,线程挂起一段时间
// 取数据操作 remove(),poll(),take(),poll(time,unit) // remove():从队列拿数据,拿到返回,拿到null,甩异常 // poll():从队列拿数据,拿到返回,拿到null,也返回 // take():从队列拿数据,拿到返回,没数据,一直阻塞 // poll(time,unit):从队列拿数据,拿到返回,没数据,阻塞time时间
1.1 ArrayBlockingQueue应用
public static void main(String[] args) throws InterruptedException, BrokenBarrierException, IOException {
// ArrayBlockingQueue,因为底层使用数组,必须要指定数组的长度,作为队列的长度
ArrayBlockingQueue queue = new ArrayBlockingQueue(1);
// 存数据操作 add(E),offer(E),put(E),offer(E,time,unit)
// add(E):添加数据到队列,如果满了,扔异常。
// offer(E):添加数据到队列,如果满了,返回false
// put(E):添加数据到队列,如果满了,线程挂起
// offer(E,time,unit):添加数据到队列,如果满了,线程挂起一段时间
// 取数据操作 remove(),poll(),take(),poll(time,unit)
// remove():从队列拿数据,拿到返回,拿到null,甩异常
// poll():从队列拿数据,拿到返回,拿到null,也返回
// take():从队列拿数据,拿到返回,没数据,一直阻塞
// poll(time,unit):从队列拿数据,拿到返回,没数据,阻塞time时间
}
1.2 ArrayBlockingQueue核心属性
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* Serialization ID. This class relies on default serialization
* even for the items array, which is default-serialized, even if
* it is empty. Otherwise it could not be declared final, which is
* necessary here.
*/
private static final long serialVersionUID = -817911632652898426L;
/** The queued items */
// 真正存放数据的数组
final Object[] items;
/** items index for next take, poll, peek or remove */
// 下一个拿数据的下标
int takeIndex;
/** items index for next put, offer, or add */
// 下一个存放数据的下标
int putIndex;
/** Number of elements in the queue */
// 存放数据的数量
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
// 锁
final ReentrantLock lock;
/** Condition for waiting takes */
// 通知读线程获取数据的condition
private final Condition notEmpty;
/** Condition for waiting puts */
// 通知写线程写数据的condition
private final Condition notFull;
}
1.3 存数据源码剖析
offer在存放数据的时候,如果此时数组存放已满则直接返回false。
offer(E e, long timeout, TimeUnit unit)添加时,先判断队列满了没,满了先阻塞time时间,自动唤醒,还是满的,也返回false。
put,添加时,先判断队列满了没,满了就阻塞,阻塞到被唤醒,或者被中断。
// 存数据
public boolean offer(E e) {
// 非空校验
checkNotNull(e);
// 互斥锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果数组中的数据已经达到了数组的长度,没地儿了~,队列满了
if (count == items.length)
return false;
else {
// 还有位置
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 存放数据到数组中
private void enqueue(E x) {
// 拿到数组
final Object[] items = this.items;
// 数组放进去
items[putIndex] = x;
// 把put指针++, 指针是否已经到了最后一个位置,归位到0位置。
if (++putIndex == items.length)
// 归位到0位置。
putIndex = 0;
// 数据条数 + 1
count++;
// 唤醒在阻塞的取数据线程
notEmpty.signal();
}
// put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
// offer方法,可以阻塞一段时间
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public boolean add(E e) {
// 调用父类的添加方法
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
1.4 取数据源码剖析
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// count == 0代表没数据, 就返回null,有数据走dequeue
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 从数组中那数据
private E dequeue() {
final Object[] items = this.items;
// 取数据
E x = (E) items[takeIndex];
// 将取完的位置置位null
items[takeIndex] = null;
// take指针++,如果到头,归位0~~
if (++takeIndex == items.length)
takeIndex = 0;
// 数据条数 - 1
count--;
// 唤醒队列满的时候,阻塞住的写线程
notFull.signal();
return x;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 挂起线程,需要被唤醒
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos); // 挂起线程,到时间自动唤醒、或者被手动唤醒
}
return dequeue();
} finally {
lock.unlock();
}
}
二、LinkedBlockingQueue源码剖析
底层基于链表实现的,会将每个元素封装为Node,Node有当前值,还有一个next指针,一般成为无界队列。
LinkedBlockingQueue本质就是一个用Node封装的单向链表。 LinkedBlockingQueue内部提供了读锁和写锁,读写不互斥,而且记录数据条数的属性是Atomic原子类。
2.1 LinkedBlockingQueue核心属性
/**
* 阻塞队列元素会被封装为Node
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 指定队列的长度,如果不传值,默认为Integer.MAX */
private final int capacity;
/** 记录数据条数 */
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
/** 读锁 */
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
/** 写锁 */
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
2.2 存数据源码剖析
// 写操作~
public boolean offer(E e) {
// 非空
if (e == null) throw new NullPointerException();
// 拿到count(记录当前数据条数)
final AtomicInteger count = this.count;
// 如果count达到了最大值
if (count.get() == capacity)
// 数据满了。
return false;
// 声明c
int c = -1;
// 将当前数据封装为Node
Node<E> node = new Node<E>(e);
// 添加写锁~
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// !!DCL!!
// 再次拿到条数判断,如果还有空间,enqueue存数据
if (count.get() < capacity) {
// 数据放进来
enqueue(node);
// 拿到count,再自增
c = count.getAndIncrement();
// 添加完数据之后,长度依然小于最大长度,唤醒可能阻塞的写线程
// 读写不互斥,可能前面在执行时,队列是满的,但是读操作依然在进行
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// c == 0,说明添加数据之前,队列是空的,唤醒可能阻塞的读线程
if (c == 0)
signalNotEmpty();
// 返回count >= 0
return c >= 0;
}
// 插入数据到链表~~~
private void enqueue(Node<E> node) {
last = last.next = node;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
2.3 读数据源码剖析
public E poll() {
final AtomicInteger count = this.count;
// 为0,没数据,拜拜~~
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 如果队列有数据 DCL
if (count.get() > 0) {
x = dequeue();
// count --
c = count.getAndDecrement();
if (c > 1)
// c > 1,说明还有数据,唤醒读线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
// 到这说明还有位置呢,唤醒写线程
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
三、PriorityQueue
这个就是一个普通的队列,不是阻塞的。因为DelayQueue和PriorityBlockingQueue都和PriorityQueue有关系,很类似。先把PriorityQueue搞定,后续再看其他的优先级阻塞队列,效果更佳!PriorityQueue才是真正而定无界队列。底层是数组实现,会扩容!PriorityQueue实现优先级的方式,是基于二叉堆实现的。
二叉堆:
-
二叉堆是一颗完整的二叉树
-
任意一个节点大于父节点 或者 小于父节点
因为这个二叉堆是实现优先级队列的原理,那么队列或有添加和获取的操作,这种操作会影响二叉堆的结构,查看PriorityQueue队列的添加和获取操作如何保证结构 。
3.1 添加操作源码剖析
// 优先级队列添加操作,确定如何保证小顶堆结构
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
// size是数组数据条数,大于等于数组长度后,需要扩容
int i = size;
if (i >= queue.length)
// Double size if small; else grow by 50%
grow(i + 1);
// size + i,数据多一条
size = i + 1;
// 如果i == 0,说明添加的是第一个数据
if (i == 0)
queue[0] = e;
else
// 不是第一个数据,Up上移保证结构
siftUp(i, e);
return true;
}
// 让当前节点和父节点比较,如果当前节点比较小,就上移~~~
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
3.2 取数据如何保证二叉堆结构
// 取堆顶数据
public E poll() {
// 没有数据返回null
if (size == 0)
return null;
// 最后一个数据的索引
int s = --size;
// 需要全都的数据
E result = (E) queue[0];
// 取出最后一个数据
E x = (E) queue[s];
// 将最后一个数据置位null
queue[s] = null;
if (s != 0)
// 下移保证安全
siftDown(0, x);
return result;
}
// 堆顶数据下移,知道last数据可以存放的位置,然后替换即可
private void siftDownUsingComparator(int k, E x) {
while (k < half) {
int child = (k << 1) + 1;
// 找到左子
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
四、PriorityBlockingQueue
这个阻塞的优先级队列的实现跟PriorityQueue基本一模一样,只是PriorityBlockingQueue基于Lock锁实现的多线程操作安全并且线程可以挂起阻塞的操作 。
PriorityBlockingQueue底层基于数组,并且可以扩容,不会基于condition挂起线程,读会阻塞。
4.1 写操作
因为底层基于数组,并且可以扩容,所以写操作的put和poll(time,unit)的方式不会基于condition挂起线程。
并且是多线程基于CAS的方式争抢扩容的标识。
// 所有添加都走着,没有await挂起的方式,
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 扩容,允许多线程并发扩容。一会看~~~
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
//添加数据到二叉堆
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 唤醒读线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
// 跟PriorityQueue一样的上移操作
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
// 尝试扩容
private void tryGrow(Object[] array, int oldCap) {
// 允许多线程并发扩容的。(不是协助扩容),但是只有一个线程会成功,基于CAS的方式,避免并发问题
lock.unlock();
Object[] newArray = null;
// 线程将allocationSpinLock从0改为1,得到了扩容的权利,可以创建新数组
if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
try {
// 计算新数组长度
int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
// 判断长度是否超过界限
if (newCap - MAX_ARRAY_SIZE > 0) {
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
// 创建新数组
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null)
// 如果newArray是null,说明当前线程没有执行扩容操作
// 让出CPU时间片,尽量让扩容的线程先走完扩容操作
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
// 扩容结束
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
4.2 读操作源码剖析
PriorityBlockingQueue的读操作,是允许使用condition挂起的,因为二叉堆可能没有数据。没有数据,就挂起。
public E poll() {
// 基于lock锁保证安全,
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 拿到堆顶数据
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 保证结构,下移~~
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}