AQS
概述
AbstractQuenedSynchronizer
抽象的队列式同步器, 在java.util.concurrent.locks
包下。
我们常用的ReentrantLock
中有一个抽象静态内部类Sync
,就继承自AbstractQuenedSynchronizer
。
abstract static class Sync extends AbstractQueuedSynchronizer {
}
而new 一个ReentrantLock
对象的话,构造函数时这样的
public ReentrantLock() {
// 默认创建的是非公平
sync = new NonfairSync();
}
而这个NonfairSync
则继承自Sync
,Sync
,就继承自AbstractQuenedSynchronizer
。
AQS是JUC的基石,有很多个类( (ReentrantLock 、CountDownLatch 、ReentrantReadWriteLock、Semaphore) )的实现基于这个类,因为这些类都需要包含的一些功能由AQS提供。
其通过内置的CLH(FIFO)队列(CLH是三个科学家的名字,为一个单向链表实现的队列,AQS中为其变种,即实际为双向链表)来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个Node节点来实现锁的分配,有一个int变量 state 表示持有锁的状态,通过CAS对state 进行修改(0表示没有,1表示阻塞)
加锁会导致阻塞、有阻塞就需要排队,实现排队必然需要队列 。 如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及
LockSupport.park()
的方式,维护state变量的状态,使并发达到同步的效果锁,面向锁的使用者(定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可)
AQS同步器,面向锁的实现者。 由Java并发大神Douglee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等
AQS有一个内部类Node,Node有pre和next的属性。Node有head、tail属性。从源码可以看出。其内部有一个双向链表,而这个双向链表就是CLH队列变种的队列。
AQS有一个int属性state ,用来表示阻塞状态(锁的状态)。0表示可以获取,1则表示需要阻塞
Node的部分字段含义:
原理
内部就是一个由双向链表实现的队列,队列中的节点node就代表了一个线程。
队列中有一个状态用来表示阻塞状态,0表示可以获取锁,1表示要阻塞线程。
在获取锁的时候,会去CAS修改state,如修改成功则获取到锁,
如果出现竞争,公平锁会把线程节点添加到队尾,并park阻塞,等待其他线程执行完毕执行unpark唤醒
如果出现竞争,则说明当前线程 与 队列头的线程有争抢,谁抢到算谁的。
可重入锁则会将state修改到大于1
从lock()方法阅读AQS源码
final void lock() {
if (compareAndSetState(0, 1))
// 第一个进来的线程
// 这里调用父类AQS的方法,CAS修改state ,修改成功就把当前线程设置上,方便之后重入
setExclusiveOwnerThread(Thread.currentThread());
else
// 失败的话在这个方法里继续尝试获得
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire需要子类去实现,有公平锁、非公平锁等的实现,构造函数指定的话,创建的是非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 可能上一个线程刚好执行完了,这里获取锁成功
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 这里说明目前持有锁的是当前线程,这里可重入,state需要增加到大于1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryAcquire失败了的话,则先addWaiter然后把新的node传进去执行acquireQueued
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 如果不是第一次进来,则直接尝试把node加入队尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 第一次进来的时候,队列为空,也就是说尾节点为空,会到这里
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 第一次进来会先创建一个哨兵节点作为头节点,同时尾节点也指向了它
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 然后自旋,到这里
// 将当前线程的节点加入队尾,如下图所示
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued 如果获取不到锁的话,会执行 shouldParkAfterFailedAcquire和 parkAndCheckInterrupt
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// ws初始为0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 线程准备好了,直接返回true
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置为-1,-1代表线程准备好了等待资源释放
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 这里将线程阻塞,直到unpark。
// unpark是在unlock中执行的
LockSupport.park(this);
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果当前节点的前一个节点是头节点(傀儡节点)
// 则说明当前节点是第一个节点,且获取锁成功,就会进入这里
// 获取锁成功,则代表了上一个线程释放了锁也执行了unpark,所以下面那个方法被唤醒,继续自旋,执行到这里
// 会重新设置头节点,进入setHead中,也就是将当前节点设置为傀儡节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}