在高并发的情况下,使用基于CAS自旋实现的轻量级锁存在恶性空自旋浪费CPU 资源和导致“总线风暴”两大问题, 解决CAS恶性空自旋的有效方法是空间换时间,常见解决方法有分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS的性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(抽象同步器类,简称为AQS)。
AQS 简介
AQS是JUC提供的一个用于构建锁和同步容器的基础类,是CLH队列的一个变种。它实现了锁的基本抽象功能,支持独占锁与共享锁两种方式。
AQS的类图如下:
AQS队列内部维护的是一个FIFO的双向链表,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。其中Node中的thread变量用来存放进入AQS队列里面的线程;Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的;waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点);prev记录当前节点的前驱节点,next记录当前节点的后继节点。
FIFO双向链表的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后继节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。其内部结果如下:
AQS有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程。
AQS 的成员
AQS根据“分离变与不变”的原则基于模板模式实现。
状态标志位
/**
* 同步状态,
*/
private volatile int state;
/**
* 获取同步的状态
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态
*/
protected final void setState(int newState) {
state = newState;
}
AQS使用int类型的state标示锁的状态,可以理解为锁的同步状态。AQS 提供了getState()、setState()来获取和设置同步状态。
由于setState()无法保证原子性,因此AQS给我们提供了compareAndSetState()方法调用的是VarHandle.compareAndSet()方法,是具有CAS原性的操作,被@HotSpotIntrinsicCandidate修饰,在HotSpot中有一套高效的实现,该高效实现基于CPU指令,运行时,HotSpot维护的高效实现会替代JDK的源码实现,从而获得更高的效率。其代码如下:
private static final VarHandle STATE;
/**
* 通过CAS设置同步的状态
**/
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet(Object... args);
队列节点类
static final class Node {
/**
* 标识节点在抢占共享锁
* 表示线程是因为获取共享资源时阻塞而被添加到队列中的
* */
static final Node SHARED = new Node();
/**
* 标识节点在抢占独占锁
* 线程是因为获取独占资源时阻塞而被添加到队列中的。
* */
static final Node EXCLUSIVE = null;
/**
* 节点等待状态值1:取消状态
*
* */
static final int CANCELLED = 1;
/** 节点等待状态值-1:标识后继线程处于等待状态 */
static final int SIGNAL = -1;
/** 节点等待状态值-2:标识当前线程正在进行条件等待 */
static final int CONDITION = -2;
/**
* 节点等待状态值-3:标识下一次共享锁的acquireShare操作需要无条件传播
*/
static final int PROPAGATE = -3;
/**
* 节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
* 普通的同步节点的初始值为0,条件等待节点的初始化值为CANCELLED
*/
volatile int waitStatus;
/**
* 前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 节点所对应的线程,为抢占线程或者条件等待线程
*/
volatile Thread thread;
/**
* 若当前NOde不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上,
* 此属性指向下一个条件等待节点,即其条件队列上的后继节点
*/
Node nextWaiter;
...
}
FIFO 双向同步队列
AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点head和tail记录队首和队尾元素,元素的节点类型为Node类型。
/**
* 首节点的引用
*/
private transient volatile Node head;
/**
* 尾节点的引用
*/
private transient volatile Node tail;
AQS的首节点和尾节点都是懒加载的。在需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。head节点只能被setHead()方法修改,并且节点的waitStatus不能为CANCELLED。尾节点只在有新线程阻塞时才被创建。
AQS中的钩子方法
自定义同步器时,AQS中需要重写的钩子方法大致如下:
(1)tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false。
(2)tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。(3)tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
(4)tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
(5)isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。
AQS锁抢占的原理
acquire是AQS封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,源码实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
通过源码可以发现,acquire(arg)至少执行一次tryAcquire(arg)钩子方法。tryAcquire(arg)方法AQS默认抛出一个异常,具体的获取独占资源state的逻辑需要钩子方法来实现。若调用tryAcquire(arg)尝试成功,则acquire()将直接返回,表示已经抢到锁;若不成功,则将线程加入等待队列。
在acquire()方法中,如果钩子方法tryAcquire尝试获取同步状态失败的话,就构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入同步队列的队尾。
private Node addWaiter(Node mode) {
// 创建新节点
Node node = new Node(mode);
for (;;) { //自旋
//加入队列尾部,将目前的队列tail作为自己的前驱节点oldTail
Node oldTail = tail;
//如果队列不为空时
i{
node.setPrevRelaxed(oldTail);
//先尝试通过AQS 方式修改尾节点为最新的节点
//如果修改成功, 将节点加入队列的尾部
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
}f (oldTail != null) else {
initializeSyncQueue();
}
}
}
addWaiter()第一次尝试在尾部添加节点失败,意味着有并发抢锁发生,需要进行自旋。enq()方法通过CAS自旋将节点添加到队列尾部。
private Node enq(Node node) {
for (;;) { //自旋入队
Node oldTail = tail;
if (oldTail != null) {
//队列不为空,将新节点插入队列尾部
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
//如果队列为空,初始化尾节点和头节点作为新节点
initializeSyncQueue();
}
}
}
/**
* Initializes head and tail fields on first contention.
* 队列为空,初始化尾节点和头结点为新节点
*/
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
/**
* CASes tail field.
* CAS操作tail指针,仅仅被enq()使用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}
节点在第一次入队失败后,就会开始自旋入队,分为以下两种情况:
(1)如果AQS的队列非空,新节点入队的插入位置在队列的尾部,并且通过CAS方式插入,插入之后AQS的tail将指向新的尾节点。
(2)如果AQS的队列为空,新节点入队时,AQS通过CAS方法将新节点设置为头节点head,并且将tail指针指向新节点。
在节点入队之后,启动自旋抢锁的流程。acquireQueued()方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:
(1)头节点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
(2)维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
//自旋检查当前节点的前驱节点是否为头节点,才能获取锁
//在前驱节点上自旋
for (;;) {
//获取节点的前缀节点
final Node p = node.predecessor();
//节点中的线程循环地检查自己的前驱节点是否为head节点
//前驱节点是head时,进一步调用子类tryAcquire(...)实现
if (p == head && tryAcquire(arg)) {
//tryAcquire()成功后,将当前节点设置为头节点,移除之前的头节点。
setHead(node);
p.next = null; // help GC
return interrupted;
}
//检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起,
// 如果需要挂起,调用parkAndCheckInterrupt()方法挂起当前线程,直到被唤醒
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
//抛出异常时,取消请求,将当前节点从队列中移除。
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
acquireQueued()方法不断在前驱节点上自旋(for死循环),如果前驱节点是头节点并且当前线程使用钩子方法tryAcquire(arg)获得了锁,就移除头节点,将当前节点设置为头节点。
调用acquireQueued()方法的线程一定是node所绑定的线程,该线程也是最开始调用lock()方法抢锁的那个线程,在acquireQueued()的死循环(自旋)中,该线程可能重复进行阻塞和被唤醒。
AQS释放锁唤醒后继线程的代码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) { //释放锁的钩子方法的实现
//队列头节点
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继线程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//获取节点状态,释放所得的节点,即头节点
//CANCELLED(1),SIGNAL(-1),CONDITION(-2),PROPAGATE(-3)
int ws = node.waitStatus;
// 若头节点状态小于0,则将其置为0,表示初始状态
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
//后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果新节点已经被取消CANCELLED(1)
s = null;
//从队列尾部开始,往前去找最前面的一个waitStatus小于0的节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// 唤醒后继节点的线程
LockSupport.unpark(s.thread);
}
unparkSuccessor()唤醒后继节点的线程后,后继节点的线程重新执行方法acquireQueued()中的自旋抢占逻辑。
acquireQueued()自旋在阻塞自己的线程之前会进行挂起预判。shouldParkAfterFailedAcquire()方法的主要功能是:将当前节点的有效前驱节点(是指有效节点不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。具体可以分为三种情况:
(1)如果前驱节点的状态为-1(SIGNAL),说明前驱的等待标志已设好,返回true表示设置完毕。
(2)如果前驱节点的状态为1(CANCELLED),说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好:调整前驱节点的next指针为自己。
(3)如果是其他情况:-3(PROPAGATE,共享锁等待)、?2(CONDITION,条件等待)、0(初始状态),那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的状态,如果前驱节点状态WieSIGNAL(-1)就直接返回
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 前驱节点以及取消CANCELLED(1)
do {
//将pred记录前驱的前驱,调整当前节点的prev指针,保持为前驱的前驱
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//调整前驱节点的next指针
pred.next = node;
} else {
// 如果前驱状态不是CANCELLED,也不是SIGNAL,就设置为SIGNAL,
//设置前驱状态之后,此方法返回值还是false,表示线程不可用,被阻塞
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
在独占锁的场景中,shouldParkAfterFailedAcquire()方法是在acquireQueued()方法的死循环中被调用的,由于此方法返回false时acquireQueued()不会阻塞当前线程,只有此方法返回true时当前线程才阻塞,因此在一般情况下,此方法至少需要执行两次,当前线程才会被阻塞。
acquireQueued()方法中调用parkAndCheckInterrupt()方法暂停当前线程,源码如下:
private final boolean parkAndCheckInterrupt() {
//调用park()使线程进入waiting状态
LockSupport.park(this);
// 如果被唤醒,查看自己是否已经被中断
return Thread.interrupted();
}
AbstractQueuedSynchronizer会把所有的等待线程构成一个阻塞等待队列,当一个线程执行完lock.unlock()时,会激活其后继节点,通过调用LockSupport.unpark(postThread)完成后继线程的唤醒。