- AQS 基础概念
- 为什么 AQS 是 JUC 最重要的基石?
- AQS 能干什么
- AQS内部结构
- AQS内部类Node
- AQS 源码分析
- 以 lock方法为入口讲解
- nonfairTryAcquire 方法
- addWaiter方法
- 线程B
- 线程C
- acquireQueued 方法
- B节点
- C节点
- unlock
- cancelAcquire 方法
- 总结
AQS 基础概念
AQS 全称:AbstractQueuedSynchronizer
,字面意思:抽象队列同步器。
位于 java.util.concurrent.locks
包下:是一个抽象类
AQS 是什么?
AQS 是用来实现锁或者其它同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个JUC体系的基石,
通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态
队列的结构:
CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO
官方解释:
这么说有点抽象,举个例子:
比如银行办理业务,一个窗口只能有一个人办理业务,此时其他人就必须在大厅中等待,这个等候大厅就相当于 队列
, 人 就相当于 队列中的线程。
在通知下一个人办理业务时,我们都知道在银行中一般都有一个屏幕来显示 轮到xxx 号 办理业务
这个通知的屏幕就相当于 AQS 中的 state
,用来表示状态,比如 1表示有线程占用,0表示未占用。
为什么 AQS 是 JUC 最重要的基石?
和 AQS 有关的锁
在源码中的体现:
ReentrantLock:
CountDownLatch:
ReentrantReadWriteLock:
Semaphore:
从源码中也可以看出,几乎我们使用的锁都继承了这个AQS同步器,AQS 就像一个服务框架,定义通用的一些规则。
进一步理解锁和同步器的关系:
- 锁,面向锁的使用者
- 定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。
- 同步器,面向锁的实现者
- 比如Java并发大神DougLee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
AQS 能干什么
我们知道加锁就会导致阻塞,有阻塞就需要排队,排队必然就会用到队列。
抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AQS内部结构
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
- AQS的同步状态State成员变量:类似于银行的业务窗口的状态,0 表示空闲状态,>= 1 表示有人占用。
- AQS的CLH队列 :是一个虚拟的双向队列,想象成 银行的等待大厅。
小总结:
有阻塞就需要排队,实现排队必然需要队列
AQS 就是 state变量+CLH双端队列
AQS内部类Node
内部结构
对应的属性说明:
Node的int变量 waitStatus:队列中其他线程的等待状态。一共分为四种:CANCELLED、SIGNAL、CONDITION、PROPAGATE
想象成银行等待大厅中等待的顾客的状态。
AQS 源码分析
AQS作为 JUC 的基石,几乎所有的类都继承了AQS,本次分析以 ReentrantLock 为例。
以 lock方法为入口讲解
ReentrantLock 的架构图:
Sync 为 ReentrantLock 中的内部类
首先从构造器方法入手,ReentrantLock 可以实现公平锁和非公平锁。
对于非公平锁和公平锁提供了俩个类: NonfairSync、FairSync,这俩个类都继承了 Sync,同时Sync又继承了AQS类。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
而ReentrantLock 中的 lock 方法,实际上调用了 lock 接口中定义的 lock 方法。
而Lock 接口中的方法在 NonfairSync、FairSync 类中有了不同的实现方式:
在 FairSync 中 lock 方法的定义:
实际上调用了 AQS 中的 acquire 方法:
而 acquire 方法中,调用了 AQS 类中的 tryAcquire 方法,而 AQS 中对于 tryAcquire 方法并没有定义具体的实现,而是下放到子类 FairSync、NonfairSync 中。这里就是 模板方法设计模式。AQS 中的方法相当于一个钩子,供子类进行重写:
FairSync 实现了tryAcquire的具体方法逻辑:
经过这么多次的调用,实际上使用公平锁时,具体的实现方式在 FairSync 中的 tryAcquire方法中、
在 NonfairSync 中 lock 方法的定义:
NonfairSync 方法比 FairSync 多了一个 if 判断,compareAndSetState方法对 AQS 中的 同步状态 state 做判断。如果没有线程占用锁,也就是期望值为0,那么好,当前线程就占用,并且修改状态值。如果有线程占用,仍然执行 acquire 方法。
acquire 方法同样也调用了 tryAcquire 方法。
在 tryAcquire 中继续调用了 nonfairTryAcquire方法
使用 非公平锁时,实际上的实现逻辑,在 Sync 中的 nonfairTryAcquire 方法中:
总结
可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:
hasQueuedPredecessors()
hasQueuedPredecessors
是公平锁加锁时判断是否需要排序以及等待队列中是否存在有效节点的方法
公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)
源码重点分析
既然非公平锁和公平锁都会调用 acquire 方法, 那么重点就放在这个 acquire 方法中,acquire 方法中分成了三个流程走向:
nonfairTryAcquire 方法
以非公平锁的 nonfairTryAcquire 为例,公平锁的 tryAcquire 方法仅仅是多了一个hasQueuedPredecessors 方法判断。
当第一个线程A尝试占用锁时,其实在 lock 方法中的就已经占用成功了,修改了同步状态state的值,并设置占用锁的线程。也就是说第一个线程A并不会执行 acquire 方法,也就不会调用 nonfairTryAcquire
而在第二个 线程B想要占用锁时,由于state已经被第一个线程A所修改,因此第二个线程B会执行 acquire 方法,最终调用 nonfairTryAcquire
第二个线程B尝试获取锁失败,返回false ,取反为 true,下一步执行 addWaiter
则进行入队操作。
addWaiter方法
线程B
前面我们说过,每一个等待的线程都会被封装成一个Node节点,就是在 addWaiter中封装的。
enq 方法中进行入队操作:
第一次循环
private Node enq(final Node node) {
for (;;) {
// tail = null
Node t = tail;
// t==null 条件成立
if (t == null) { // Must initialize
// 设置头结点,此时的头结点并不是节点B,而是一个虚拟节点,不保存任何信息
if (compareAndSetHead(new Node()))
// 将尾结点指向头结点 参考图一
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。
真正的第一个有数据的节点,是从第二个节点开始的。
第二次循环:
private Node enq(final Node node) {
for (;;) {
// 此时tail指向了虚拟节点,因此不为null
Node t = tail;
// t==null 条件不成立
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将节点B的前驱节点,指向虚拟节点
node.prev = t;
// 设置尾节点指向 节点 b
if (compareAndSetTail(t, node)) {
// 尾节点的后驱节点指向节点B
t.next = node;
// 图示参考: 图二
return t;
}
}
}
}
此时线程B才算真正的入队成功
线程C
加入此时又来一个线程 C,对象锁仍然被线程A占用这。
acquire方法:
addWaiter方法:
由于节点B入队的时候,将队列已经初始化一次,不会再执行 enq 方法。图示参考图三
此时B节点 和 C 节点都已经入队成功,但是不能干等这啊,毕竟还要抢占对象锁,因此 在入队完之后就要执行 acquireQueued
方法
acquireQueued 方法
B节点
首先B节点在入队之后,执行acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
// 失败的标志,比如线程B看线程A占用时间太长,不等了,直接走了。
boolean failed = true;
try {
// 阻塞的标志
boolean interrupted = false;
// 执行第一次循环:
for (;;) {
// 获取节点B的前驱节点- 头结点。predecessor方法看图四
final Node p = node.predecessor();
// p 就是 头结点。此时尝试获取对象锁。但是不好意思,线程A还在占用这,因此返回false, if 条件不成立
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 返回 false ,if 条件不成立。请看图五。紧接着第二次循环....
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消当前正在尝试的节点
cancelAcquire(node);
}
}
predecessor
方法就是获取当前节点的前驱节点
shouldParkAfterFailedAcquire
方法:检查当前线程是否应该中断。返回
waitStatus 表示当前节点在队列中的状态,在 AQS内部类Node
中讲过。
经过第一次循环,头节点中的 waitStatus 被修改为 -1 :
开始第二次循环:
final boolean acquireQueued(final Node node, int arg) {
// 失败的标志,比如线程B看线程A占用时间太长,不等了,直接走了。
boolean failed = true;
try {
// 阻塞的标志
boolean interrupted = false;
// 死循环,执行第二次循环:
for (;;) {
// 获取节点B的前驱节点- 头结点。predecessor方法看图四
final Node p = node.predecessor();
// p 就是 头结点。此时线程B尝试获取对象锁。但是不好意思,线程A还在占用这,因此返回false, if 条件不成立
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当执行第二次循环时,shouldParkAfterFailedAcquire返回true,请看图六
if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt会阻塞线程B,请看图七
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
方法:
parkAndCheckInterrupt
方法: 此时B线程被阻塞在这个方法中
park/unpark 方法讲解在第五章
C节点
此时B节点阻塞在队列中,当线程C执行入完队列,执行 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 第一次循环
for (;;) {
// 获取 C 节点的前驱节点 - B节点,p= NodeB
final Node p = node.predecessor();
// B 节点不是头结点,因此直接为false
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 返回 false,请看图八,仅接着执行第二次循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第二次循环:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 第一次循环
for (;;) {
// 获取 C 节点的前驱节点 - B节点,p= NodeB
final Node p = node.predecessor();
// B 节点不是头结点,因此直接为false
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 返回 true,请看图九
if (shouldParkAfterFailedAcquire(p, node) &&
// // 此时线程C又被阻塞在 parkAndCheckInterrupt 方法中
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
节点B的 waitStatus被修改为 -1 :
总结:
AQS 底层阻塞线程使用的是LockSupport的 park 方法。
每次入队列的节点,都会使前一个节点的 waitStatus值设置为 -1.表示此线程准备好,等待释放对象锁。
unlock
当线程A执行完,是如何释放锁,线程B、C 都被阻塞在 parkAndCheckInterrupt 方法中,是如何被唤醒并且抢到锁的呢? 一步步看
线程A调用 unlock 方法,仍然是 Lock 接口中的方法,在 ReentrantLock 中实现。
unlock 中调用了 AQS 中的release 方法。
release
方法:
public final boolean release(int arg) {
// 尝试释放正在占用的对象锁,并返回true。进入 if 语句请看图十
if (tryRelease(arg)) {
Node h = head;
// 头结点不为空,此时正在指向虚拟节点。并且头结点的waitStatus=-1
// 进入 if 语句,执行 unparkSuccessor。请看图 11
if (h != null && h.waitStatus != 0)
//
unparkSuccessor(h);
return true;
}
return false;
}
这里和上面的 tryAcquire 一样,AQS 中的 tryRelease 作为钩子方法,在 ReentrantLock 重写
tryRelease
方法: 释放锁
此时修改完同步状态:
在执行 unpark方法之前,线程B 被阻塞在 parkAndCheckInterrupt
方法中,请看 图12 。
此时在 unparkSuccessor 方法中唤醒线程B,线程会顺着在被阻塞的地方接着执行,也就是在哪跌倒在哪爬起来。执行 parkAndCheckInterrupt 方法中的 return 返回 fasle。
线程 B 在被唤醒后,接着执行第三次循环,抢占对象锁。
第三次循环:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取 B节点的前驱节点,也就是头结点
final Node p = node.predecessor();
// p==head 条件成立
// 此时会再次执行 tryAcquire 抢占对象锁。具体抢占过程请看图13
// 为什么会再次抢占锁呢?不是已经该线程B执行了吗?
// 队列中的线程会按顺序抢占对象锁没有错,但是我们使用的是非公平锁,非公平锁在进入队列之前就会尝试获取对象锁
if (p == head && tryAcquire(arg)) {
// 线程B抢占成功后,设置头结点为 B节点,具体的设置过程请看图 14
setHead(node);
// 将头结点的后驱节点设置为null
p.next = null; // help GC
// 设置失败标志位 false
failed = false;
// 此时循环结束,线程B成功上位,抢到锁。
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
经过多次调用 ,具体在 nonfairTryAcquire
方法中尝试抢占。
线程B抢占对象锁之后,返回 true
设置头结点的过程: 将前驱节点、线程 设置为null。
此时等待队列的状态:
线程C如何被唤醒、抢占锁和B一样,我就不重复了。
cancelAcquire 方法
到此为止,如何抢占锁、阻塞 以及唤醒线程才算一个完整的流程,但是我们分析的都是在理想的状态下,也就是在抢占锁时没有出现意外情况,但事实真是如此吗?
答案肯定是不,在高并发下,任何情况都有可能发生。因此下面看看在出现异常后,执行取消尝试的流程。
比如:在等待过程中,节点B由于等待时间太长,不想等了,那么节点B就需要取消等待-获取锁的资格,并重写设置pre,next
取消尝试的流程 也就是 cancelAcquire 方法
下面分析不同的情况 cancelAcquire 的执行流程:
队列的初始情况,至于队列如何形成的,就不在演示了,和上面步骤一样。
第一种情况: 节点5 不想等了,那么它就不会执行 for 循环,而是执行 cancelAcquire
private void cancelAcquire(Node node) {
// 节点5 不为 null,条件失败
if (node == null)
return;
// 将节点5的线程置null
node.thread = null;
// 节点5 的前驱节点为节点4
Node pred = node.prev;
// 节点4 的 waitStatus = -1 ,条件不成立
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 节点4 的后驱节点为节点5
Node predNext = pred.next;
// 通过 waitStatus状态图看出 CANCELLED 的值为 1,表示取消的节点
// 此时节点5的 waitStatus = 1
node.waitStatus = Node.CANCELLED;
// 节点5是尾结点,因此 node == tail 条件成立
// CAS 操作,将尾结点指向节点4
if (node == tail && compareAndSetTail(node, pred)) {
// 将节点4的后驱节点置为 null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
执行完cancelAcquire
,此时队列的情况:
第二种情况:如果中间节点不想等了【以节点4为例】
private void cancelAcquire(Node node) {
// 节点4不为null,条件不成立
if (node == null)
return;
// 将节点4 的线程置位null
node.thread = null;
// pred = 节点3
Node pred = node.prev;
// 节点3的waitStatus =-1,条件不成立
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext = 节点4
Node predNext = pred.next;
// 将节点4的waitStatus设置为 1
node.waitStatus = Node.CANCELLED;
// 节点4不是尾结点,条件不成立,执行 else 里面的语句
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 节点3不是头结点,条件成立
if (pred != head &&
// 节点3的 waitStatus = -1 条件成立,ws = -1,因此也就不会执行 || 后面的
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 节点3的线程不为null,条件成立
pred.thread != null) {
// next = 节点5
Node next = node.next;
// 节点5不为null,条件成立
// 节点5的waitStatus = 0,条件成立
if (next != null && next.waitStatus <= 0)
// 将节点3的后驱节点指向节点5
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
// 节点4 的后驱节点指向自己
node.next = node; // help GC
}
}
对应的队列图:
第三种情况:如果节点3、节点4,都不想等了
private void cancelAcquire(Node node) {
// 节点4不为null,条件不成立
if (node == null)
return;
// 将节点4 的线程置位null
node.thread = null;
// pred = 节点3
Node pred = node.prev;
// 此时节点3也不想等了,它的waitStatus的值=1 ,条件成立
while (pred.waitStatus > 0)
// 节点3 的前驱节点节点2 赋给 节点4的 前驱节点
// 也就是说,节点4的prev跳过节点3直接指向节点2
// 此时 pred = 节点2
node.prev = pred = pred.prev;
// predNext = 节点3
Node predNext = pred.next;
// 将节点4的waitStatus设置为 1
node.waitStatus = Node.CANCELLED;
// 节点4不是尾结点,条件不成立,执行 else 里面的语句
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 节点2不是头结点,条件成立
if (pred != head &&
// 节点2的 waitStatus = -1 条件成立,不会执行 || 后边的判断
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 节点3的线程不为null,条件成立
pred.thread != null) {
// next = 节点5
Node next = node.next;
// 节点5不为null,条件成立
// 节点5的waitStatus = 0,条件成立
if (next != null && next.waitStatus <= 0)
// 将节点2的后驱节点指向节点5
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
// 节点4 的后驱节点指向自己
node.next = node; // help GC
}
}
对应的队列图:
总结
此时AQS核心源码已经完毕,虽然一步一步都整理了下来,但是还是有点懵的,如果有错误的地方还请各位靓仔指出,共同进步…感谢各位观看…