在线程间通信方式2一节中,我们了解了Lock,Condition和ReentrantLock,学习了简单使用Condition和RentrantLock完成线程间通信,从文章中我们了解到ReentrantLock是Lock接口的一个最常用的实现类,ReentrantLock是独占锁,独占锁的场景下又支持公平锁和非公平锁,那么在源码实现中,ReentrantLock继承关系,实现结构又是怎样的呢?
ReentrantLock继承关系及关联类
在多线程与锁中,我们了解到ReentrantLock是支持公平锁和非公平锁的,对应的构造函数源码如下:
// ReentrantLock.java
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可以看到如果是公平锁则创建FairSync对象,如果是非公平锁则创建NonfairSync,再结合Lock接口核心的lock,tryLock,unlock函数源码(Lock接口在线程间通信方式2中有介绍)可以看出,在ReentrantLock中,使用FairSync或NonfairSync代理了锁状态管理,lock,tryLock和unlock实现源码如下:
// ReentrantLock.java
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public void unlock() {
sync.release(1);
}
由此,进一步梳理ReentrantLock实现,可以得到下图:
上图中一些生僻类及其作用见下表:
类名 | 说明 | 备注 |
---|---|---|
FairSync | ReentrantLock中公平锁的实现类 | / |
NonfairSync | ReentrantLock中非公平锁的实现类 | / |
Sync | 公平锁和非公平锁实现类的共同父类,使用AbstractQueuedSynchronizer状态记录锁的持有数据 | / |
AbstractQueuedSynchronizer | AbstractQueuedSynchronizer简称AQS,一个用于实现阻塞锁和同步器的工具类,其内部维护一个先进先出的等待队列(真实数据结构是双向链表),依赖一个int型的数据管理同步状态 | / |
AbstractOwnableSynchronizer | AQS的父类,定义了一个线程独占的同步器,用于实现创建锁和锁占有标记的基础类,其内部使用exclusiveOwnerThread记录当前占用锁的线程 | / |
Serializable | 序列化接口 | / |
ReentrantLock.lock流程分析
跟踪ReentrantLock.lock调用流程,可以得到下面的时序图(以非公平锁为例分析):
上图中描述了ReentrantLock中lock在资源空和资源被占用的情况下的执行流程,接下来我们来看下其内部的细节实现。
ReentrantLock.lock获取锁成功
NonfairSynck类中的lock代码如下所示:
// NonfairSync.java
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
可以看出,我们在调用了ReentrantLock.lock后,NonfairSync会首先尝试通过CAS将资源占用状态置为1(compareAndSetState,默认值0,期望值1),如果执行成功,说明当前获取共享资源成功,将当前线程设置为独占锁持有者,当前线程继续执行。
compareAndSetState
compareAndSetState操作的是AQS中声明的一个int型的值,如果其值为0,表示当前锁空闲,如果有线程到来可以占用锁,如果值大于1,表示当前锁被占用,为保证多线程对该值的操作实时可见,使用volatile修饰该变量,相关代码如下:
// AbstractQueuedSynchronizer.java
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
setExclusiveOwnerThread
setExclusiveOwnerThread最终是将当前线程设置到AbstractOwnableSynchronizer中定义的exclusiveOwnerThread中,代码如下:
// AbstractOwnableSynchronizer.java
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
transient关键字:
transient关键字的主要作用是让某些被transient关键字修饰的变量不被序列化,如果对transient修饰的变量执行了序列化,则该变量会重新执行默认初始化,反序列化得到的对象是null
ReentrantLock.lock获取锁失败
前文中可以看到如果compareAndSetState执行返回false的话,就说明当前共享资源被占用,随后走else逻辑,执行acquire(1),其内容如下:
// NonfairSync.java
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其内部主要由两块组成,tryAcquire和acquireQueued,其中tryAcquire再次尝试获取锁,如果获取成功,则流程结束,当前线程正常继续执行,如果获取失败,则执行acquireQueued方法(由于使用且操作符连接tryAcquire和acquireQueued,所以只有tryAcquire返回false的时候,才会执行acquireQueued方法),该方法接受addWaiter返回的Node参数,下面来详细看下两个函数的具体实现。
tryAcquire
tryAcquire在NonfairSync中的实现如下,其最终调用到的是Sync类的nonfairTryAcquire方法:
// NonfairSync.java
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync.java
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前同步资源状态
int c = getState();
// 同步资源空闲
if (c == 0) {
// 占用同步资源更新资源状态
if (compareAndSetState(0, acquires)) {
// 设置当前线程为资源对应的独占锁持有者
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程就是持有锁线程,更新锁状态,直接持有锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到在tryAcquire中实际上也是尝试获取锁的过程,首先检查当前当前同步资源状态,如果不可获取,则检查当前线程时否是持锁线程,是的话则直接获取锁(可重入锁的实现),更新同步资源状态,如果均失败,则返回false。
acquireQueued
acquireQueued在AQS中关联的核心代码如下所示,可以看出主要包含addWaiter和acquireQueued两部分:
addWaiter
// AbstractQueuedSynchronizer.java
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// AbstractQueuedSynchronizer.java
private Node addWaiter(Node mode) {
// 创建新的Node对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
可以看到不管是addWaiter还是enq,其最终目标都是基于当前线程创建新的Node对象,将新的Node对象添加在队尾,前文提到AQS中维护了一个先进先出的队列,其数据结构本质是双向链表,这里的head,tail就是链表的具体实现。Node类中包含了指向前一个元素和后一个元素的引用,Node的声明如下:
// Node实体类
static final class Node {
...
// 前一个元素
volatile Node prev;
// 下一个元素
volatile Node next;
// 线程对象
volatile Thread thread;
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
...
}
// AbstractQueuedSynchronizer.java中声明Node对象
// 链表头
private transient volatile Node head;
// 链表尾部
private transient volatile Node tail;
acquireQueued
acquireQueued源码如下:
// AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到在addWaiter中成功将新的Node添加到队尾后,acquireQueued中当前线程会自旋,尝试获取锁,如果获取失败,则执行shouldParkAfterFailedAcquire和parkAndCheckInterrupt,这两个函数都返回true则执行selfInterrupt方法(代码见acquire函数部分)。
shouldParkAfterFailedAcquire用于判断是否应该对当前线程阻塞,如果是的话则返回true,parkAndCheckInterrupt用于执行线程阻塞并且判断当前线程是否处于interrupt状态,如果是则会返回true。通过源码可以看到其是通过LockSupport.park进行线程状态切换的,代码如下:
// AbstractQueuedSynchronizer.java
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park作用
LockSupport.park用于在许可证不可用时阻塞禁用当前线程,如果许可证可用,则该许可证被消耗,并且当前调用立即返回,否则,出于线程调度的目的,当前线程将被阻塞禁用,并进入休眠状态,直到发生以下三种情况之一:
- 其他线程以当前线程为参数调用unpark
- 其他线程中断当前线程
- 调用发生异常
结合上文,我们可以得出ReentrantLock.lock执行的一般流程如下所示:
ReentrantLock.unlock流程分析
跟踪ReentrantLock.unlock调用流程,可以得到下面的时序图(以非公平锁为例分析):
可以看到对于ReentrantLock.unlock流程而言,其核心实现函数是tryRelease和unparkSuccessor,release函数如下所示:
// ReentrantLock.java
public void unlock() {
sync.release(1);
}
// AQS.java
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
从代码中可以看到当tryRelease执行完成返回true后,我们会获取链表首位元素,当首位元素不为空且等待状态(waitStatus)不等于0时,执行unparkSuccessor,在这里我们又遇到了Node类的另一个核心元素waitStatus,其声明如下:
static final class Node {
...
// waitStatus的四种可能取值,表示当前节点及其后续节点对应线程的运行状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 声明waitStatus
volatile int waitStatus;
...
}
四种取值含义如下所示:
- SIGNAL:取值为-1,该节点的后续节点处于被阻塞或即将阻塞的状态,因此当前节点的线程在释放锁或取消时必须解除后继节点的阻塞状态,使后续节点的线程得以正常运行
- CANCELLED:取值为1,当前节点对应的线程由于超时或中断而被取消,Node的waitStatus取该值,进入取消状态后节点状态不再变化
- CONDITION:取值为-2,该节点当前在等待队列中,节点对应的线程等待Condition,当其他线程对Condition调用了signal方法后,该节点从等待队列中转入链表中,进行同步状态的获取
- PROPAGATE:取值为-3,在共享锁实现中使用,当前节点线程处于可运行状态
为了简化使用,对于waitStatus取值并没有按照数字递增或递减排列进行取值,如果该节点取值为非负值,则代表不需要将操作同步到其他节点,对于普通Node节点而言,waitStatus字段初始化为0,对于条件节点,该字段初始化为1,在代码中使用CAS对该字段进行修改。
下面我们来分别看下这两个核心函数的实现
tryRelease
tryRelease主要用于对锁状态标记进行清理,函数实现如下所示:
protected final boolean tryRelease(int releases) {
// 获取AQS的锁状态标记,计算剩余锁状态标记
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 锁状态标记为0,当前没有其他线程持有锁,锁处于空闲状态,设置锁持有者为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新锁状态
setState(c);
return free;
}
在ReentrantLock独占锁实现的场景下,state锁状态标记取值只有两个,0和1(前文lock过程中也有分析通过CAS改变state状态时,一直是从0到1的修改),进而当当前线程执行tryRelease后,state锁状态标记取值更新为0,表示当前锁处于空闲状态,随后自然要唤醒Node链表中的其他节点去获取锁啦。
unparkSuccessor
unparkSuccessor函数主要用于更新Node节点的waitStatus状态并按需将Node链表中阻塞的线程唤醒执行,实现如下所示:
private void unparkSuccessor(Node node) {
// 获取头节点的waitStatus状态,将头节点的waitStatus状态设置为0,head waitStatus恢复初始状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取头节点的下一个节点,如果下一个节点为null或者状态为取消状态(waitStatus大于0只有取消状态),则从尾节点开始遍历,查找未被取消的后续节点对象
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 对上一步获取到的节点对应的线程执行unpark唤醒,去抢占锁
if (s != null)
LockSupport.unpark(s.thread);
}
从前面可以看出这里传入的node是当前的head(链表头),判断当前头节点的waitStatus,如果不是初始值则重置为初始值,随着查找下一个要被唤醒的节点,查找到后,唤醒节点对应的线程,让线程去尝试抢占锁。
结合上文,我们可以得出ReentrantLock.unlock执行的一般流程如下所示: