概要
AQS是AbstractQueuedSynchronizer类的简称,为了方便,后面都以AQS来指代。AQS通过对互斥锁、共享锁和条件等待的基础实现和封装,同时为juc包下其他类提供扩展,完善了java体系的锁、线程池和并发集合类的实现,主要包括:ReentrantLock(可重入锁,由AQS互斥锁扩展实现)、ReentrantReadWriteLock(可重入读写锁,由AQS互斥锁和共享锁扩展实现)、Semaphore(信号量,由AQS共享锁扩展实现)、CountDownLatch(由AQS共享锁扩展实现)、CyclicBarrier(通过AQS的互斥锁机制(ReentrantLock)+条件(Condition)实现的)、ThreadPoolExecutor、ArrayBlockingQueue(由一把AQS互斥锁扩展实现)、LinkedBlockingQueue(由两把AQS互斥锁扩展实现,分别为存放和获取两把锁)等。
AQS(jdk 1.5版本开始引入)可以概括为由两个排队队列(同步队列和条件队列)和一个抢占资源(state)构成,提供了依赖先进先出(FIFO)等待队列实现的阻塞锁和相关同步器(信号量、事件等)的框架。此类被设计为多种依赖单个原子性int值来表示状态的同步器的实用的基础。子类必须通过实现protected方法去改变state值,通过改变state值代表该对象是获取或释放锁。综上所述,类中其他的方法主要是执行队列的排队和阻塞机制。子类也可以维护其他状态字段,但是仅是原子性更新int值的操作还是一致使用方法getState,setState和compareAndSetState。它只赋予了竞争的权利。因此,当前被释放的竞争者线程也可能需要重新等待。
同步队列是“CLH”锁队列的一个变种,CLH锁通常用在自旋锁中。相反,我们在阻塞同步器中使用它们,但使用相同的基本策略,即在其节点的前一个节点的线程中保存一些有关线程的控制信息。每个节点的"status"字段维持着一个线程是否需要阻塞。当一个节点的前驱节点被释放后,会向其发送一个信号通知。队列中的每一个节点都是一个特殊通知类型的监听器,保存着一个等待线程。状态字段不作为实际控制线程是否允许上锁的标志。如果线程是队列中的第一个,那么线程会尝试获取(锁),但是第一并不能保证获取成功。要排队进入CLH锁,可以将其作为新的尾部自动拼接。要退出队列,只需设置head字段。这其实就是一个双端队列,队列中head节点是一个空节点(没有线程的),也就是说只要head不为空,那么队列肯定不为空,同理,当前实际节点被消耗后立马上升为head节点,并致该节点的thread和prev为null。
同步队列:双向队列,head永远代表当前节点且内容是空
条件队列:单向队列,当条件节点被唤醒后,会把节点从条件队列删除并把该节点添加到同步队列去重新竞争锁
源码解析
Node
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
//线程已被取消
static final int CANCELLED = 1;
//活着的状态, 后续节点需要该状态节点来唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
//节点状态,初始化为0
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前线程
volatile Thread thread;
//条件队列的后继节点
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { //用于创建初始化head节点或者共享锁标记
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
acquire方法
获取互斥锁的方法,忽略中断。tryAcquire方法是一个模板方法,由子类来实现,为什么由子类能实现呢?因为AQS并不知道子类要如何定义获取锁的操作。该方法具体工作流程是:
1.通过子类实现的tryAcquire方法获取锁操作,如果获取到了锁,则退出该方法,线程获取到锁;否则,进行下一步
2.排队:addWaiter方法将当前线程包装成队列节点放在队列的尾部
3.排队后等待被唤醒:acquireQueued方法用于排队后阻塞线程或等待其他线程释放锁后的唤醒
public final void acquire(int arg) {
if (!tryAcquire(arg) && //子类判断获取锁,如果获取成功,返回true,取反就是false,直接获取成功,退出该方法;否则继续下一步
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取锁失败后,通过addWaiter方法将当前线程包装成节点然后放到队列尾部,再通过acquireQueued方法判断加入队列的节点是阻塞等待,还是尝试拿锁。该方法最终返回当前线程是否被中断的状态
//设置当前线程中断
selfInterrupt();
}
//子类定义如何获取锁的操作,AQS并不知道怎么用这个state来上锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
addWaiter方法
private Node addWaiter(Node mode) {
//将当前线程包装成一个队列节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//先设定前驱节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//compareAndSetTail这一步比较关键,当多个线程并发过来行,会争抢设置tail节点,结合上一步node.prev = pred,不管多少并发先设置好前驱节点,为什么要这样做呢?因为当并发执行时,有那么一瞬间,通过head节点遍历,遍历不到最新的尾节点:N1(head) <->N2(旧tail) <-N3(tail),那么这时只能从tail开始遍历才行,直到pred.next = node执行成功,head节点才能遍历到尾节点
pred.next = node; //走到这一步,代表当前节点入队成功,直接返回节点并退出方法
return node;
}
}
//这个方法主要功能是使节点完成入队操作,同时为了弥补并发访问时,部分节点竞争compareAndSetTail(pred, node)失败,需要后续步骤继续完成入队操作,这时的节点状态有2种,第1种状态是:如下图-1;第2种状态是:整个队列为空时,只有N1(当前节点)
enq(node);
return node;
}
//入队方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 这个简单,代表整个队列为空,大家通过CAS竞争初始化head节点(内容为空的节点),失败了,走下一次轮循
if (compareAndSetHead(new Node()))
tail = head;
} else {
//队列中有值,重新设置前驱节点为tail节点,并通过CAS竞争成为下一届tail节点,失败了,走下一次轮循
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
当加入阻塞队列后,调用该方法判断是否将当前线程进行阻塞还是继续尝试获取锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//当前节点的前驱节点为head节点,然后继续尝试获取锁
if (p == head && tryAcquire(arg)) {
//锁获取成功,设置head节点为当前节点
setHead(node);
p.next = null; // 方便GC回收资源
failed = false;
return interrupted;
}
//p不是head节点,或者又抢锁失败了,那么就判断是否应该阻塞,如果需要阻塞,则调用parkAndCheckInterrupt进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //阻塞并检查是否中
interrupted = true; //线程被中断,设置true
}
} finally {
if (failed) //只有一种情况failed为true,那就node.predecessor()抛出异常时
//取消线程节点获取锁操作
cancelAcquire(node);
}
}
//获取某节点的前驱节点,如果前驱节点为null,抛出空指针,为什么呢,因为队列中至少需要有一个head节点(内容为空),如果前驱节点为null,证明队列非法了,则抛出异常,head节点实际的功能就是用于建立初始head或SHARED(共享)标记
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//该方法用于判断当前线程节点是否应该阻塞。就是最终找到一个可靠(活着有效)的节点,然后将当前线程节点作为其后继节点即可。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //node的waitStatus
if (ws == Node.SIGNAL)
// 如果前驱节点状态是SIGNAL,那么此时可以安全的睡眠(因为SIGNAL状态,代表了上一个节点线程是活的,它可以通知你,所以当前线程节点可以安全的阻塞了),阻塞后也可以防止线程在acquireQueued方法中for (;;) 块无效轮循
return true;
if (ws > 0) {
//waitStatus状态值中,大于0的只有CANCELLED,也就是前驱节点线程取消了,那么跳过该节点,直到找到一个状态非CANCELLED的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; //抛弃CANCELLED节点,设置下一个节点为当前节点
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//正常节点(状态为0【初始化时】或为PROPAGATE(传递)),CAS设置状态为Node.SIGNAL,需要等待其他节点线程通知,但是不需要park(阻塞),调用者需要在被阻塞前,重试确保拿不到锁
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/*阻塞线程。park方法源码中有注释说明,在三种情况下,该方法会休眠(也就是执行到该方法下一步):
*1.其他线程调用了unpark释放该线程
*2.其他线程中断了该线程
*3.TODO 需要进一步了解和解释,待翻译,这块可能需要深入hotspot源码后来解释
*/
LockSupport.park(this);
//被中断时,返回是否中断标记true/false
return Thread.interrupted();
}
release方法
释放互斥锁的方法
public final boolean release(int arg) {
//子类判断释放锁
if (tryRelease(arg)) {
Node h = head;
//如果head节点不为空【为空代表没有使用过,意思就是一直没有产生排队】,并且waitStatus不为0【这里其实就是SIGNAL:-1】,因为head节点一直是代表当前活跃节点的存在,用于唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒后继节点
return true;
}
return false;
}
unparkSuccessor方法
private void unparkSuccessor(Node node) {
//如果ws < 0,CAS设置节点waitStatus为0,表示已经响应此次唤醒操作,并恢复状态,避免多次唤醒
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取到后继节点
Node s = node.next;
if (s == null //后继节点为空?因为并发情况下,在addWaiter方法中已讲过,节点入队时是先设置前驱节点node.prev = pred,后遍历并CAS设置后继节点,所以在并发时,会有一瞬间后继节点为空,这时需要从tail遍历
|| s.waitStatus > 0) { //此时线程被取消了,CANCELLED状态
s = null;
//从尾部开始遍历,一直找到离node节点最近的waitStatus小于0的后继节点,此时就是SIGNAL状态
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //找到了需要唤醒的节点,然后unpark唤醒
LockSupport.unpark(s.thread);
}
acquireShared方法
public final void acquireShared(int arg) {
//子类判断获取共享锁,如果获取失败(<0),则去排队
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared方法
private void doAcquireShared(int arg) {
//创建新的node,标记为共享模式,并添加到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) { //前驱节点是头节点
//因为是头节点,重新尝试获取锁,尝试看看头节点是否已经释放锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//刚好获取锁成功,更新头节点,并将唤醒操作传递下去
setHeadAndPropagate(node, r);
p.next = null; // 方便垃圾回收
if (interrupted) //如果等待中被中断了,并且由中断唤醒了,那么设置当前线程中断标志位
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录原头节点
setHead(node); //设置当前节点为头节点(内容置空)
if (propagate > 0 //资源还有,可以唤醒后续节点
|| h == null //这一步不会发生,因为head永远不会为null
|| h.waitStatus < 0 // PROPAGATE(-3)、SIGNAL(-1)两种情况都会触发唤醒
|| (h = head) == null // 其实这一步就是再次获取(因为并发情况下,可能此时head已经变更了)
|| h.waitStatus < 0) { // PROPAGATE(-3)、SIGNAL(-1)两种情况都会触发唤醒
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); //唤醒后继节点操作
}
}
doReleaseShared方法
这个方法主要是唤醒后继节点,在两种情况下会发起调用,第一种是线程主动释放资源:在releaseShared方法中调用;第二种是线程抢到资源后:在acquireShared方法中调用。
private void doReleaseShared() {
for (;;) {
Node h = head; //头节点有2种状态:head未更新,head已更新,见详细说明【1】
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//SIGNAL状态,直接唤醒后继节点
//防止多线程同时唤醒同一个节点,CAS原子性改变头节点状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 && //头节点已经唤醒过后续节点
//CAS原子改变头节点状态,为了让最新活跃节点继续传递唤醒动作,但又要防止步骤多线程并发同时修改1个节点的问题
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//头节点始终没有改变过(当然是在前提(h != null && h != tail)下),表示没有后继节点需要唤醒了,直接退出
if (h == head)
break;
}
}
假设当前资源数为2,并且分别被T1、T2线程获取到了这2个资源,然后线程T3、T4进来了,此时排队,当前队列快照如:
T1、T2获得锁,head(旧)->T3->T4
详细说明【1】
1)当T1释放时,进入doReleaseShared方法,此时head节点还是原旧节点,并未更新
2)此时如果T2也释放了资源,进入doReleaseShared方法拿到的也是旧的头节点
3)T1、T2是并发执行的,这就涉及到多种状态,但每个线程都要经过下面几步:
(1)设置头节点状态为0:compareAndSetWaitStatus(h, Node.SIGNAL, 0),防止多个线程同时唤醒同1个后续节点
(2)唤醒后继节点:unparkSuccessor(h),该方法中还会针对状态做检查是否<0,并设置为0 compareAndSetWaitStatus(node, ws, 0)
(3)unpark解除阻塞线程:找到了需要唤醒的节点,然后unpark唤醒,让该线程继续去抢锁
现假设T1先触发释放,T2后触发释放,但是整个释放过程是交叉的,这就存在几种情况,当T1和T2同时执行到步骤(1)时,此时通过CAS原子性修改状态,目的是为了防止多个线程同时唤醒同1个后续节点,那么此时T2会继续for循环,这就走到了
(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))这一步,这是为了让最新活跃节点继续传递唤醒动作,但又要防止步骤(1)的并发问题,所以把状态改成Node.PROPAGATE(-3),因为setHeadAndPropagate方法中只要waitStatus<0就可以继续唤醒后面节点,此时如果T1唤醒了T3,那么T4线程得由T3来唤醒,以此类推。假设一下,如果不加(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))这一步会产生什么问题?那么问题很严重,T4就没人唤醒了。
总结下来,就是2种情况,T1节点未变更成头节点、T1节点已经变更成头节点,如果是情况1,那么侧重点就是控制并发操作时,多线程同时唤醒同1个节点;如果是情况2,则要变更新头节点状态PROPAGATE,为了让新的头节点继续唤醒后续节点。
cancelAcquire方法
当前结果有三种节点位置状态:
位置1:该节点为尾节点
位置2:该节点为中间节点(既不是头,也不是尾)
位置3:该节点在最前面,即头节点后面(因为头节点为信号结果,是个空节点,不存在释放的概念,所以也就不可能为头节点)
private void cancelAcquire(Node node) {
// 节点不存在,则直接忽略
if (node == null)
return;
node.thread = null; //置空节点绑定的线程,方便gc
//跳过所有cancelled的节点,一直往前找,直到找到一个未被未cancelled取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//标记节点为取消状态,设置之后,其他线程也可以看到该状态,并跳过该节点的处理
node.waitStatus = Node.CANCELLED;
// 如果当前节点就是尾节点【位置1】,那直接CAS移除就行
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head && //节点不是头节点【位置2】
((ws = pred.waitStatus) == Node.SIGNAL || //SIGNAL状态,需要唤醒后继节点
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //cas设置pred的状态为SIGNAL,唤醒后续节点
pred.thread != null) { //线程不能为空,如果线程为null,证明此节点为无效节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//跳过被取消的节点,设置该节点前驱和后继节点指针引用
compareAndSetNext(pred, predNext, next);
} else {
//【位置3】直接唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
ConditionObject原理
接口设计
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
public class ConditionObject implements Condition, java.io.Serializable {
//条件队列的头节点
private transient Node firstWaiter;
//条件队列的尾节点
private transient Node lastWaiter;
}
await方法
public final void await() throws InterruptedException {
if (Thread.interrupted()) //如果当前线程被中断,那么抛出中断异常
throw new InterruptedException();
Node node = addConditionWaiter(); //创建并添加节点到条件队列
//调用release操作唤醒竞争队列的节点。注意:当前线程的state变量需要保存,因为在后面需要重新唤醒并恢复状态
int savedState = fullyRelease(node);
int interruptMode = 0;
//节点未放入到AQS的竞争队列,那么一直阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 一直阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //响应线程中断
break;
}
//竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法
private Node addConditionWaiter() {
Node t = lastWaiter;
//尾节点不为空,并且waitStatus不为Node.CONDITION,尾节点不是条件节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建条件节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) //条件队列中没有等待节点
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters方法
从头遍历队列节点,把已取消的节点从队列中断开
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
signal方法
唤醒一个有效的后继节点,该方法一定是在互斥锁内执行的,所以是线程安全的
public final void signal() {
if (!isHeldExclusively()) //判断执行该方法线程是不是当前获取锁的线程,如果不是,则抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); //从头节点开始,唤醒后面节点
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//将条件队列的节点插入到AQS阻塞队列中
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signalAll方法
唤醒整个条件队列的节点,并插入到AQS阻塞队列,重新抢资源,与signal()方法的区别就是,signal只唤醒一个,signalAll唤醒所有。在开发时需要根据自己的场景来选择唤醒方法,如果是大量释放资源,则可以用signalAll方法,如果不是,建议用signal方法,这样可以避免过多线程抢占一个资源的情况,以致降低性能。
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}