1.AQS介绍
相信每个Java Coder 都使用过或者至少听说过AQS, 它是抽象队列同步器AbstractQueuedSynchronizer 的简称,在juc包下。它提供了一套可用于实现锁同步机制的标准框架,其维护了一个volatile
修饰的共享变量state
和 一个FIFO(先进先出)
线程等待队列,多线程争用资源被阻塞的时候就会进入这个队列。state是共享变量,其访问方式有如下三种:getState()
, setState()
, compareAndSetState()
,通过尝试获取共享变量 state 的结果来对线程的状态作出处理。
基于JDK8分析
我们在简单看下AbstractQueuedSynchronizer类结构:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
/**
* Creates a new {@code AbstractQueuedSynchronizer} instance
* with initial synchronization state of zero.
*/
protected AbstractQueuedSynchronizer() { }
/**
* Wait queue node class.
*
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
*/
static final class Node {
/**
* 共享节点
*/
static final Node SHARED = new Node();
/**
* 独占节点
*/
static final Node EXCLUSIVE = null;
/**
* 取消排队
*/
static final int CANCELLED = 1;
/**
* 唤醒后继节点
*/
static final int SIGNAL = -1;
/**
* 这个和Condition相关
*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* CLH队列节点Node的等待状态
* 默认值是0
* 取值就是上面的 1,-1, -2, -3
*/
volatile int waitStatus;
/**
* CLH队列节点Node的上一个节点
*/
volatile Node prev;
/**
* CLH队列节点Node的下一个节点
*/
volatile Node next;
/**
* CLH队列节点Node维护的Thread
*/
volatile Thread thread;
//........
}
/**
* AQS内部维护的CLH队列中的头节点
*/
private transient volatile Node head;
/**
* AQS内部维护的CLH队列中的尾节点
*/
private transient volatile Node tail;
/**
* 同步状态
*/
private volatile int state;
//......省略其他code........
}
复制代码
我们用一张图来概括下:
2.AQS原理
AQS是将暂时无法请求共享资源的线程封装成一个CLH队列(虚拟的双向队列)的一个结点来实现锁的分配。根据volatile
修饰的state
共享变量,线程通过CAS (Compare and swap)
去改变状态。如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现,即将暂时获取不到锁的线程加入到队列中 等待被唤醒。
3.AQS的实现
AQS(AbstractQueuedSynchronizer
)它是整个同步机制的基类,它是基于模板方法模式进行设计的,如果需要实现同步器,一般可以这样:
- 使用者继承
AbstractQueuedSynchronizer
并重写指定的方法 - 将AQS组合在同步组件的实现中,并调用其模板方法(这些模板方法会调用使用者重写的方法)
同步器在实现的时候只需要实现共享资源state的获取和释放方式即可,具体线程等待队列的维护, AQS已经实现。同步器实现的时候主要关注下面几个方法:
方法 | 说明 |
---|---|
tryAcquire(int) | 独占方式。尝试获取资源,成功则返回true,失败则返回false |
tryRelease(int) | 独占方式。尝试释放资源,成功则返回true, 失败则返回false |
tryAcquireShared(int) | 共享方式。尝试获取资源。负数表示失败;大于等于0表示成功,其中0表示没有剩余可用资源 |
tryReleaseShared(int) | tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false |
一般来说实现同步器要么是独占,要么是共享方式,只需实现tryAcquire——tryRelease
或者 tryAcquireShared——tryReleaseShared
中的一种即可。
虽然AOS也支持同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock, 但是使用情况较少
其中在acquire()
, acquireShared()
两种方式下,线程在等待队列中忽略中断,acquirelnterruptibly()
, acquireSharedlnterruptibly()
支持响应中断,一旦中断将抛出中断异常。
3.1 独占or共享?
实现了AQS的常见锁有:ReentrantLock
、 Semaphore
、 CountDownLatch
、 CyclicBarrier
、ReentrantReadWritelock
都是AQS的衍生物。
资源共享方式 | 典型实现类 |
---|---|
独占 | 只有一个线程能操作共享资源,如 ReentrantLock |
共享 | 多个线程可以同时操作共享资源,如 Semaphore,CountDownLatch,CyclicBarrier |
独占和共享 | ReentrantReadWritelock,读锁是共享的,写锁是独占的 |
好了,关于AQS到这里已经有了基本的认识,接下来我们就从ReentrantLock
着手,一步一步分析AQS的源码。
4.AQS源码分析
4.1 认识 ReentrantLock
想必 ReentrantLock
这个类或多或少都使用过,这里我们就先简单看下ReentrantLock
类的类结构
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/**
* 同步器
*/
private final Sync sync;
/**
* 同步器父类:继承了AQS
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 加锁,留给子类实现
*/
abstract void lock();
/**
* 非公平尝试获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
//Integer.MAX_VALUE + 1 < 0
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 尝试释放锁
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//......省略其他......
}
/**
* 非公平同步器,继承了Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 加锁
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平同步器,继承了Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* 加锁
*/
final void lock() {
acquire(1);
}
/**
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
/**
* 默认是非公平锁
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 指定公平或者非公平
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* ReentrantLock 对外提供的加锁方法
*/
public void lock() {
sync.lock();
}
/**
* 支持中断的加锁,一旦线程中断将抛出异常
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* ReentrantLock 对外提供的尝试获取锁,获取到返回true,否则返回false
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
* 可以指定时间,在指定时间内获取锁返回true,否则返回false
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
/**
* ReentrantLock 对外提供的释放锁
*/
public void unlock() {
sync.release(1);
}
//.......省略其他......
}
复制代码
从类结构中,可以清晰的看到,它内部是基于AQS来实现的。
我们简单看下类继承关系图:
4.1.1 怎么理解公平和非公平?
我们看下公平的方式获取锁的代码:
其实公平和非公平非常好理解,假设张三
在银行窗口办理业务,李四
和王五
在候客区等待,在张三
办理完业务的一瞬间,刘华强
正好也推门来到银行里办理业务:
- 如果是公平锁,刘华强需要乖乖排队到李四和王五的后面,等他们都办理完了,刘华强再去办理业务
- 如果是非公平锁,刘华强进来后不需要去排队,直接去窗口办理业务,就是这么不礼貌。
如果刘华强顺利的做到了柜台窗口,那么就表示他获得了锁,可以继续办理业务了,如果在刘华强刚要坐下办理时,来了一个更横的白宝山抢先一步做到了柜台椅子上,此时白宝山办起来业务,刘华强则需要去排队了。
知道了公平
和非公平
,那么我们接下来就以非公平
展开讨论,ReentrantLock默认的就是非公平。
4.2 通过 ReentrantLock 走进AQS
模拟3个人去银行办理业务
public class AqsDemo {
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(AqsDemo::bankProcess, "张三");
t1.start();
Thread t2 = new Thread(AqsDemo::bankProcess, "李四");
Thread t3 = new Thread(AqsDemo::bankProcess, "王五");
t2.start();
t3.start();
}
//模拟银行窗口处理业务
private static void bankProcess() {
lock.lock();
try {
try {
System.out.println("银行柜台开始处理 [" + Thread.currentThread().getName() + "] 的业务");
//模拟办理业务非常久
Thread.sleep(300000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
复制代码
4.2.1 加锁
“张三”先办理业务,调用lock.lock()
方法获得锁,继续往里走:
final void lock() {
/**
* CAS修改同步状态state的值
* 如果成功将state的值从0修改为1,则表示获取锁成功
*/
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//否则将尝试加入等待队列(这中间也可能会继续抢锁,后面再说)
acquire(1);
}
复制代码
由于“张三”是第一个来办理业务的,所以它可以成功的将state
的值从0修改为1,表示张三获得锁,开始办理业务。
此时李四也过来办理业务,此时发现state = 1,说明窗口有人在办理,此时李四就尝试去排队了
final void lock() {
//张三正在办理业务,state = 1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//李四将来到这里
acquire(1);
}
复制代码
继续跟进去看下:
//arg = 1
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
它内部主要包含了3个方法,我们接下来分别看下这3个方法都做了什么?
4.2.1.1 tryAcquire(arg)
方法剖析
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
ReentrantLock.NonfairSync: 非公平锁的具体实现
- 获取同步状态位state(共享资源)的值
- 如果state = 0, 说明共享资源是空闲的,此时通过CAS尝试将其从0修改为1,并标记当前持有锁的线程
- 如果state != 0, 看下当前线程是否为已经持有锁的线程,如果是,则将state+1,表示重入。
整个过程也比较好理解,首先判断state是否为0,比如李四在去候客区之前又过来看下窗口是否是空闲的,如果是空闲的,他就不用去候客区了,直接办理业务就好了(此时就表示李四获得了锁)
如果没有获得锁,没有获得锁具体的体现就是无法将state从0修改为1,所以李四在这里将返回false.
4.2.1.2addWaiter(Node.EXCLUSIVE)
方法剖析
//mode: Node.EXCLUSIVE
private Node addWaiter(Node mode) {
//1.创建一个Node节点,里面封装了当前线程
Node node = new Node(Thread.currentThread(), mode);
/**
* 判断尾节点会否为空,首次进来肯定是空的
* 所以李四过来的时候这里是空,将先调用enq方法
*
* 第一个排队的节点不会来到这里,它要先调用enq方法去初始化头节点
* 等王五过来的时候我们在分析
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
继续看下enq(node)
方法:
//node 是封装了当前线程的节点
private Node enq(final Node node) {
//死循环
for (;;) {
Node t = tail;
/**
* 判断尾节点是否为空,如果为空,必须初始化
* 所以第一次进来(李四过来)的时候讲初始化,初始化的逻辑是
* 设置一个头节点,这个节点没有任何数据,通常称为“虚拟节点”/“傀儡节点”
*/
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//将当前节点设置到尾节点
if (compareAndSetTail(t, node)) {
//将当前节点和虚拟节点建立联系
t.next = node;
return t;
}
}
}
}
复制代码
这个方法也比较好理解,第一次循环过来将设置虚拟头节点,然后继续循环,这次t == null就不成立了,于是将来到else 的逻辑,在这里将建立当前节点和虚拟节点的联系
用一张图来看下:
到这里,排队节点就建立好双向连接的关系,接下来我们看最后一个方法
4.2.1.3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法剖析
//node 是封装了李四线程的节点
//arg = 1
final boolean acquireQueued(final Node node, int arg) {
//入队失败的标识
boolean failed = true;
try {
//中断的标识
boolean interrupted = false;
//死循环
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
/**
* 如果前一个节点是head, 那么当前节点再真正去候客区前再次尝试获取锁
* tryAcquire(arg) 方法就是我们前面分析的第一个方法,就是尝试获取锁
*
*
* 根据我们前面画的那张图可以知道,李四的前驱节点就是head, 所以它会
* 再次尝试获取锁,
* 如果张三的业务办理时间特别短,李四获取锁成功
* -- 则将李四节点置为新的“虚拟节点”
*
* 如果张三的业务办理时间特别长,李四获取锁失败,则将调用下面的if方法
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* shouldParkAfterFailedAcquire()方法单独分析
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
我们先看第一个if的逻辑:
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
复制代码
它的逻辑就是:获取当前节点的前驱节点,如果前驱节点是head,那说明当前节点是排在head之后的第一个节点,那么此时当前就去再次尝试获取锁,如果获取锁成功,则将当前节点置为"傀儡节点"。
如果当前节点的前驱节点不是head节点或者尝试获取锁失败,则将第二个if的逻辑:
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
复制代码
我们先看第一个方法:shouldParkAfterFailedAcquire(p, node)
/**
* pred: 是当前节点的前驱节点
* node: 当前节点
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/**
* 前驱节点的状态
*
* waitStatus 的值默认是0,有 1, -1, -2, -3 几种取值
*/
int ws = pred.waitStatus;
/**
* 如果前驱节点的状态是-1,则直接返回true,表示pred节点具有唤醒下一个节点的责任
*
*/
if (ws == Node.SIGNAL)
return true;
//如果ws > 0 ,表示前驱节点已经取消了,还是以银行为例,可能某个在候客区排队的人
//突然临时有事,就不办理了,此时他就从候客区出去了
if (ws > 0) {
/*
* 如果(当前节点的)前驱节点是取消状态,那么我就继续看前驱节点的前驱节点是不是也是取消的
* 直到找到有效节点
*
* 举个例子:
* A <==> B <==> C <==> D <==> E <==> F
* 假如当前节点node是E, 前驱节点是D, 如果D的状态是1(CANCELLED),那么就看
* C的状态,如果C也是1,那么就看B,如果B的状态不是1,则结构就变成了
*
* A <==> B <==> E <==> F
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 状态是小于等于0 并且不是-1
* 使用CAS将前一个节点的状态修改为 -1
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
总的来说,这个方法就是将当前节点的前驱节点的
waitStatus
状态修改为Node.SIGNAL
,而这个状态表示前驱结点具有唤醒下一个节点的能力。
由于现在CLH队列中只有两个节点:虚拟头节点和李四节点,那么这个方法执行完就变成了这样:
接下来我们再看第二个方法:parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//1.挂起当前线程
LockSupport.park(this);
/**
* 2.返回当前线程是否中断了
*
* 注意:只有当前线程被unpark()或者当前线程被中断了,才会走到这里,否则将一直阻塞在1处
*/
return Thread.interrupted();
}
复制代码
这个方法比较简单,首先通过LockSupport的park()方法挂起当前线程,此时到这里,李四线程就是真正的在候客区排队等待了。之后只有被唤醒了才有资格去柜台办理业务。
我们在回过来后,整体看下这个方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
- 为什么要搭配死循环?
- acquireQueued()方法的出口在哪里?
我们知道,同步队列是一个先进先出的队列,head <==> A < ==> B <==> C <==> D <==> tail
, 那么A肯定是第一个要被唤醒的线程(唤醒后面我们再看)。我们还是以李四为例,李四的前驱结点是head
,但是张三的业务办理时间特别长,所以李四调用tryAcquire(arg)
方法依然没有获得锁,那么李四最后将会调用LockSupport.park()方法将自己挂起,阻塞到这里,无法向下执行,所以也不会退出for循环,更不会退出acquireQueued()
方法。假设李四被LockSupport.unpark()唤醒了,唤醒之后,将继续执行for循环,再次判断 if (p == head && tryAcquire(arg))
是否成立,其实主要就是判断tryAcquire(arg)
方法是否可以获得锁,如果获得锁,则返回true,否则将返回false。
- 如果李四获得锁(
(p == head && tryAcquire(arg))=true
),那么李四就成为新的head节点(傀儡节点),李四将退出for循环,也就意味着退出acquireQueued()
方法,这就表明李四获得了锁,意味着加锁逻辑lock.lock()
可以继续往下走了。 - 如果李四没有获得锁(
(p == head && tryAcquire(arg))=false
),什么情况下李四被唤醒之后没有获得锁呢?这就是前面我们说过的非公平锁,李四被唤醒后去抢锁,可能在抢的一瞬间,有田七过来率先获得了锁,所以李四就没有获得锁,这样李四就需要继续挂起,等待被唤醒。(这就是需要放到死循环中的原因)
我们继续看王五抢锁(张三办理业务时间特别长,一直没有释放锁,李四已经在候客区排队等待了)
整个过程和李四基本上是一样的,我们只看下不同的地方:addWaiter(Node mode)
//mode = Node.EXCLUSIVE
private Node addWaiter(Node mode) {
//将当前线程(王五)封装到Node节点中
Node node = new Node(Thread.currentThread(), mode);
/**
* 如果pred 不是null, 说明队列已经初始化过了,所以就不需要进入enq()方法去初始化队列
*
* 这里只需要使用CAS将当前节点设置到尾部即可,然后返回当前节点
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
继续往下看:acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
/**
* 现在王五的前驱结点是李四,李四不是head结点,所以不会进来这里
*
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* shouldParkAfterFailedAcquire(p, node): 修改前驱结点李四的waitStatus 为 -1,表示具有唤醒后继节点的能力
*
* parkAndCheckInterrupt(): 当前线程挂起
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//当前线程因为某种原因失败了,则取消排队,基本上不会进来这里
cancelAcquire(node);
}
}
复制代码
到这里,入队的逻辑就基本上都完成了,接下来我们看下唤醒。
4.2.2 解锁(唤醒)
张三业务办理结束,需要释放锁,然后去唤醒等待队列中的线程。
private static void bankProcess() {
lock.lock();
try {
try {
System.out.println("银行柜台开始处理 [" + Thread.currentThread().getName() + "] 的业务");
Thread.sleep(300000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
//业务办理完毕,释放锁 ==================
lock.unlock();
}
}
复制代码
我们继续看下内部逻辑,主要做两件事:
- 当前线程释放锁:
将同步状态位state从1置为0
- 唤醒等待队列中的线程去抢锁:
用LockSupport.unpark()唤醒LockSupport.park()阻塞的线程
public final boolean release(int arg) {
//1.tryRelease(arg) 释放锁,释放成功,去唤醒等待中的线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//2.唤醒等待中的线程
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
首先看下释放锁的逻辑:就是将同步状态位state从1修改为0.
//releases = 1
protected final boolean tryRelease(int releases) {
//获取同步状态位的值,然后减一
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果为0,说明成功释放锁
if (c == 0) {
free = true;
//将独占线程置为null
setExclusiveOwnerThread(null);
}
//将同步状态位的值从1修改为0
setState(c);
return free;
}
复制代码
然后再看下唤醒等待队列中的线程的逻辑
/**
* node 是等待队列中的头结点
* 同步队列是一个FIFO的队列,所以肯定要先从头部唤醒
*
* unparkSuccessor 方法名直译过来就是唤醒后继结点,head是傀儡节点,它的next才是有效节点
*/
private void unparkSuccessor(Node node) {
/*
* 经过前面的分析,我们知道node(head)的waitStatus是-1,所以这里if条件成立
* 然后通过CAS将其修改为0,不明白这里为什么要置为0?
*/
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
/*
* 获得head节点的下一个节点,这个节点就是李四
*/
Node s = node.next;
/**
* 如果s是null或者waitStatus > 0, 说明下一个节点可能不存在了或者取消排队了
* 此时就从队列的尾部一直往前找,直到找到最接近 head的一个有效节点
*
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) {
//找到之后,就唤醒等待队列中的线程
LockSupport.unpark(s.thread);
}
}
复制代码
一旦调用了LockSupport.unpark(s.thread)
方法,那么同步队列中调用LockSupport.park()
方法阻塞的线程将会被唤醒,从阻塞之处开始往下执行。
我们看下阻塞的代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* parkAndCheckInterrupt() 调用`LockSupport.park()`方法阻塞线程
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
一旦执行LockSupport.park()
方法,现成将阻塞到这里,不会往下执行,一直卡在if条件判断里,一旦有人释放锁,然后调用 LockSupport.unpark(s.thread)
方法唤醒线程,则这里就可以继续往下执行,由于这里是一个for(;;)
的死循环,所以它继续循环执行,然后首先执行第一个if判断,看它是否是接挨着head节点(head节点是虚拟节点,不保存线程)的节点(先进先出的体现),然后尝试获取锁(将同步状态位state从0修改为1)一旦获取锁,则存储当前线程的Node节点将成为新的“虚拟头节点
”,然后退出acquireQueued(final Node node, int arg)
方法,一直向上退出,最终来到我们自己编写的加锁代码lock.lock()
处,然后就可以继续往下执行业务逻辑了。
我们把其中的代码用图标注下:
到这里,整个AQS的核心流程就结束了,其实只要一步一步仔细的梳理代码,发现也并不难理解O(∩_∩)O哈哈~
5.总结
我们简单梳理下AQS的加锁,排队,唤醒的流程:
- 通过CAS将同步状态位state从0修改为1,如果修改成功,表示获得锁;如果修改失败,则表示锁被其他线程占有,此时需要排队
- 排队的逻辑就是将当前线程封装到Node节点中,然后尾插法插入到CLH同步队列的尾部,然后调用LockSupport.park()阻塞当前线程
- 获得锁的线程释放锁后,将会调用LockSupport.unpark()方法从头部开始唤醒阻塞的线程(先进先出的体现)
好了,关于AQS的介绍就到这里吧,后面有问题再补充。