PriorityBlockingQueue
- 优先级队列
- 不满足FIFO原则
- 它将插入元素进行排序
- 排序的实现是基于数组结构实现的二叉堆排序
二叉堆
在分析优先级别队列时候,需要了解一下二叉堆是什么
- 二叉堆是一种完全二叉树,除了最底层外,其它层被完全填充。
- 二叉堆分为最小堆和最大堆
- 最小堆:任何一个父节点总是小于等于其任意子节点的值
- 最大堆:任何一个父节点宗师大于等于其任意节点的值
图解
- 延迟优先级队列基于优先级队列实现
- 优先级队列实现基于最小顶二叉堆实现
- 存元素时候,基于二叉堆排序
- 再存取数据时候,通过上下移动操作保证堆结构
源码分析
成员变量
private static final int DEFAULT_INITIAL_CAPACITY = 11; //队列(数组)默认大小
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//最大值,减8为了适配不同版本虚拟机
private transient Object[] queue;//存储元素的数组
private transient int size;//队列长度
private transient Comparator<? super E> comparator;//排序比较器
private final ReentrantLock lock = new ReentrantLock();//锁
private final Condition notEmpty = lock.newCondition();//挂起、唤醒线程
private transient volatile int allocationSpinLock;//标识位,代替加锁操作,提升效率
private PriorityQueue<E> q;//优先级队列,这里是为了解决序列化和反序列化使用的
注意:在PriorityBlackQueue中并没有基于PriorityQueue实现而是基于private transient Object[] queue自己实现的
添加元素
offer(E e)方法
public boolean offer(E e) {
//非空校验
if (e == null)
throw new NullPointerException();
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;//队列元素数量、队列长度
Object[] es;//存储元素的数组
//当元素个数大于等于队列长度时候扩容
while ((n = size) >= (cap = (es = queue).length))
//扩容
tryGrow(es, cap);
//否则添加元素
try {
final Comparator<? super E> cmp;
//这里是判断比较器,如果为null是应用默认的比较器
if ((cmp = comparator) == null)
//添加元素
siftUpComparable(n, e, es);
else
//添加元素
siftUpUsingComparator(n, e, es, cmp);
size = n + 1;
//唤醒生产者线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
tryGrow(es, cap)扩容
//传入就旧数组和队列长度
private void tryGrow(Object[] array, int oldCap) {
//释放锁,基于allocationSpinLock保证线程安全
lock.unlock();
//声明一个新数组
Object[] newArray = null;
//这里allocationSpinLock为0证明没有线程正在扩容数组,并基于CAS将0改为1,保证其它线程不能再进行扩容
if (allocationSpinLock == 0 &&
ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
try {
//如果原先数组长度小于64,扩容为原先长度一倍+1,否则扩容为原理长度的1.5倍
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
//如果新数组长度比最大长度大
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
//对原先素组进行加一操作,如果加一后小于零或大于最大长度了,抛异常,超了~~
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
//否则直接设置为最大长度
newCap = MAX_ARRAY_SIZE;
}
//这里保证数组没有被其它线程扩容过
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
//allocationSpinLock修改为0
allocationSpinLock = 0;
}
}
//执行到这里证明当前线程没有进行扩容操作
if (newArray == null) // back off if another thread is allocating
Thread.yield();
//加锁
lock.lock();
//再次判断,扩容成功了,且没有并发问题,将新数组引用复制给queue,将老数组元素复制到新数组中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
siftUpUsingComparator添加元素的操作
//传入元素的个数size(要放数据的索引位置),元素T,存储元素的数组queue
private static <T> void siftUpComparable(int k, T x, Object[] es) {
//将传入元素强转为Comparable,因此元素必须实现Comparable接口
Comparable<? super T> key = (Comparable<? super T>) x;
//一直循环,直到下标为0或者,找到一个比当前元素小的节点退出循环
while (k > 0) {
//找到当前节点的父节点
int parent = (k - 1) >>> 1;
//拿到父节点元素
Object e = es[parent];
//判断传入元素是否大于其父节点元素,如果大于直接退出while循环
if (key.compareTo((T) e) >= 0)
break;
//如果不满足上面if条件,将k位置元素存放父节点元素,k修改为夫父节点下标
//其实就是将父节点下移操作
es[k] = e;
k = parent;
}
//数组k位置存储传入元素
es[k] = key;
}
移除元素操作
poll()方法,关键看dequeue()方法如何保证堆结构
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
移除堆顶元素
private E dequeue() {
final Object[] es;//存储元素的数组
final E result;//数组第0位置元素,即堆顶元素
//如果队列不为空
if ((result = (E) ((es = queue)[0])) != null) {
final int n;//数组长度-1后的长度
//拿到数组的最后一个元素
final E x = (E) es[(n = --size)];
//将堆顶元素置为null
es[n] = null;
//判断移除元素后队列是否为空,如果不为空,操作堆,保证堆结构
if (n > 0) {
final Comparator<? super E> cmp;
//如果没有指定比较器使用默认比较器
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
return result;
}
siftDownComparable移除元素保证堆结构
//传入:被取出元素的下标,数组的最后一个元素,存储元素的数组,取出元素后的数组长度
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
//将元素强转为Comparable
Comparable<? super T> key = (Comparable<? super T>)x;
//只遍历一个叶子树就可以保证二叉堆结构
int half = n >>> 1; // loop while a non-leaf
//下移操作
while (k < half) {
//找到左叶子节点
int child = (k << 1) + 1; // assume left child is least
//坐子节点元素
Object c = es[child];
//右子节点
int right = child + 1;
//有右子节点,且左子节点大于右子节点
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
//如果左节点大于右边节点,c=右边节点,这里就是找到左右节点中较小的那个节点
c = es[child = right];
if (key.compareTo((T) c) <= 0)
break;
//将较小的节点放堆顶
es[k] = c;
//继续遍历下方叶子树
k = child;
}
//最后将k位置存放最后索引的元素
es[k] = key;
}