AQS实现原理
前期准备
AQS(全称AbstractQueuedSynchronizer)即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。
整体概览类图
AbstractQueuedSynchronizer类图
可以看到AbstractQueuedSynchronizer是一个队列,队列里的数据是Node类型。
Node类图
- thread: Node中的thread变量用来存放进入AQS队列里面的线程
- SHARED:用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
- EXCLUSIVE:用来标记线程是获取独占资源时被挂起后放入AQS队列的
- waitStatus 记录当前线程等待状态,可以为①CANCELLED (线程被取消了)、②SIGNAL(线程需要被唤醒)、③CONDITION(线程在CONDITION条件队列里面等待)、④PROPAGATE(释放共享资源时需要通知其他节点);
深入原理
AQS是一个同步队列,内部使用一个FIFO的双向链表,管理线程同步时的所有被阻塞线程。双向链表这种数据结构,它的每个数据节点中都有两个指针,分别指向直接后继节点和直接前驱节点。所以,从双向链表中的任意一个节点开始,都可以很方便地访问它的前驱节点和后继节点。
以独享资源为例
1 如果当前资源是第一次抢占,会初始化队列,比如我们来了一个线程ThreadA
这个时候头节点和尾节点都指向这个节点。并且由于是第一个节点,所以这个线程开始执行,所以Thread指向了空,其实也可以继续指向ThreadA,但是其实我们用不到了,因为线程不管是异常还是顺利执行完都会释放资源,就算继续存放ThreadA也没有用武之地了。
2 这个时候如果再来一个线程ThreadB,就会放到队列中去进行排队。后面如果继续来线程抢占资源是一样的进行排队。
3 如果我们需要唤醒下一个线程,也就是TreadA执行完毕了,唤醒ThreadB,此时队列的状态如图所示。
源码阅读
加锁过程
1 获取锁的时候调用了 sync.lock()
我们进入这个方法,发现是一个抽象方法,调用的是子类的方法,分别是公平和非公平锁的实现,我们先看公平锁。
2 调用FairSync的acquire()方法
接着调用了tryAcquire方法尝试获取锁,如果获取不成功则会进入到队列中去。
3 tryAcquire里面做了哪些事情呢?
// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 如果返回 0 表示没有锁定 大于0说明被其它线程占用了
int c = getState();
if (c == 0) {
// 如果没有线程等待时间更长 获取锁并设置值为1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//设置当前线程为独占资源持有者
setExclusiveOwnerThread(current);
return true;
}
}
// 若当前线程已经是独有锁的持有者 设置重入次数 state + 1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
如果获取成功就可以继续执行,否则就会进入队列。
4 addWaiter(Node mode) 放到等待队列中去
线程抢占锁失败后,执行addWaiter(Node.EXCLUSIVE)将线程封装成Node节点追加到AQS队列。
addWaiter(Node mode)的mode表示节点的类型,Node.EXCLUSIVE表示是独占排他锁,也就是说重入锁是独占锁,用到了AQS的独占模式。
Node定义了两种节点类型:
- 共享模式:Node.SHARED。共享锁,可以被多个线程同时持有,如读写锁的读锁。
- 独占模式:Node.EXCLUSIVE。独占很好理解,是自己独占资源,独占排他锁同时只能由一个线程持有。
private Node addWaiter(Node mode) {
// 创建节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 这是如果队尾为空则直接入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 追加到尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 也是自旋操作
enq(node);
return node;
}
// 自旋入队操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始化队列
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
5 acquireQueued(newNode,1)
这个方法的主要作用就是将线程阻塞。
- 若同步队列中,若当前节点为队列第一个线程,则有资格竞争锁,再次尝试获得锁。
- 尝试获得锁成功,移除链表head节点,并将当前线程节点设置为head节点。
- 尝试获得锁失败,判断是否需要阻塞当前线程。
- 若发生异常,取消当前线程获得锁的资格。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取前一个节点
final Node p = node.predecessor();
//若此节点的前个节点为头节点,说明当前线程可以获取锁,阻塞前尝试获取锁,若获取锁成功,将当前线程从同步队列中删除。
if (p == head && tryAcquire(arg)) {
// 这个时候如果获取到了锁
// 将当前线程从同步队列中删除 并将pre, next 指向设置为空
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 没有成功获取锁 则判断是否能够阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
6 shouldParkAfterFailedAcquire
这个方法的主要作用是:线程竞争锁失败以后,通过Node的前驱节点的waitStatus状态来判断, 线程是否需要被阻塞。
- 如果前驱节点状态为 SIGNAL,当前线程可以被放心的阻塞,返回true。
- 若前驱节点状态为CANCELLED,向前扫描链表把 CANCELLED 状态的节点从同步队列中移除,返回false。
- 若前驱节点状态为默认状态或PROPAGATE,修改前驱节点的状态为 SIGNAL,返回 false。
- 若返回false,会退回到acquireQueued方法,重新执行自旋操作。自旋会重复执行
acquireQueued和shouldParkAfterFailedAcquire,会有两个结果:
(1)线程尝试获得锁成功或者线程异常,退出acquireQueued,直接返回。
(2)执行shouldParkAfterFailedAcquire成功,当前线程可以被阻塞。
Node 有 5 种状态,分别是:
- 0:默认状态。
- 1:CANCELLED,取消/结束状态。表明线程已取消争抢锁。线程等待超时或者被中断,节点的waitStatus为CANCELLED,线程取消获取锁请求。需要从同步队列中删除该节点
- -1:SIGNAL,通知。状态为SIGNAL节点中的线程释放锁时,就会通知后续节点的线程。
- -2:CONDITION,条件等待。表明节点当前线程在condition队列中。
- -3:PROPAGATE,传播。在一个节点成为头节点之前,是不会跃迁为PROPAGATE状态的。用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 前一个节点处于通知状态,表示执行完成以后会唤醒后面的节点可放心的阻塞自己
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 说明前一个节点取消了锁的获取 可以再次尝试进行锁的获取:
* (1)当前线程会再次返回方法acquireQueued,再次循环,尝试获取锁;
* (2)再次执行shouldParkAfterFailedAcquire判断是否需要阻塞。
* 但是在这之前向前遍历,更新当前节点的前驱节点为第一个非取消的节点 下面这个循环就是干这个的
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
7 阻塞线程,上面我们如果进入阻塞状态就会阻塞
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
//检测当前线程是否已被中断(若被中断,并清除中断标志),中断返回 true,否则返回false。 如果成功被中断则返回true
// 上一步acquireQueued返回了true 虽然我们阻塞了线程等待唤醒,但是这里我们如果判断到线程已经中断了 则后面线程可以直接停止没有必要再执行
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
如果获取锁失败,并且线程已经中断了,则这个线程可以直接停止。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
经过上面的过程我们梳理可以得到这样一张时序图:
释放锁的过程
1 释放锁的时候调用了下面的方法
2 接着尝试释放锁
protected final boolean tryRelease(int releases) {
// 我们加几次锁就要释放几次,如果这个值大于1表示是重入锁
int c = getState() - releases;
// 如果当前这个锁不是这个线程加的 则锁状态不对直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果锁完全释放 变为无锁状态
free = true;
setExclusiveOwnerThread(null);
}
// 修改重入次数
setState(c);
return free;
}
3 释放锁后唤醒后面的节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 头节点waitStatus状态 SIGNAL或PROPAGATE
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 查找需要唤醒的节点:正常情况下,它应该是下一个节点。
* 但是如果下一个节点为null或者它的waitStatus为取消时,则需要从同步队列tail节点向前遍历,查找到队列中首个不是取消的 节点。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 真正的执行唤醒
LockSupport.unpark(s.thread);
}
释放锁的流程图:
公平与非公平锁
熟悉了上面的原理公平和非公平在实现原理上是一样的,就是非公平可以插队。
Lock lock = new ReentrantLock(false);
final void lock() {
// 尝试插队
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 在这个方法里面tryAcquire 最终调用的是nonfairTryAcquire
acquire(1);
}
在nonfairTryAcquire的时候
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 区别在这里 直接插队 也就是插队不成功 才会放到队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
也就是少了这个方法下面这个方hasQueuedPredecessors在公平锁中法判断这个线程之前是否还有排队的线程,当state为0的时候,代表资源可用了,如果是公平锁会拿在这个线程前面的线程,也就是队列中的下一个线程获得锁。
但是非公平锁直接不判断直接进行插队,
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
读写锁
读写锁维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般的互斥锁有了较大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会被阻塞。
读写锁的主要特性:
- 公平性:支持公平性和非公平性。
- 重入性:支持重入。读写锁最多支持65535个递归写入锁和65535个递归读取锁。
- 锁降级:写锁能够降级成为读锁,但读锁不能升级为写锁。遵循获取写锁、获取读锁在释放写锁的次序。
写锁和ReentrantLock 中的写锁区别不大,主要是来看一下共享锁:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 首先判断这个资源有没有被加独享锁 如果加了独享锁 不是当前线程加的 则获取读锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 首先判断写锁的队列中是不是有线程比当前线程先去获得共享锁 如果没有 并且可重入次数小于最大值 则去获取共享锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 相当于一个补丁补充处理 tryAcquireShared 中的缺失
return fullTryAcquireShared(current);
}
如果上面的流程获取读锁失败了:
private void doAcquireShared(int arg) {
// 这里是会入队的
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 去获取独享锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 当获取锁成以后 会通知其它在共享队列中等待的线程来获取锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 设置对头并进行广播
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 唤醒所有被阻塞的线程 因为是共享的 所以不用阻塞 之前的阻塞是因为 获取共享锁发生了阻塞
doReleaseShared();
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果说设置状态不成功继续自旋
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后续节点 和 ReentrantLock 一样
unparkSuccessor(h);
}
// 状态设置为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}