AQS源码解析 3.lock() & unlock() 加锁解锁过程
Lock() 过程
- 这里使用 ReentrantLock 的公平锁去看 AQS 的加锁过程。
- 在 ReentrantLock 的实现中,其默认构造的锁是非公平锁。
详细流程图
-
尝试获取锁 + 构造节点入队过程
-
在队列中被挂起 + 被唤醒重新抢锁的过程
AQS.acquire()
// ReentrantLock.lock() 公平锁入口
public void lock() {
sync.acquire(1); // AQS提供的acquire()
}
||
||
\/
// AQS.acquire()
public final void acquire(int arg) {
/*
* 条件1:!tryAcquire()尝试获取锁,取反后 获取成功返回false,获取失败返回true 进入条件2的逻辑
* 条件2:
* 2.1 addWaiter() 获取锁失败 将当前线程封装成一个Node入队
* 2.2 acquireQueued() 在队列中尝试获取锁的过程,如果获取失败则挂起当前线程,还有唤醒后相关的逻辑
* 返回值表示挂起过程中线程 是否是被中断唤醒过,true表示被中断唤醒过,false表示未被中断过..
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 打断线程 再次设置中断标志位为true
selfInterrupt();
}
ReentrantLock.tryAcquire()
// ReentrantLock.tryAcquire()方法在内部类 FairSync公平锁 中
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/*
* 公平锁提供的 tryAcquire 方法 尝试获取锁
* 抢占成功:返回true 包含重入..
* 抢占失败:返回false
*/
@ReservedStackAccess // 这个注解的作用:它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出
protected final boolean tryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 获取state的值(即当前锁的状态)
int c = getState();
// c == 0表示当前处于无锁状态
if (c == 0) {
/*
* 条件1:!hasQueuedPredecessors()
* 因为这里是fairSync公平锁,所以任何时候必须先查看队列中是否有节点(等待者),有节点就得去排队,不允许竞争锁
* hasQueuedPredecessors()查看队列中是否有节点:true表示队列中有节点,false表示队列中没有节点,
* 当队列中没有节点时,取反后 false->true 才会走到条件2的逻辑。
* 条件2:compareAndSetState(0, acquires)
* 成功:说明当前线程抢占锁成功
* 失败:说明存在竞争,且当前线程竞争失败
* 只有当队列中没有节点时才能使用CAS操作去获取锁。
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 抢锁成功 将当前线程设置为获取锁的线程
setExclusiveOwnerThread(current);
// 直接return true 尝试获取锁成功
return true;
}
}
/*
* 走到这里就说明当前锁已经被占用了,因为ReentrantLock是可重入锁,所以会判断持有锁的线程是否就是当前线程,即这里是重入的逻辑。
*/
else if (current == getExclusiveOwnerThread()) {
// 重入的逻辑
// nextc更新值
int nextc = c + acquires;
// 越界判断:当重入的深度很深时,会导致nextc < 0,int达到最大值之后 再+1 会爆int 值会变为负数..
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置新的state,setState是一个无锁方法,因为只有持锁线程才能进到这里
setState(nextc);
// 重入锁再次加锁成功 也返回true
return true;
}
/*
* 什么时候返回false?
* 1.state = 0,无锁状态,但是CAS抢锁失败(有并发)
* 2.state > 0,且持锁线程非当前线程
*/
return false;
}
}
AQS.addWaiter()
/*
* 当前线程抢占锁失败,将当前线程封装为Node,并入队
* @param mode:模式,这里传入的是独占模式(Node.EXCLUSIVE)
* @return 最终会返回当前线程包装后的node节点
*/
private Node addWaiter(Node mode) {
// 将当前线程封装成一个Node节点,mode是独占模式(ReentrantLock => Node.EXCLUSIVE)
Node node = new Node(Thread.currentThread(), mode);
/*
* 快速入队
*/
// 获取队尾节点 作为前置节点pred(入队操作就是将当前节点放入当前队尾节点的后面)
Node pred = tail;
// 条件成立:队列中已经有node了
if (pred != null) {
// 将当前节点的前驱指向pred
node.prev = pred;
// CAS设置tail指向当前node(这里可能有并发,所以要使用CAS操作)
if (compareAndSetTail(pred, node)) {
// CAS成功,将尾结点的后继指向node 完成双向绑定
pred.next = node;
// 说明入队成功,返回node即可
return node;
}
}
/*
* 什么时候会执行到这里?
* 1.当前队列是空队列 tail == null
* 2.CAS竞争入队失败.. 会来到这里
* enq()方法,完整入队,不断自旋入队。
*/
enq(node);
return node;
}
AQS.enq()
private Node enq(final Node node) {
// 自旋,保证一定可以入队,只有当前node入队才会跳出循环
for (;;) {
Node t = tail;
/*
* 当前队列是空队列,tail == null
* 说明当前锁被占用,但是队列没有节点,说明当前线程可能是第一个获取锁失败的线程(为什么是可能?=> 存在并发)
* 那么作为当前持锁线程的第一个后继线程,需要做什么?
* 因为第一个当前持锁的线程,获取锁时直接tryAcquire成功了,没有向阻塞队列中添加任何的node,
* 所以作为后继需要为它补一个Node,随后将自己入队。
*/
if (t == null) { // Must initialize
// CAS为当前抢占锁成功的线程追加一个Node 存入队列的头结点中(因为头结点一定是抢占到锁的线程),然后继续自旋
// CAS成功 说明当前线程成为 head.next 节点
if (compareAndSetHead(new Node()))
// 此时当前队列中只有1个节点,就是当前抢占锁成功线程的节点,且为头结点,将tail指向head
tail = head;
// 注意:这里没有return 会继续自旋(因为自己还没入队)
} else {
/*
* 普通入队
*/
// 将自己入队,只不过在for自旋中,会保证一定入队成功!
node.prev = t;
// CAS将自己设置为尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回前置节点
return t;
}
}
}
}
AQS.acquireQueued()
/*
* acquireQueued()方法的作用?
* 1.当前节点入队后有没有被挂起呢?没有 => 挂起的操作
* 2.如果被挂起了?那么唤醒之后的逻辑在哪?=> 唤醒之后的逻辑
* 都在此方法中。
* @param node 表示当前线程封装的node 且当前时刻 已经入队成功了...
* @param arg 表示加锁的arg参数,设置state会用到
*/
final boolean acquireQueued(final Node node, int arg) {
/*
* true 表示当前线程抢占锁成功(一般情况下【lock】,最终肯定会获取到锁)
* false 表示失败,需要执行出队的逻辑(后面讲响应中断的lock方法时再讲)
*/
boolean failed = true;
try {
// 表示当前线程是否被中断
boolean interrupted = false;
// 自旋 获取锁
for (;;) {
/*
* 什么情况会执行这里?
* 1.进入for循环时,在线程尚未park前
* 2.线程park后,被唤醒后,也会执行这里(自旋)
*/
// 拿到节点的前驱节点
final Node p = node.predecessor();
/*
* 这里判断当前节点是不是head.next节点,head.next节点在任何时候 都有权利去争夺锁
* 条件1:p == head 成立的话才有机会调用tryAcquire()尝试获取锁
* 条件2:tryAcquire(arg)
* 成立:说明head对应的线程 已经释放锁了,head.next节点对应的线程,正好获取到锁了
* 不成立:说明head对应的线程 还没释放锁呢,head.next仍然需要被park..
*/
if (p == head && tryAcquire(arg)) {
// 拿到锁之后,设置头结点为当前节点
setHead(node);
// 将上个线程对应的node的next引用置为null 协助老的head出队
p.next = null; // help GC
// 当前线程获取锁的过程中 没有发生异常
failed = false;
// 返回当前线程的中断标记
return interrupted;
}
/*
* shouldParkAfterFaildAcquire():当前线程获取锁失败后,是否需要挂起?
* true -> 需要挂起 | false -> 不需要挂起
* -------
* parkAndCheckInterrupt():将当前线程挂起,被唤醒后返回中断标志位(会清除中断标志位),返回当前线程是否是被中断唤醒的。
* (唤醒:1.正常唤醒 其他线程unpark 2.其他线程给当前挂起的线程一个中断信号)
* 当被唤醒时,继续进入自旋尝试获取锁。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果当前线程是被中断唤醒的,就将interrupted置为ture
// 因为在parkAndCheckInterrupt()方法中 interrupted()方法会清除打断标记 也就是将interrupted置为false 所以要重新置为true
interrupted = true;
}
} finally {
if (failed)
// 取消竞争
cancelAcquire(node);
}
}
//----------------------------------------------------------------------------
// AQS.parkAndCheckInterrupt()
// park当前线程 将当前线程挂起,唤醒后返回当前线程 是否为 中断信号 唤醒
private final boolean parkAndCheckInterrupt() {
// 使用LockSupport挂起当前线程
LockSupport.park(this);
// 返回当前线程是否被中断,该方法会清除线程的中断标志
// 如果被打断过则返回true,并清除打断标记 置为false
return Thread.interrupted();
}
AQS.shouldParkAfterFailedAcquire()
总结
- 当前节点的前驱节点是
CANCELLED(1)
取消状态,第一次来到方法时,会将前面所有处于CANCELLED的节点全部删除
,最终找到第一个状态是 0 的节点,然后将其状态设置为**-1(SIGNAL)
**,然后继续自旋后返回 true。 - 当前节点的前驱节点状态是 0,当前线程会设置前驱节点的状态为 -1,然后再次自旋时会返回 true。
主要做的事就是删除当前节点前面连续的所有处于 CANCELLED
的节点,找到第一个状态为 0 的节点,将其状态设置为**-1(SIGNAL)
**,然后退出。
/*
* 总结:
* 1.当前节点的前置节点是CANCELLED(1)取消状态,第一次来到这个方法时 会越过 取消状态的节点,第二次 会返回true 然后park当前线程。
* 2.当前节点的前置节点状态是0,当前线程会设置前置节点的状态为-1(为了唤醒后继节点),第二次自旋来到这个方法时 会返回true 然后park当前线程。
* @param pred 当前线程node的前置节点
* @param node 当前线程对应的node
* @return true -> 当前线程需要挂起 | false -> 当前线程不需要挂起
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*
* 获取当前节点前驱节点的状态(waitStatus)
* waitStatus = 0 初始默认状态
* > 0 表示节点时CANCELLED(1)取消状态
* = -1 SIGNAL表示当前节点释放锁后会唤醒它的第一个后继节点
*/
int ws = pred.waitStatus;
// 表示当前节点的前驱节点状态就是SIGNAL(-1),是个可以唤醒当前节点的节点,所以返回true => parkAndCheckInterrupt():park当前线程了..
// 普通情况下,第一次来到shouldParkAfterFailedAcquire时,ws不会是-1
if (ws == Node.SIGNAL)
return true;
/*
* ws > 0 表示当前节点node的前驱节点的waitStatus > 0是CANCELLED(1)取消状态,
* 取消状态的节点无法唤醒后继节点,所以需要一直向前找到第一个waitStatus <= 0的节点
* 在这个过程中,会将waitStatus > 0的节点全部删除
*/
if (ws > 0) {
// 找爸爸(前置节点)的过程,条件是什么呢?前置节点的 waitStatus <= 0 的情况
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 找到的第一个ws <= 0的节点的next指针指向当前node(相当于将之间所有状态为CANCELLED(1)取消状态的节点全部删除(出队))
pred.next = node;
} else {
// 到这里 说明 ws = 0,
// 使用CAS将当前线程node的前置节点强制设置为SIGNALE,表示前置节点释放锁之后需要唤醒我
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
unlock() 过程
- 以 ReentrantLock 中的公平锁的 unlock() 为例。
大致流程图
AQS.release()
unlock 底层调用的是 release()
// ReentrantLock.unlock() 释放锁入口
public void unlock() {
sync.release(1); // AQS提供的release()
}
||
||
\/
// AQS.release()
public final boolean release(int arg) {
/*
* tryRelease()尝试释放锁
* true -> 表示当前线程已经完全释放锁
* false -> 表示当前线程尚未完全释放锁
*/
if (tryRelease(arg)) {
// 队列头结点
/*
* head什么情况下会被创建出来?
* 当持锁线程未释放线程时,且持锁期间 有其它线程想要获取锁时,其它线程发现获取不了锁,而且队列是空队列,
* 此时后续线程会为当前持锁中的线程 构建出来一个head节点,然后后续线程 会追加到 head 节点后面。
*/
Node h = head;
/*
* 条件1:h != null 成立 说明队列不为空 也就是说队列中的head节点已经初始化过了,ReentrantLock 在使用期间发生过多线程竞争了..
* 条件2:h.waitStatus != 0 (大概率是-1(Signal)) 条件成立,说明当前head后一定插入过node节点 可以进行唤醒节点的操作
*/
if (h != null && h.waitStatus != 0)
/*
* 唤醒后继节点(注意,unparkSuccessor里也有寻找后继的逻辑,即唤醒的不一定就是h.next节点)
*/
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock.tryRelease()
// ReentrantLock.tryRelease()方法在内部类 Sync 中 继承于AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
final boolean nonfairTryAcquire(int acquires) {...}
@ReservedStackAccess // 这个注解的作用:它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出
protected final boolean tryRelease(int releases) {
// c = 当前锁的状态 - 释放的锁的状态,减去释放的值 拿到最新值
int c = getState() - releases;
// 这里判断 当前调用释放锁的线程是否是获取锁的线程,不是的话直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// free -> 是否已经完全释放锁
boolean free = false;
// c == 0 表示满足完全释放锁的条件
if (c == 0) {
// free 置为true
free = true;
// 设置当前独占锁的线程为null
setExclusiveOwnerThread(null);
}
// 更新state
setState(c);
// 返回free的值(完全释放锁返回true 反之返回false)
return free;
}
}
AQS.unparkSuccessor()
// 唤醒当前节点的后继节点
private void unparkSuccessor(Node node) {
// 获取当前节点的waitStatus
int ws = node.waitStatus;
// 小于0 就是-1(Signal) 使用CAS的方式将状态变为0
// 改成0的原因:因为当前节点即将完成唤醒后继节点的任务了..
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前当前节点的后继节点
Node s = node.next;
/*
* 条件1:
* s 什么时候等于null?
* 1.当前节点就是tail节点时 s == null
* 2.当新节点入队未完成时(1.设置新节点的prev指向pred 2.cas设置新节点为tail 3.(未完成)pred.next -> 新节点)需要找到可以被唤醒的节点..
* 条件2:s.waitStatus > 0 前提:s != null
* 成立:说明 当前node节点的后继节点是 取消状态 需要找一个合适的可以被唤醒的节点
*
* 这里就是判断当前节点的后继节点的状态是否是0(初始状态)或者-1(Signal),
* 不是的话,就去队列中找到一个距离当前节点最近的可以被唤醒的节点赋值给s
*/
if (s == null || s.waitStatus > 0) {
s = null;
// 在队列中找到一个距离当前节点最近的并且可以被唤醒的节点 node可能找不到(也就是s可能为null)
for (Node t = tail; t != null && t != node; t = t.prev)
/*
* 状态合法,赋值给s,注意这里找到时并没有break,即最终找到的节点就是离node最近的节点(从后往前 找到最前面的node)
*/
if (t.waitStatus <= 0)
s = t;
}
// 找到了一个可以被唤醒的节点 没找到则什么也不做
if (s != null)
// 调用LockSupport的unpark将其唤醒
LockSupport.unpark(s.thread);
}
扩展:AQS 响应中断加锁逻辑
- 本篇文章的前面的 加锁、解锁 部分,都是在介绍 ReentrantLock 的
lock()
普通加锁方式,这种加锁方式是不可以响应中断的,下面我们分析可以被响应中断的加锁方式lockInterruptibly()
:
ReentrantLock.lockInterruptibly() 可以被响应中断的加锁方法
// ReentrantLock.lockInterruptibly():可以被响应中断的加锁方法
public void lockInterruptibly() throws InterruptedException {
// 可以被响应中断的方式去竞争资源
sync.acquireInterruptibly(1);
}
// AQS.acquireInterruptibly():竞争资源的方法(可以被响应中断)
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程已经是有中断标记interrupted为true了,则直接抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁
if (!tryAcquire(arg))
// 获取锁失败,进入该方法逻辑
doAcquireInterruptibly(arg);
}
// 也是将当前节点封装为Node,并执行入队操作
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 这里直接抛出打断异常 直接响应
throw new InterruptedException();
}
} finally {
if (failed)
// 我们主要来分析下cancelAcquire这个方法: 取消指定node参与竞争
cancelAcquire(node);
}
}
AQS.cancelAcquire() 响应中断出队逻辑
/*
* 取消指定node参与竞争
*/
private void cancelAcquire(Node node) {
// 判空
if (node == null)
return;
// 取消node排队,直接将内部关联的线程置为null
node.thread = null;
// 获取当前取消排队node的前驱
Node pred = node.prev;
// 有可能它的前驱也处于取消状态,继续往前找 找到一个正常的Node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
/*
* 拿到前驱节点的next节点 这里有两种情况:
* 1.就是当前node
* 2.可能也是ws > 0的节点
*/
Node predNext = pred.next;
// 设置当前节点的状态为 CANCELLED(1)取消状态
node.waitStatus = Node.CANCELLED;
/*
* 当前取消排队的node所在的队列的位置不同,执行的出队的逻辑是不一样的,一共分为三种情况:
* CASE1:当前node是队尾,tail -> node
* CASE2:当前node非head.next节点,也不是tail
* CASE3:当前node是head.next节点
*/
/*
* CASE1:
* 条件1:node == tail 成立:当前node是队尾,tail -> node,执行条件2
* 条件2:compareAndSetTail(node, pred) 使用CAS方式将tail指向node的前驱节点,成功的话,说明修改tail完成
*/
if (node == tail && compareAndSetTail(node, pred)) {
// CAS修改pred.next -> null. 完成node出队
compareAndSetNext(pred, predNext, null);
/*
* CASE2:当前node非head.next节点,也不是tail
* CASE3:当前node是head.next节点
*/
} else {
// 保存节点的状态(waitStatus)
int ws;
/*
* 这一堆判断 判断的就是 CASE2:当前node非head.next也不是tail节点的情况
* pred != head 成立,说明当前node不是head.next节点,也不是tail(CASE1)
* 条件2:((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
* 2.1 成立:表示当前ndoe节点的前驱状态是signal。不成立:前驱状态可能为0,极端情况下:前驱节点也取消排队了
* 2.2 成立:ws <= 0 则需要设置前驱节点状态为SIGNAL(-1)状态,表示要唤醒后继节点
* if里面做的事,就是让pred.next -> node.next,所以需要保证pred节点状态为SIGNAL
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
/*
* 当前node不是head.next节点,也不是tail节点
* 出队:pred.next -> node.next节点后,当node.next节点被唤醒后,
* 调用shouldParkAfterFailedAcquire方法会让node.next越过取消状态的节点,完成真正的出队
*/
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// CAS设置pred.next -> node.next
compareAndSetNext(pred, predNext, next);
// CASE3:当前node是head.next节点
} else {
// node是head.next节点,唤醒node的后继节点,然后调用
// 类似CASE2,后继节点唤醒后会调用shouldParkAfterFailedAcquire方法让node.next越过取消状态的节点,与head建立双重指向的关系
// 假设当前head.next的后继节点就是第三个节点 则:head.next -> 第三个node 中间就是被出队的head.next 第三个node.prev -> head
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
小总结
-
AQS 是 Java 中几乎所有锁和同步器的一个基础框架,这里说的是“几乎”,因为有极个别确实没有通过 AQS 来实现;
-
AQS 是结合 ReentrantLock 一起看的,因为里面的公平锁和非公平锁,是使用 AQS 的实现;
-
AQS 中维护了一个队列,这个队列使用双链表实现,用于保存等待锁排队的线程;
-
AQS 中维护了一个状态变量 state,在多线程并发场景下,通过控制状态变量 state 就可以实现加锁解锁操作了;
- 判断当前持有锁的线程是否已经释放了锁,使得没有拿到锁的线程进入队列等待。
-
然后当前线程执行完成之后,会 unpark 队列的下一个线程,使其进行工作,这样就协调了多线程场景下锁竞争的问题;
-
本文章是基于 ReentrantLock 的公平锁观察加锁解锁流程,而 ReentrantLock 的默认实现是非公平锁。
- 非公平锁的逻辑相对简单,调用 lock() 时直接 CAS 抢锁,抢锁失败再去执行其它逻辑,不需要判断阻塞队列中有没有其它线程在等待,上来就直接 CAS 抢占锁。
- 非公平锁的吞吐量相对与公平锁要好一些。
// ReentrantLock非公平锁 NonfairSync static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // acquire方法 调用 tryAcquire方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } // ReentrantLock内部类 Sync abstract static class Sync extends AbstractQueuedSynchronizer { ... /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 不需要关心阻塞队列中是否有线程在等待,直接抢锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ... }
AQS 独占模式情景分析
参考
- 视频参考
- b站_小刘讲源码付费课
- 文章参考
- shstart7_AQS源码解析2.内部核心结构与lock过程
- shstart7_AQS源码解析3.release()方法
- 兴趣使然的草帽路飞_AQS源码探究_03 成员方法解析(加锁、资源竞争逻辑)
- 兴趣使然的草帽路飞_AQS源码探究_04 成员方法解析(释放锁、响应中断出队逻辑)