文章目录
- 1、ArrayBlockingQueue
- 2、LinkedBlockingQueue
- 3、SynchronousQueue
- 3.1、transfer 公平实现(队列)
- 3.2、transfer 非公平实现(栈)
1、ArrayBlockingQueue
put
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//队列已满,不满足notFull条件,notFull.await
while (count == items.length)
notFull.await();
//队列未满,入队
enqueue(e);
} finally {
//解锁
lock.unlock();
}
}
private void enqueue(E e) {
//入队
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
//满足notEmpty条件,notEmpty.signal
notEmpty.signal();
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//解锁
lock.lockInterruptibly();
try {
//队列为空,不满足notEmpty条件,notEmpty.await
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
//解锁
lock.unlock();
}
}
private E dequeue() {
//出队
final Object[] items = this.items;
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//满足 notFull 条件,notFull.signal
notFull.signal();
return e;
}
2、LinkedBlockingQueue
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final int c;
final Node<E> node = new Node<E>(e);
//注意:与数组队列不同的是,这里put和take各用一把锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//put加锁
putLock.lockInterruptibly();
try {
//队列已满,不满足notFull条件,notFull.await
while (count.get() == capacity) {
notFull.await();
}
//入队
enqueue(node);
//如果入队后,队列仍未满,notFull.signal
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
//put释放锁
putLock.unlock();
}
//队列 空->不空,notEmpty.signal
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
take
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//take加锁
takeLock.lockInterruptibly();
try {
//队列为空,不满足notEmpty条件,notEmpty.await
while (count.get() == 0) {
notEmpty.await();
}
//出队
x = dequeue();
c = count.getAndDecrement();
//如果出队后,队列仍不空,notEmpty.signal
if (c > 1)
notEmpty.signal();
} finally {
//take释放锁
takeLock.unlock();
}
//队列 满->不满,notFull.signal
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
总得来说,ArrayBlockingQueue和LinkedBlockingQueue的实现原理差不多,都是通过 ReentrantLock + 条件队列去实现的,如果队列满了(不满足条件)put就阻塞,同理如果队列为空(不满足条件)take就阻塞。
他们之间的区别在于:
ArrayBlockingQueue数组结构,put和take用一把锁。
LinkedBlockingQueue链表结构,put和take各用一把锁。
3、SynchronousQueue
这个是同步队列,与上面队列不同的是,这个队列的put和take是一对一交替执行的(文章开头的案例可以证明)。所以这个队列比较特殊,不需要容量的概念,或者说容量为0。实现原理大概就是:如果队列为空,或者当前操作和队列中的操作是一样的就自旋一定次数后阻塞并入队。否则就匹配队列中的第一个元素后将匹配到的元素出队。
值得一提的是,这个队列并没有用到 ReentrantLock,而是用 cas自旋 + LockSupport.park 来代替 ReentrantLock 。
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
take
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
无论是put还是take主要逻辑都在transferer.transfer。
而这个方法有公平,非公平两个实现,对应的数据结构分别是队列和栈,默认为栈结构。
3.1、transfer 公平实现(队列)
transfer
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
//传入的e不为null,isDate=true,反正false
boolean isData = (e != null);
//自旋
for (;;) {
QNode t = tail;
QNode h = head;
//如果头或尾节点没有初始化,则自旋
if (t == null || h == null)
continue;
//分支一:如果队列为空,或者模式相同就入队
if (h == t || t.isData == isData) {
QNode tn = t.next;
//入队要保证 t=tail,tail.next=null。
//不满足说明其他线程改了现场,则自旋
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
//超时,返回null
if (timed && nanos <= 0L)
return null;
//new QNode节点
if (s == null)
s = new QNode(e, isData);
//cas入队
if (!t.casNext(null, s))
continue;
//更新tail尾指针
advanceTail(t, s);
//自旋一定次数后阻塞(LockSupport.park)
Object x = awaitFulfill(s, e, timed, nanos);
//...唤醒后
//如果m=s,说明取消了,就把它清除掉,并返回null
if (x == s) {
clean(t, s);
return null;
}
//如果s被唤醒后仍在队列中,自己出队+传递数据
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
//优先返回匹配到的元素,否则返回传入的元素
return (x != null) ? (E)x : e;
}
//分支二:模式不同,进行匹配
else {
QNode m = h.next;
//现场被改变,自旋
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
//cas传递数据并出队
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) {
//非预期情况的自旋
advanceHead(h, m);
continue;
}
advanceHead(h, m);
//唤醒线程(LockSupport.unpark)
LockSupport.unpark(m.waiter);
//优先返回匹配到的元素,否则返回传入的元素
return (x != null) ? (E)x : e;
}
}
}
3.2、transfer 非公平实现(栈)
transfer
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
//传入的e不为null则为DATA模式,否则为REQUEST模式
int mode = (e == null) ? REQUEST : DATA;
//自旋
for (;;) {
SNode h = head;
//分支一:栈为空,或者模式相同就压栈
if (h == null || h.mode == mode) {
//超时情况:如果头结点被取消(超时节点会被取消)则出栈自旋,然后返回null
if (timed && nanos <= 0L) {
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
}
//cas压栈new SNode
else if (casHead(h, s = snode(s, e, h, mode))) {
//自旋一定次数后阻塞(LockSupport.park)
SNode m = awaitFulfill(s, timed, nanos);
//唤醒后...
//如果m=s,说明取消了,就把它清除掉,并返回null
if (m == s) {
clean(s);
return null;
}
//将匹配到的元素出栈
if ((h = head) != null && h.next == s)
casHead(h, s.next);
//返回传递的数据(一定是数据,不是线程也不是节点)
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
//分支二:模式不同,且没有正在匹配,就匹配后出栈
else if (!isFulfilling(h.mode)) {
//头结点已被取消,出栈
if (h.isCancelled())
casHead(h, h.next);
//先让当前节点入栈,然后再匹配,且此时当前节点状态是匹配中
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
//栈为空,没有可匹配的元素,将当前节点出栈后,自旋
SNode m = s.next;
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
//进行匹配,唤醒下一个节点(LockSupport.unpark)
if (m.tryMatch(s)) {
//匹配成功,出栈两个元素
casHead(s, mn);
//返回传递的数据
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
//匹配失败,说明m已经先一步被其它线程匹配了,就协助出栈两个元素
s.casNext(m, mn);
}
}
}
//分支三:模式不同,且正在匹配,则协助匹配
//其实就是跟分支二执行一样的内容,只不过是这个分支的线程帮忙执行了
else {
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}