阻塞队列---BlockQueue
BlockingQueue是带阻塞功能的队列,继承了Queue接口,当执行入队操作时,如果队列满了,则阻塞调用者;当执行出队操作时,如果队列是空的,也阻塞调用者。
public interface BlockingQueue<E> extends Queue<E> {
//非阻塞入队,成功返回true,失败会抛出异常
boolean add(E e);
//非阻塞入队,成功返回true,失败返回false
boolean offer(E e);
//阻塞入队,会响应中断
void put(E e) throws InterruptedException;
//入队,若没有可用空间则等待指定时间,会响应中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//阻塞出队,会响应中断
E take() throws InterruptedException;
//出队,若没有可用元素则等待指定时间,会响应中断
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//返回剩余容量
int remainingCapacity();
//移除元素,移除成功返回true
boolean remove(Object o);
//是否包含元素,是返回true
public boolean contains(Object o);
//元素全部出队,添加到给定的集合中
int drainTo(Collection<? super E> c);
//元素最大出队maxElements个,添加到给定的集合中
int drainTo(Collection<? super E> c, int maxElements);
}
ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的循环队列,在构造函数中,会要求传入数组的容量,核心源码说明如下:
/** 存储元素的数组 */
final Object[] items;
/** 队头索引 */
int takeIndex;
/** 队尾索引 */
int putIndex;
/** 队列的元素个数 */
int count;
final ReentrantLock lock;
/** lock的非空条件变量 */
private final Condition notEmpty;
/** lock的非满条件变量 */
private final Condition notFull;
......
//阻塞式入队操作
public void put(E e) throws InterruptedException {
//空指针检查
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可响应中断的加锁
lock.lockInterruptibly();
try {
//如果队列满了,则在非满条件上等待
while (count == items.length)
notFull.await();
//入队
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//获取元素数组
final Object[] items = this.items;
//队尾位置放置x
items[putIndex] = x;
//如果队尾的下一个位置到达数组末尾,则置为0
if (++putIndex == items.length)
putIndex = 0;
//元素个数加1
count++;
//唤醒在非空条件上等待的线程
notEmpty.signal();
}
//阻塞式出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断的上锁
lock.lockInterruptibly();
try {
//当队列为空时,在非空条件上等待
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
//获取队头位置的元素
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//队头位置置为null
items[takeIndex] = null;
//如果队头的下一个位置到达数组长度,则队头置为0
if (++takeIndex == items.length)
takeIndex = 0;
//元素个数减1
count--;
//迭代器链表不是空,则出队后更新迭代器状态
if (itrs != null)
itrs.elementDequeued();
//唤醒在非满条件等待的线程
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是用单向链表实现的阻塞队列。
核心源码如下
/**
* 元素节点
* @param <E> 元素类型
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 队列容量,构造函数没有传入时默认是0x7fffffff */
private final int capacity;
/** 当前元素数量,因为出队,入队用的是两把锁,所以需要原子类保证线程安全 */
private final AtomicInteger count = new AtomicInteger();
//队头指针
transient Node<E> head;
//队尾指针
private transient Node<E> last;
/** 出队锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 非空条件变量 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 非满条件变量 */
private final Condition notFull = putLock.newCondition();
//阻塞式入队
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队满则在非满条件上等待
while (count.get() == capacity) {
notFull.await();
}
//入队
enqueue(node);
//元素个数加1
c = count.getAndIncrement();
//通知其他进行put操作的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果队列本来是空的,此次操作入队成功,则通知在非空条件等待的线程
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
//获取出队锁才能操作非空条件变量
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
//阻塞式出队
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空时,在非空条件上等待
while (count.get() == 0) {
notEmpty.await();
}
//出队
x = dequeue();
//元素个数减1
c = count.getAndDecrement();
//通知其他进行take操作的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果队列本来是满的,此次操作出队成功,则通知在非满条件等待的线程
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
//获取出队锁才能操作非满条件变量
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
PriorityBlockingQueue
PriorityBlockingQueue叫做优先队列,是按照元素的优先级从小到大出队的。所以PriorityQueue中的2个元素之间可以比较大小,实现Comparable接口。
核心源码如下
/**
* 优先队列表示为一个平衡的二叉小根堆:队列[n]的两个子队列是队列[2*n+1]和队列[2*(n+1)]。
* 优先队列根据比较器排序,或者如果比较器为空,则根据元素的自然顺序排序:对于堆中的每个
* 节点n和n的每个后代d,有n <= d。假设队列非空,最小值的元素在queue[0]中
*/
private transient Object[] queue;
/**
* 队列元素的个数
*/
private transient int size;
/**
* 比较器,如果为空则使用元素的自然排序
*/
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
/**
* 非空的条件变量
*/
private final Condition notEmpty;
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//元素个数超出数组长度,进行扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)//没有比较器,使用元素自带的比较功能
siftUpComparable(n, e, array);//入堆
else//有比较器使用比较器排序
siftUpUsingComparator(n, e, array, cmp);
//元素个数加1
size = n + 1;
//唤醒在非空条件等待的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//如果出队为空元素,说明没有元素,则进行等待;否则返回出队元素
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
//二叉小根堆的堆顶即为要出队的元素
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//调整堆的结构
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
构造方法如果不传入大小,内部会默认数组长度为11,优先队列只有notEmpty条件,没有notFull条件,当元素个数超出数组长度时,执行扩容操作。
DelayQueue
即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。延迟时间,就是“未来将要执行的时间”-“当前时间”。放入DelayQueue中的元素,必须实现Delayed接口。
Delayed接口:
1.如果getDelay的返回值<=0,则说明该元素到期,需要从队列中拿出来执行。
2.该接口继承了Comparable 接口,需要实现Comparable接口的方法。
核心源码如下所示
private final transient ReentrantLock lock = new ReentrantLock();
//普通优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//第一个等待延迟队列队头元素的线程
private Thread leader = null;
private final Condition available = lock.newCondition();
//入队
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//入队,放入二叉小根堆
q.offer(e);
//如果放入的元素在队头(堆顶),说明是延迟时间最小,则通知其他等待线程
//否则说明延迟时间不是最小,不做操作
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
//出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//从优先队列取出堆顶元素(延迟时间最小的)
E first = q.peek();
//如果是空,说明队列为空,需要阻塞
if (first == null)
available.await();
else {
//获取延迟时间,如果<=0直接出队
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
//否则说明延迟时间未到,先释放队头元素的引用,然后查看是否有其他
//线程在等待队头元素,如果有,则阻塞
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {//否则当前线程是第一个获取队头元素的线程,等待有限的时间
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果当前线程已获取了队头元素,则唤醒其他线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
关于延迟队列的take()函数,要注意以下两点:
(1)在队列为空 和 堆顶元素的延迟时间没到 这两种情况下,会阻塞。
(2)Thread leader变量记录了等待堆顶元素的第1个线程,如果当前线程是第一个等待队头元素的线程,则使用condition.awaitNanos()等待一个有限的时间;当发现还有其他线程也在等待堆顶元素(leader!=NULL)时,才需要使用condition.await()无限期等待。
SynchronousQueue
SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put()/take(),线程会阻塞;直到另外一个线程调用了take()/put(),两个线程才同时解锁。
和锁一样,也有公平和非公平模式。如果是公平模式,则用TransferQueue类实现;如果是非公平模式,则用TransferStack类实现。
private transient volatile Transferer<E> transferer;
//......
public SynchronousQueue(boolean fair) {
transferer = fair ? new SynchronousQueue.TransferQueue<E>() : new SynchronousQueue.TransferStack<E>();
}
put和take方法如下所示,核心都是调用Transferer类的transfer方法。transfer有两种模式,分别是生产模式和消费模式,对应put和take。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
//第一个参数不为空时为生产模式(put),为空时为消费模式(take)
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
//第一个参数不为空时为生产模式(put),为空时为消费模式(take)
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
公平模式和非公平模式
假设3个线程分别调用了put(..),3个线程会进入阻塞状态,直到其他线程调用3次take(),和3个put(..)一一配对才解除阻塞。
如果是公平模式(队列模式),则第1个调用put(..)的线程1会在队列头部,第1个到来的take()线程和它进行配对,遵循先到先配对的原则;
如果是非公平模式(栈模式),则第3个调用put(..)的线程3会在栈顶,第1个到来的take()线程和它进行配对,遵循的是后到先配对的原则。
公平模式的实现
TransferQueue是以单向链表实现。
static final class TransferQueue<E> extends Transferer<E> {
/**
* 队列的节点,单向链表
*/
static final class QNode {
//队列下个节点
volatile QNode next;
//如果是put item不为null,如果是take,item为null
volatile Object item;
//put/take对应的阻塞线程
volatile Thread waiter;
//put操作为true,take为false
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//......省略其他方法
}
//队头
transient volatile QNode head;
//队尾
transient volatile QNode tail;
transient volatile QNode cleanMe;
TransferQueue() {
//初始化一个空节点,队头队尾都指向它
QNode h = new QNode(null, false);
head = h;
tail = h;
}
E transfer(E e, boolean timed, long nanos) {
//这里有两种模式,分别是生产模式对应put,消费模式对应take
//两种模式不能同时存在于队列中,它们会一旦相遇就会配对出队
QNode s = null;
//当前线程的模式
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 队头队尾有一个为null则说明未初始化,重新循环
continue;
if (h == t || t.isData == isData) { // 队列为空或当前线程的模式和队尾元素为同一种模式
QNode tn = t.next;
if (t != tail) // 不一致读,则重新循环
continue;
if (tn != null) { //tn不是空,说明当前队尾后面还有元素
advanceTail(t, tn); //CAS操作,将队尾指针由t的地址换成tn的地址(后移队尾指针)
continue; //继续重新循环
}
if (timed && nanos <= 0) // take和put不会进入此分支
return null;
if (s == null) //传入的元素新建一个QNode s
s = new QNode(e, isData);
if (!t.casNext(null, s)) //CAS操作 将s放到t的next属性,也就是加入队尾,如果失败则重新循环
continue;
advanceTail(t, s); //CAS操作 后移队尾指针
Object x = awaitFulfill(s, e, timed, nanos); //进入阻塞状态,直到s配对成功
if (x == s) {
clean(t, s);
return null;
}
if (!s.isOffList()) { // 唤醒之后,s还在队列中且为第一个元素
advanceHead(t, s); // 后移队头指针
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // 配对模式
QNode m = h.next; // 取队列中第一个元素
if (t != tail || m == null || h != head)
continue; // 不一致读重新for循环
Object x = m.item;
if (isData == (x != null) || // m已经配对过
x == m ||
!m.casItem(x, e)) { // 尝试配对失败
advanceHead(h, m); // 队头直接出队
continue;
}
advanceHead(h, m); // 配对成功,出队
LockSupport.unpark(m.waiter); //唤醒队列中与第一个元素对应的线程
return (x != null) ? (E)x : e;
}
}
}
}
非公平模式实现
TransferStack也是一个单向链表,链表中的节点有三种状态,REQUEST对应take节点,DATA对应put节点,二者配对之后,会生成一个FULFILLING节点,入栈,然后FULLING节点和被配对的节点一起出栈。
static final class TransferStack<E> extends SynchronousQueue.Transferer<E> {
/* 节点的模式 */
/** 消费模式 */
static final int REQUEST = 0;
/** 生产模式 */
static final int DATA = 1;
/** 正在配对模式 */
static final int FULFILLING = 2;
/** 链表的节点 */
static final class SNode {
volatile SNode next; // 栈的下个节点
volatile SNode match; // 配对的节点
volatile Thread waiter; // 对应的阻塞线程
Object item; // data; or null for REQUESTs
int mode; //三种模式
SNode(Object item) {
this.item = item;
}
//......省略其他方法
}
/** 栈顶指针 */
volatile SNode head;
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 栈为空或同种模式
if (timed && nanos <= 0) { // take和put不会进入这个分支
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//新元素创建节点入栈
SNode m = awaitFulfill(s, timed, nanos); //阻塞等待,直到s配对成功
if (m == s) {
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 不是同一种模式,待配对
if (h.isCancelled())
casHead(h, h.next);
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//生成fulfilling节点,入栈
for (;;) { // 循环,直到匹配成功或者等待线程=null
SNode m = s.next; // m和s进行配对
if (m == null) { // 栈中只剩s元素了
casHead(s, null); // 弹出fulfilling节点
s = null; // s引用置空以便下次使用
break; // 继续外层循环
}
SNode mn = m.next;
if (m.tryMatch(s)) { //m和s尝试配对
casHead(s, mn); // s和m一起出栈,栈顶指针指向mn
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // 配对失败
s.casNext(m, mn); // CAS操作,s.next字段由m指向mn,继续循环匹配直到匹配成功
}
}
} else { // 已经配对过了,出栈
SNode m = h.next; // m和h进行配对
if (m == null) // 栈中只剩h元素
casHead(h, null); // head=null,弹出h节点
else {
SNode mn = m.next;
if (m.tryMatch(h)) // 配对成功
casHead(h, mn); // h和m一起出栈,栈顶指针指向mn
else // 配对失败
h.casNext(m, mn); // CAS操作,h.next字段由m指向mn
}
}
}
}
}