AQS(AbstractQueuedSynchronizer)
文章目录
- AQS(AbstractQueuedSynchronizer)
- 1. 概述
- 2. AQS源码分析前置知识
- 2.1 AQS的int变量
- 2.2 AQS的CLH队列
- 2.3 内部类Node(Node类在AQS类内部)
- 2.3 小总结
- 3. 以ReentrantLock为突破口进行AQS源码分析
- 3.1 架构原理
- 3.2 lock()方法
- 3.3 AQS中的acquire()方法
- 3.3.1 tryAcquire(int arg)
- 3.3.2 addWaiter(Node mode)
- 3.3.3 acquireQueued()
- 3.3 unlock()方法
- 4. 总结
- 4.1 ReentrantLock加锁
- 4.2 ReentrantLock解锁
1. 概述
AbstractQueuedSynchronizer,抽象的队列同步器,位于rt.jar包中的java.util.concurrent.locks目录下,总的来说可以说AQS属于锁的分配机制。
- 是用来实现锁或者其它同步器组件的公共基础部分的抽象实现,是重量级锁基础框架及整个JUC体系的基石,主要用于解决锁分配给"谁"的问题
- 整体就是一个抽象的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态
CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列CLH是变体的虚拟双向队列FIFO
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
简单看看源码:
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改
2. AQS源码分析前置知识
2.1 AQS的int变量
/**
* The synchronization state.
*/
private volatile int state;
- state == 0:表示当前无线程占用锁,线程可以抢占CPU资源
- state >= 1:表示已有线程占用锁,等待区的线程必须继续等待
2.2 AQS的CLH队列
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
双向队列,从尾部入队,从头部出队
2.3 内部类Node(Node类在AQS类内部)
Node的等待状态waitStatus成员变量,源码:
/** 共享模式 */
static final Node SHARED = new Node();
/** 独占模式 */
static final Node EXCLUSIVE = null;
/** 线程被取消了 */
static final int CANCELLED = 1;
/** 后继线程需要唤醒 */
static final int SIGNAL = -1;
/** 等待condition唤醒 */
static final int CONDITION = -2;
/**
* 共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/** 初始为0,状态是上面的几种 */
volatile int waitStatus;
/** 前置节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 表示处于该节点的线程 */
volatile Thread thread;
2.3 小总结
- 有阻塞就需要排队,实现排队必然需要队列
- AQS的底层实现即:state变量+CLH双向队列
- Node=waitStatus+Thread
3. 以ReentrantLock为突破口进行AQS源码分析
AQS的使用,基本都是通过被【聚合】一个【队列同步器】的子类完成线程访问控制的;ReentrantLock,Semaphore,CountDownLatch等均是如此
3.1 架构原理
3.2 lock()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
非公平锁通过CAS抢到锁时会立即更新并占用,另外FairSync和NonfairSync获取锁时的区别:
对比公平锁和非公平锁的 tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断 hasQueuedPredecessors()
hasQueuedPredecessors() 中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
- 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中己经有线程在等待,那么当前线程就会进入等待队列中
- 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了
3.3 AQS中的acquire()方法
lock()方法中均调用了AQS的acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(),尝试抢锁。如果抢锁失败,则调用2流程
- 该方法由子类实现,NonfairSync中调用了nonfairTryAcquire()方法,即非公平锁尝试获取锁的方法
- addWaiter(Node mode),将当前线程以mode模式加入等待队列中(Node.EXCLUSIVE为独占模式),并返回当前线程的node
- acquireQueued(final Node node, int arg),将当前节点的前一位节点的waitStatus状态改为-1,并调用LockSupport.park(this)阻塞
3.3.1 tryAcquire(int arg)
以非公平锁为例,图中分析将以A,B,C三个顾客线程为例占用访问窗口state
- 判断当前AQS的state,如果为0,表示资源空闲,则尝试抢锁
- 判断当前线程与持有锁的是否是同一个线程,是则表示重入锁,state+1
- false,则继续执行下一条件方法;true抢锁成功
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {// 资源空闲,则继续尝试抢锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 重入state+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 未抢到锁,返回false
return false;
}
3.3.2 addWaiter(Node mode)
- enq(nod):如果等待队列为空,则会创建一个虚拟节点,再加入当前线程节点到队尾巴
- 双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.3.3 acquireQueued()
- 线程B进入方法,首先获取线程B的前驱节点p即虚拟节点,为头节点则再次尝试抢锁,抢锁失败则继续调用后续代码
- 后续调用shouldParkAfterFailedAcquire(),方法中判断前驱waitStatus,为0,则通过CAS将前驱节点的waitStatus改为-1;为-1则返回true
- 上诉返回true会继续调用parkAndCheckInterrupt(),方法中会调用LockSupport.park(this)阻塞当前线程
- cancelAcquire()方法为异常时需要对当前node进行出队操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 准备继续调用parkAndCheckInterrupt方法
if (ws == Node.SIGNAL)
return true;
// ws大于0说明是CANCELLED状态
if (ws > 0) {
// 循环判断前驱节点是否也为CANCELLED状态,忽略对应节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为SIGNAL状态,用于后续唤醒操作
// 程序第一次执行到这返回false,还会进行外层第二次循环,最终调用该方法返回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续往下执行
LockSupport.park(this);
// 根据park()方法API描述,程序存在下列三种情况会继续往下执行
// 1. 被unpark
// 2. 被中断(interrupt)
// 3. 其他不合理逻辑的返回才会继续往下执行
// 因上诉三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该方法会返回true
return Thread.interrupted();
}
3.3 unlock()方法
线程A执行完程序,调用unlock()解锁,将state设置为1,调用LockSupport.unpark解锁,节点B继续执行,抢夺锁,将state改为1;head指针指向节点B,并清空B
public void unlock() {
sync.release(1);
}
// sync
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)// 解锁后,线程b不再阻塞
LockSupport.unpark(s.thread);
}
4. 总结
4.1 ReentrantLock加锁
// 1. ReentrantLock.lock()
public void lock() {
sync.lock();// 默认非公平锁
}
// 2. NonfairSync.lock()
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 3. AbstractQueuedSynchronizer.acquire()
// 3.1 selfInterrupt(),如果执行了该方法说明原来的线程的中断标识位即true,故会再次将中断标识位置为true
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 4. NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 5. Sync.nonfairTryAcquire()
// 5.1 c==0表示资源未被占用,尝试CAS抢锁,成功则返回true
// 5.2 current == getExclusiveOwnerThread()表示重入锁,state += 1,并返回true
// 5.3 返回true,表示当前线程持有锁,可以继续执行;返回false,表示当前线程抢锁失败,在3中继续执行后续条件方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 6. AbstractQueuedSynchronizer.addWaiter(),继3中未抢到锁后执行的方法
// 6.1 pred != null表示队列已经初始化,则将当前Node入队尾部
// 6.2 enq(node):方法中,会将当前node插入队尾,若未初始化队列,则将先初始化队列,再插入队尾
// 6.3 返回当前node节点,回到3中将当前node当参数传给acquireQueued()方法继续调用,后续流程看8
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 7. AbstractQueuedSynchronizer.enq()
// 7.1 t == null表示队列未初始化,则初始化虚拟节点为队头,并让队尾一起指向队头
// 7.2 自旋获取最新队尾,直到CAS成功将当前节点插入队尾,然后返回当前节点node
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 8. AbstractQueuedSynchronizer.acquireQueued(),由3中流程调用,返回后流程可继续看3
// 8.1 Node p = node.predecessor(),获取到的p为当前节点node的前驱节点
// 8.2 p == head && tryAcquire(arg),前驱节点为头节点则继续尝试获取锁;得到锁则将当前节点置为头节点,并将数据清空,丢弃原头节点,返回interrupted
// 8.3 shouldParkAfterFailedAcquire(p, node) true:表示当前线程应阻塞;false表示当前线程暂时不需要阻塞。false将循环再尝试获取锁,还未获取到才阻塞
// 8.4 parkAndCheckInterrupt() 使用LockSupport.park阻塞当前线程,获得许可后继续执行,返回当前线程的中断标志位并清除当前线程的中断标识位
// 8.5 failed,cancelAcquire() 发生异常,导致执行该块代码,将会将node从等待队列里移除
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 9.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
// 9.1 ws == Node.SIGNAL,表示pred的后继线程需要阻塞
// 9.2 ws > 0,表示该前驱节点ws已经被取消,循环则是继续往前找到未被取消的前驱节点重新连接队列
// 9.3 else 中则是将前驱节点的waitState改为Node.SIGNAL,待8中循环再次进入该方法时将符合9.1的条件
// 9.4 返回true:表示当前线程应该阻塞;返回false表示当前线程暂时不需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 10. AbstractQueuedSynchronizer.parkAndCheckInterrupt(),调用来源于8
// 10.1 LockSupport.park(),申请许可,无许可则阻塞当前线程,获得许可则继续执行10.2
// 10.2 返回当前线程的中断标志位,并清除当前线程的中断标志位,回到8
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
4.2 ReentrantLock解锁
// 1. ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
// 2. AbstractQueuedSynchronizer.release()
// 2.1 h != null && h.waitStatus != 0,队列中头节点不为空,且waitStatus不为0才说明队列中有线程处于阻塞状态,故才需要尝试调用unpark方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 3. Sync.tryRelease()
// 3.1 因4.1加锁过程中5.2的重入锁情况,故state有可能不为1,故需要一个lock对应一个unlock才能将锁逐个解开,当c==0时,才会使得2流程中准备释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 4. AbstractQueuedSynchronizer.unparkSuccessor(),调用来源于2
// 4.1 ws < 0,表示后继节点已经准备好了,就等资源释放了
// 4.2 s == null || s.waitStatus > 0,表示当前节点已被删除或取消;将从队尾开始往前寻找第一个waitStatus<=0的节点
// 4.3 如果s不为空,则给s的线程发放许可证,使得s节点的线程可以继续调用加锁过程中10.2后的流程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}