文章目录
- 1、PriorityBlockingQueue
- 2、DelayQueue
1、PriorityBlockingQueue
优先级阻塞队列就是在优先级队列的基础上增加队列排序的功能,将高优先级排在前面,所以优先级队列的元素需要实现Comparator接口。
如果数据结构用数组去维护队列的话,要么在put有大量的后移操作,要么在take有大量的前移操作。
为避免这个问题优先级队列内部用二叉堆的数据结构去实现,这样无论是put还是take都不会有大量的移动操作。具体逻辑如下:
put:如果优先级比父节点高就上浮,依次类推,直至不能再上浮
take:直接拿走堆顶元素,然后再用最后一个元素顶上堆顶,再根据优先级下沉该元素
不懂二叉堆的,可以先去查阅一下二叉堆或者堆排序。
虽然二叉堆不是完全有序的,但可以保证堆顶元素的优先级肯定是最高的。
put
public void put(E e) {
offer(e); //优先级队列是无界的,所以不需要阻塞,直接调用offer
}
//入队
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
int n, cap;
Object[] es;
//扩容
//先释放锁,开辟新数组(长度= size<64? size+2 : size*1.5。)
//之后再重新加锁复制到新数组
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
final Comparator<? super E> cmp;
//如果没有传入比较器就用默认的
if ((cmp = comparator) == null)
//二叉堆节点上浮
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
size = n + 1;
//有元素了,不为空,notEmpty.signal
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
return true;
}
//上浮
private static <T> void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
//如果k=0,说明上浮到堆顶了
while (k > 0) {
//二叉堆父节点在数据的下标=(当前节点下标-1)/2
int parent = (k - 1) >>> 1;
Object e = es[parent];
//不能再上浮了就break
if (key.compareTo((T) e) >= 0)
break;
//可以上浮,交换当前节点和父节点
es[k] = e;
k = parent;
}
es[k] = key;
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
E result;
try {
//dequeue出队一个元素
while ( (result = dequeue()) == null)
//队列为空,notEmpty.await 阻塞
notEmpty.await();
} finally {
//释放锁
lock.unlock();
}
return result;
}
//出队
private E dequeue() {
final Object[] es;
final E result;
//堆顶记入result
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
//--size并将最后一个元素记入x后清空(赋值null)
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
final Comparator<? super E> cmp;
//如果没有传入比较器就用默认的
if ((cmp = comparator) == null)
//二叉堆节点下沉
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
//返回堆顶
return result;
}
//下沉
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
Comparable<? super T> key = (Comparable<? super T>)x;
//假如这里是 (n-1)/2 就是(前面被清空的那个)最后一个元素x的父节点下标
//但这里是 (n/2),意味着当最后一个元素x是左孩子节点的话half就是父节点下标,右孩子节点的话half就是父节点下标+1
int half = n >>> 1;
//k>=half说明下沉到底了
while (k < half) {
//n*2+1就是左孩子节点
int child = (k << 1) + 1;
Object c = es[child];
//右孩子节点=左孩子节点+1
int right = child + 1;
//在两个孩子节点中取优先级比较高去跟下沉节点比较
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right];
//如果不能再下沉了就break
if (key.compareTo((T) c) <= 0)
break;
//可以下沉,交换当前节点和被选中的孩子节点
es[k] = c;
k = child;
}
//最后别忘了替换下沉堆顶的值为被清空的最后一个元素x
es[k] = key;
}
2、DelayQueue
而延迟队列是在优先级队列的基础上实现的(优先级按延迟时间排序),其内部维护了一个优先级队列。
注:是优先级队列而不是优先级阻塞队列,这两者的区别在于有无阻塞。
为什么要用优先级队列不用优先级阻塞队列?
因为不适用。延迟队列的put不需要阻塞,而take则是需要自己实现一个根据延迟时间来阻塞的逻辑。
put
public void put(E e) {
offer(e); //因为不需要阻塞,所以直接调用offer即可
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//优先级队列 PriorityQueue.offer() 入队
q.offer(e);
//如果入队后是第一个元素,需要更新leader的阻塞时间
if (q.peek() == e) {
//清空leader,leader记录带超时时间等待阻塞队列头节点的线程(只有一个)
leader = null;
//唤醒所有正在等待的线程重新take(自旋),以更新leader的阻塞时间,同时leader也可能会变
available.signal();
}
return true;
} finally {
//释放锁
lock.unlock();
}
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
for (;;) {
//第一个节点
E first = q.peek();
//如果为空阻塞,available.await()
if (first == null)
available.await();
else {
//如果延迟时间到了,出队
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null;
//下面的就是延迟时间还没到
//如果已经有leader了,就不带超时时间的阻塞,后续由leader唤醒
if (leader != null)
available.await();
//如果还没有leader,就记录leader是当前线程带并超时时间的阻塞
else {
//记录leader是当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//带超时时间的阻塞
available.awaitNanos(delay);
} finally {
//阻塞时间到了,清空leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader为空(leader的阻塞时间已到,此时leader已经获取到资源了)
//且队列中还有资源
//就唤醒后面等待的线程
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}