开篇之前,烦请诸位允许我附庸风雅一次。近期因诸事繁杂,心情颇低落,遂于喜马拉雅APP中收听《老子》一文。其中的第八十一章《结天道》一文于我感悟颇深:和大怨,必有余怨,报怨以德,焉可以为善?是以执右契(指所有权凭证或借据)而不以责于人。故有德司契,无德司彻。夫天道无亲,恒与善人。
看着本篇文章要梳理的AbstractQueuedSynchronizer,我又想起了当年那场不分青红皂白死磕其源码的战争。为了能够彻底弄懂,我不分昼夜地仔细学习,可终是悟性不足,无法习得其要领。或许大家已经觉察道路,我不止一次说过这样的话。其实我有认真反思过这个问题,个人觉得最主要原因是自己的理论知识不够扎实,还有就是思考力有所欠缺。不过常言有云:一艺之成,当尽毕生之力,因此这些并不会促使我放弃目前所从事专业。
1 概述
回过头来看AbstractQueuedSynchronizer类,我们发现它并非一个现成的类,而是java并发包(java.util.conncurrent.locks)中的一个抽象基类,主要用于构建锁或其他同步器。它提供了一个框架来实现依赖于共享资源管理的同步器。AbstractQueuedSynchronizer通常又被简称为AQS,其核心概念有:
- 同步器:AQS允许你创建一个同步器,它可以维护一个状态,并且提供方法来获取、释放这个状态。状态通常用来表示某种资源是否被占用。
- 队列:AQS维护了一个等待线程的FIFO队列(CLH锁队列)。当线程试图获取同步状态失败时,会被放入队列中等待。
- 独占与共享模式:AQS支持两种类型的同步器——独占模式和共享模式。独占模式意味着一次只能有一个线程获得同步状态;而共享模式则允许多个线程同时获得同步状态。
AbstractQueuedSynchronizer类,即AQS中的关键方法有以下几个:1) getState():获取当前同步状态;2) setState(int):设置同步状态;3) compareAndSetState(int expect, int update) :CAS操作更新状态。了解到这里,还是先看一下这个类的继承结构,具体如下所示:
从图中可以看出用到AbstractQueuedSynchronizer抽象类的几乎都是我们耳熟能详的工具,譬如LimitLatch、Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock。当然还有Worker,如果对这个工具类不太熟悉,可以看一下《ThreadPoolExecutor详解》这篇文章。另外ReentrantLock和ReentrantReadWriteLock是使用AbstractQueuedSynchronizer实现的独占锁的例子;Semaphore和CountDownLatch是使用AbstractQueuedSynchronizer实现的共享锁的例子。当然除了这些官方工具类外,我们还可以通过继承AQS并重写其中的关键方法(这些关键方法中存在着控制同步状态变化的逻辑)来实现自己的同步器,譬如下面这个例子(这是一个简单的独占锁的案例):
class MyLock extends AbstractQueuedSynchronizer {
protected boolean tryAcquire(int arg) {
// 如果状态为0,则尝试获取锁
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
// 释放锁,将状态设置为0
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public boolean release() {
return tryRelease(1);
}
}
这个案例实现了一个简单的独占锁,也就是说当某个线程获取到这个锁时,其他线程并无资格获取,只能排队等待。
2 关于AQS及其相关类的理解
前一小节介绍过AQS的主要作用是构建锁或其他同步器。关于锁这一点还好理解,上一小节也梳理了使用AQS定义的各种锁,譬如:LimitLatch中的Sync,ReentrantLock中的Sync,ReentrantReadWriteLock中的Sync以及Worker(一个位于ThreadPoolExecutor中的内部类)等等。不过对于这点我还是有些不理解:定义完锁之后要干什么呢?或许只有弄懂AQS的基本原理之后我才会对这个问题有一些自己的想法。相比于这点,同步器这个概念着实让我有点头蒙。还是先来看看AQS的基本原理!(注意:以下内容参考并摘自《【并发基础】AQS(Abstract Queued Synchronizer)框架的使用和实现原理详解》这篇博客) 。上一小节我们提到了AQS的一些核心概念,譬如:同步器、队列等。这一小节我们将围绕这些核心概念展开,以便自己能够更充分的理解AQS,也希望通过这次梳理将前面描述的两个问题给解决掉。
2.1 同步状态
在AQS中维护了一个名为state,且由volatile修饰的整型成员变量。它的主要作用就是表示同步状态。这个变量的值可以用来表示锁是否被持有,或者用于计数等场景(通常情况下当其值大于等于0时表示被占用,小于0表示未被占用)。关于其定义可以参见下述代码:
private volatile int state;
// 下述两个方法是 state 属性的 getter 和 setter 方法
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// compareAndSetState() 方法主要用于给 state 属性设置值(这里是通过 cas 的方式实现的)
// 注意这里的 STATE 是一个 VarHandle 类型的变量(关于这个类的用法将在后续文章中介绍)
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
这里再啰嗦一下,AQS支持两种同步方式:独占式或共享式。另外AQS使用模板模式提供了一些可供子类重写的模板方法(注意:最后一个方法并非模板方法),譬如:
- tryAcquire(int):该方法尝试用独占式方式获取同步状态,获取成功返回true,获取失败会返回false
- tryRelease(int) :该方法尝试用独占式方式释放同步状态
- tryAcquireShared(int) :该方法尝试以共享方式获取同步状态。当返回值大于等于0时表示获取成功,否则表示获取失败
- tryReleaseShared(int) :该方法尝试以共享方式释放同步状态
- isHeldExclusively():这个方法并非模板方法,不过也是一个非常重要的方法。这个方法会判断当前的状态是否是在独占模式下被线程占用
2.2 等待队列
在2.1小节中,我们一起梳理了AQS的同步状态,通过梳理我对AQS中的同步状态有了更加清晰的认知。本小节我想梳理一下AQS中用到的等待队列,即CLH(Craig-Landin-Hagersten)。
AQS使用CLH(Craig-Landin-Hagersten)锁队列来管理等待获取同步状态的线程。这是一个FIFO(先进先出)的双向链表,其中每个节点代表一个等待的线程。注意:这段描述中有这样一些关键信息——获取同步状态的是线程、CLH队列是一个FIFO(先进先出)的双向链表、这个链表中的每个节点代表一个等待的线程。
首先让我们一起看一下组成FIFO(先进先出)双向队列的节点元素Node。它位于AQS类中,是其中的一个由final修饰的静态内部类,其源码如下所示:
static final class Node {
/** 一个标记,表示当前节点在共享模式下等待 */
static final Node SHARED = new Node();
/** 一个标记,表示当前节点在独占模式下等待 */
static final Node EXCLUSIVE = null;
/** 实际上下面这些属性主要用于表示当前节点的状态 */
/** waitStatus值,表示当前线程已经被取消 */
static final int CANCELLED = 1;
/** waitStatus值,表示后继线程需要被取消 */
static final int SIGNAL = -1;
/** waitStatus值,表示线程正在等待条件 */
static final int CONDITION = -2;
/** waitStatus值,表示下一个acquireShared应无条件传播。 */
static final int PROPAGATE = -3;
/**
* 状态字段,仅接受以下数据:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/** 当前节点的前驱节点 */
volatile Node prev;
/** 当前节点的后继节点 */
volatile Node next;
/** 与当前节点关联的线程 */
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/** Establishes initial head or SHARED marker. */
Node() {}
/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
/** Constructor used by addConditionWaiter. */
Node(int waitStatus) {
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
}
/** CASes waitStatus field. (采用 CAS 方式设置节点状态) */
final boolean compareAndSetWaitStatus(int expect, int update) {
return WAITSTATUS.compareAndSet(this, expect, update);
}
/** CASes next field.(采用 CAS 方式为节点设置下一节点) */
final boolean compareAndSetNext(Node expect, Node update) {
return NEXT.compareAndSet(this, expect, update);
}
final void setPrevRelaxed(Node p) {
PREV.set(this, p);
}
// VarHandle mechanics(下述属性是通过反射方式创建的,后面会有文章对此进行讲解)
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
PREV = l.findVarHandle(Node.class, "prev", Node.class);
THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
通过源码可以看出Node节点内部存在一些重要的属性:waitStatus(节点状态)、prev(当前节点的前驱节点)、next(当前节点的后继节点)、thread(与当前节点绑定的处于等待状态的线程)以及nextWaiter(下一节点的等待模式——独占还是共享)。另外Node内部也提供了对这些属性进行操作的函数,比如:compareAndSetWaitStatus(int expect, int update)——用于设置节点状态、compareAndSetNext(Node expect, Node update)——用于设置当前节点的下一节点。
节点元素有了,接下来就是该节点使用方的定义了。在AQS中有这样两个属性,它们的类型均为Node,它们均被volatile和transient关键字修饰,其中一个属性的名字为head,另一个属性的名字为tail。个人理解这就是AQS中CLH队列的头部和尾部。关于CLH队列的结构,可以参见下面这幅图:
从图中可以看出AQS类中的head属性指向一个Node节点,表示双向链表的开始节点。tail属性指向一个Node节点,表示双向链表的结束节点。
通过上面的描述,我们知道使用通过AQS构建的工具,在出现多个线程争抢同一资源时,无法获得资源的线程会被添加到这个双向链表中进行等待。那这个队列的过程究竟是怎样的呢?下面就让我们以ReentrantLock(这个类位于org.apache.tomcat.util.threads包中)为例来研究一下。看下面这个案例:
public class AQSTest {
ReentrantLock reentrantLock = new ReentrantLock(true);
private int i = 0;
private void add() {
reentrantLock.lock();
i += 1;
reentrantLock.unlock();
}
public static void main(String[] args) throws Throwable {
AAAA a = new AAAA();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++)
a.add();
}
});
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++)
a.add();
}
});
t.start();
t1.start();
t.join();
t1.join();
System.out.println(a.i);
}
}
通过代码不难发现,如果去掉add()方法中的reentrantLock.lock()和reentrantLock.unlock(),那么main(String[])方法中的执行结果很大概率不是2000(这个在本地有试验过,结果与预期一致,且概率很高)。如果不去掉add()方法中的这两行代码,那么main(String[])方法中的执行结果是2000(无论执行多少次结果都是这个)。出现这种情况的原因是i+=1不是一个原子性的操作,也就是说这个算法应该分为i+1运算,和重新赋值给i这两个步骤(姑且这么理解)。由于是两个线程同时操作i这个数据,这个时候出现线程t拿到i的值进行运算,还未将计算结果重新赋值给i,而线程t1又读取到i的值(注意此时线程t和t1拿到的i的值是一样的)并进行运算的概率很高。这个时候虽然进行了两次运算,但运算的结果实际上都是一样的,也就是说实际上线程t1和t做了一样的工作。如何解决这个问题呢?引起这个问题的根源是同步资源未得到有效的管理。对于资源i的操作应在线程安全的情况下进行,而使用ReentrantLock锁可以完美的解决这个问题,这是因为ReentrantLock可以将对资源i的所有操作序列化,其结果就是同一时刻只能有一个操作这操作共享资源i。那ReentrantLock是如何完成这些操作的?
2.2.1 加锁
本小节我们将来介绍ReentrantLock类的加锁流程,具体可以先看一下ReentrantLock类的lock()方法执行过程中的源码,如下所示:
// ReentrantLock 的 lock() 方法
public void lock() {
sync.acquire(1);
}
// 注意下面这个方法位于 AQS 类中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 下面这个方法位于 ReentrantLock 类的静态内部类 FairSync 类中
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 下面这几个方法位于 AQS 类中
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
从代码可以看出,当调用ReentrantLock上的lock()方法后,会将调用转交给ReentrantLock持有的Sync类型的sync对象上的acquire(1)方法。由于Sync类继承了AQS类,所以调用acquire(1)方法其实就是在调AQS中的acquire(int)方法。这个方法的源码在上面源码部分也进行了展示。AQS的acquire(int)方法接着调用模板方法tryAcquire(int),由于这个方法的实现位于实现类中,也就是ReentrantLock类FairSync内部类中的tryAcquire(int)。FairSync中的tryAcquire(int)方法主要做了这样几件事:
- 获取当前线程对象,即Thread对象
- 调用父类Sync中的getState()方法,实际上调用的就是AQS类中的getState()方法,拿到当前的状态值
- 接着判断当前的状态值是否为0。如果是,则调用hasQueuePredecessors()方法,这个方法的主要作用就是判断当前线程是否在线程等待队列中,如果在则返回true,否则返回false。当hasQueuePredecessors()方法返回false,则接下来调用compareAndSetState(int expect, int update)方法将要更新的值(1)设置到state属性中,这里用到了CAS语法。如果设置成功,则调用setExclusiveOwnerThread()方法,将当前线程设置到属性exclusiveOwnerThread(这个属性定义在AbstractOwnableSynchronizer类中)上,然后返回true给调用者,表示加锁成功。如果当前c的值不为0,则判断当前线程是否等于持有共享状态的线程,如果是,则将共享状态加1(个人理解这里就是实现锁重入的核心逻辑),然后调用setState(int)方法,将变更的状态设置到state属性上,然后返回true给调用者,表示加锁成功。如果c既不等于0,且持有状态的线程不能与当前线程则返回false,表示加锁失败。
调用完tryAcquire(int)方法后,会得到一个布尔值,如果这个值为false(即加锁失败),则接下来会调用AQS中的addWaiter(Node.EXCLUSIVE)方法。这个方法首先创建一个Node节点,然后通过CAS的方式将Node节点设置到线程等待队列中。这个方法的逻辑是这样的:
- 创建Node节点(这个Node节点会包含一个表示当前节点模式的属性:独占还是共享)
- 编写一个死循环,直至处理成功才返回。这个死循环中的处理逻辑是这样的:拿到AQS中的尾节点。如果尾节点为空,则调用initializeSyncQueue()(这个方法的主要作用就是初始化AQS中的头尾节点,此时头尾节点同时指向同一个节点,即new Node(),追头节点的设置时通过CAS方式完成的)。如果尾节点不为空,则将当前方法接收的Node节点通过CAS的方式设置为尾节点,同时将原有的尾节点设置为Node节点的前驱节点,当通过CAS方式将Node节点设置为尾节点后,会将原来尾节点的下一节点设置为当前节点Node。注意:这里所有的操作都是线程安全的(通过compareAndSetTail()方法可以看出这里用到了CAS方式来保证操作的线程安全)
调用完addWaitr(Node.EXCLUSIVE)之后,会接着调用acquireQueued()方法,注意这个方法接收addWaiter(Node.EXCLUSIVE)返回的Node对象。这个方法主要做了以下一些操作:
- 编写无限循环体,在循环体中做以下逻辑
- 取出当前节点的前驱节点
- 判断当前节点的前驱节点是否等于head节点(比如跟踪是head节点初始化时头尾指针都指向的节点,而此时当前节点的前驱节点恰好等于这个节点),如果等于接下来就尝试获取锁,如果获取成功,就将head指针指向当前节点,同时将当前节点的thread属性和prev属性重置为null,同时将当前节点的next属性重置为null,接下来返回false给调用者,表示唤醒成功。如果当前节点的前驱节点不等于head指针指向的节点,那么调用shouldParkAfterFailedAcquire(p, node),这里会将当前节点的前驱节点的waitStatus值设置为-1,注意原来是0。接下来来会再次执行循环体,此时依旧跳过第一个if,然后执行第二个if分支,不过此时由于当前节点的前驱节点的waitStatus值已经被设置成了-1,因此这次会走到第二个if分支,然后执行parkAndCheckInterrupt()方法,执行LockSupport.park(this);代码【之后线程就在这里阻塞了】