缘起
每门编程语言基本都离不开并发问题,Java亦如此。谈到Java的并发就离不开Doug lea老爷子贡献的juc包,而AQS又是juc里面的佼佼者
因此今天就一起来聊聊AQS
概念
AQS是什么,这里借用官方的话
Provides a framework for implementing blocking locks and related synchronizers that rely on first-in-first-out wait queues
AQS的全程是AbstractQueuedSynchronizer,在这里咱们进行咬文嚼字一下。
Abstract:这是AQS采用模板设计模式的基础,AQS中将定义了大部分同步的流程,仅将加解锁的操作留给子类根据需求进行自定义(这也就是为什么使用AQS可以快速开发锁或者同步器的主要原因)
Queued:这里指的是CLH队列;当共享资源被占用时,就需要一套线程阻塞等待以及被唤醒时锁分配机制,AQS通过CLH队列来实现
CLH是一个虚拟的双向链表实现的队列,获取不到锁资源时会被AQS封装成Node节点并加入队列。每个线程执行结束都会唤醒其下一个节点
Synchronizer:同步控制,AQS的同步控制实现有两种。一种是volatile+CAS的乐观锁设计,另一种是LockSupport+CAS的悲观锁设计
切入点
- 模板方法设计
- 可重入设计
- CLH队列设计
- 公平/非公平锁设计
模板方法设计
我们来看看AQS的设计流程
从图中可看到,AQS设计并且实现了一个同步器/锁的完整流程;
但是将tryAcquire/tryAcquireShared和tryRelease/tryReleaseShared这些经常改动的操作设置为抽象方法,留给子类自行拓展
acquire方法剖析
这个方法采用门面设计模式,将CAS获取锁,封装线程以及添加CLH队列的操作封装成一个方法并对外提供
我们常用的ReentrantLock.lock方法实际上就是调用此方法
public final void acquire(int arg) {
// 尝试通过CAS获取锁,若获取成功则执行同步代码块
// 获取失败则通过addWaiter将当前线程封装成AQS的Node节点并加入CLH队列,同时中断当前线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter方法剖析
Node是AQS的内部类,Node是组成CLH队列的节点
申请公平/非公平锁失败都会被加入CLH队列
private Node addWaiter(Node mode) {
// 1. 将当前线程跟Node关联起来,方便AQS根据队列顺序唤醒获取锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 2. 追加新建节点到双向链表尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 3. 初始化链表
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法剖析
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1. 获得当前Node的前驱节点,也就是当前任务的前一个任务
final Node p = node.predecessor();
// 2. 如果前一个任务是head则尝试通过CAS来获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//3. 通过UNSAGE.park来阻塞当前线程来等待许可
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
小结
通过这小节我们可以看到AQS是如何通过模板方法设计模式大大简化了同步器/锁开发的
可重入设计
ReentrantLock是可重入锁的典型设计,在这里就基于它进行分析
ReentrantLock的锁是组合设计,通过内部类Sync、NonfairSync和FairSync来实现
这里看看非公平锁NonfairSync的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//1. 如果当前锁空闲,则获取锁并设置锁的Owner为当前线程
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 2. 如果当前锁已经被获取,则判断锁的Owner是否是当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 1. 进行锁释放时对state进行减法,由于加锁和释放锁的操作都是配对出现的。那么重入2次时状态为3,那么释放锁的时候也会释放3次直到状态state为0时才释放锁
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 2. 释放锁的时候先将锁的Owner置空
setExclusiveOwnerThread(null);
}
// 3. 释放锁
setState(c);
return free;
}
锁Owner实现
AQS的锁Owner是通过继承父类AbstractOwnableSynchronizer来实现的
AbstractOwnableSynchronizer只有一个成员属性exclusiveOwnerThread,用来标识独占锁场景下是哪个线程持有锁,方便可重入判断
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
小结
通过分析我们可以看到,AQS的可重入设计是通过父类AbstractOwnableSynchronizer+volatile修饰的state作为计数器来实现的
CLH队列设计
当共享资源被占用时,就需要一套线程阻塞等待以及被唤醒时锁分配机制,AQS通过CLH队列来实现
CLH队列保证了锁可以高效进行分配,避免了无意义的唤醒阻塞未获得锁线程
CLH的入口在于AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,它是一个虚拟的双向链表,addWaiter会将请求锁的线程封装成Node节点,Node节点中通过存储前后序节点来维护双向队列
源码剖析
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1. 获取前序节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// 将当前节点所有直接前驱节点从CLH队列中移除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//请求锁的线程加入CLH队列中,通过调用LockSupport、UNSAFE来进行线程的阻塞,直至CLH链表中上一个节点释放锁后才会主动唤醒
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport源码实现
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 这是做什么用的?
setBlocker(t, blocker);
// 最终调用UNSAFE来执行
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 在UNSAFE中显示这是个本地方法,是由C++来实现的
public native void park(boolean var1, long var2);
// C++通过给当前线程添加一个互斥量0,每次该线程获得CPU时间片都会判断该互斥量,为0则将当前线程切换为阻塞状态
// 直到其他线程通过unpark来将此互斥量修改为1,该方法才会继续往下执行
小结
CLH队列给AQS维护了一个高效率的锁分配/释放的基础
公平/非公平锁设计
JDK中公平锁和非公平锁的具体实现分别是FairSync和NonfairSync
FairSync的获取锁流程
NonfairSync的获取锁流程
小结
通过分析可看到公平锁和非公平锁的获取锁/释放锁逻辑几乎一致,这都要归功于AQS的模板方法设计模式
两者不同的地方在于获取公平锁的请求会直接加入到CLH队列中等待锁,获取非公平锁的请求都会尝试直接获取锁,获取失败再加入CLH队列进行等待
非公平锁的性能相对而言更好,一般也是首选,但是会存在锁饥饿的现象;公平锁可以有效解决锁饥饿的问题,但性能相对而言会差一些
总结
- 在AQS中可以看到不少优秀的设计,这都要归功于Doug Lea老爷子;除了AQS,在juc里还有很多优秀的设计,如并发性能最好的字典ConcurrentHashMap、无锁高性能队列ConcurrentLinkedQueue等等
在品完源码后,你会发现其设计思想丝毫不逊色于各个大数据组件 - “曾经想征服全世界,到最后回头才发现,这世界点点滴滴全部都是你” ——致JDK