ReentrantLock源码解析(补充4——条件变量Condition)
上一章 ReentrantLock源码解析 仅介绍了 ReentrantLock 的常用方法以及公平锁、非公平锁的实现。这里对上一章做一些补充。主要是:
- AQS 中阻塞的线程被唤醒后的执行流程
- 可打断的锁
lock.lockInterruptibly()
- 锁超时
lock.tryLock(long,TimeUnit)
- 条件变量 Condition(本篇讲述)
1. Condition的使用
假设场景:当线程获取到锁,进入临界区,但由于某个条件不满足,而不能很快执行完临界区代码。比较好的处理方式是:让出锁资源。
在 synchronized 的保护性暂停设计模式中,给出了代码模板。类似的,在 ReentrantLock 中,也可以完成该设计。
ReentrantLock 的 Condition 是 synchronized 的 waitSet 的优化实现。
- Synchronized 重量级锁的 waitSet 只有一个,也就是所有获取锁后,又不满足某条件的线程,通过 o.wait() 方法让出锁资源,只能进到同一个等待集合 waitSet 中。
- ReentrantLock 可以有多个等待集合,每个等待集合被封装在不同的 Condition 中。
synchronized 重量级锁的结构为:
ReentrantLock中 Condition 的结构(ConditionObject 是 Condition 接口的实现):【重要】
需要全体唤醒时,Synchronized 唤醒的是所有 waitSet 中的线程,ReentrantLock 则是唤醒某个 Condition 中的线程。
在保护性暂停设计模式中,所有不满足条件的线程后续还会重新进入等待集合中。线程状态转换是需要消耗 CPU 资源的,ReentrantLock 通过使用多个 Condition 将该代价降低。
关于条件变量 Condition 的使用,我们模拟以下场景:
线程 t1 和 t2 获取到锁资源后,都要满足 conditionOk 条件,才可以继续执行临界区代码,并消耗 conditionOk 条件,主线程负责给予满足的条件。
public class ReentrantLockDemo2_Condition {
//锁资源
static ReentrantLock lock = new ReentrantLock();
//当前条件情况,如int类型的条件到达了某个值,将被判断为满足条件,或者boolean类型的条件变成了 true,将被判断为满足条件
static boolean conditionOk = false;
//条件等待集合
static Condition condition = lock.newCondition();
public static void main(String[] args) {
//线程 t1
new Thread(() -> {
//condition.await()之前需要先获取锁
lock.lock();
try {
//线程刚启动,或被唤醒后,进行条件判断
while (!conditionOk) {
System.out.println("t1 条件不满足,等待条件");
try {
//挂起线程,进入Condition的等待集合、
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//满足条件后
System.out.println("t1: 条件满足,开始工作!");
//消耗condition
conditionOk = false;
} finally {
lock.unlock();
}
}, "t1").start();
//线程 t2,具体实现与 t1相同
new Thread(() -> {
lock.lock();
try {
while (!conditionOk) {
System.out.println("t2 条件不满足,等待条件");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2: 条件满足,开始工作!");
conditionOk = false;
} finally {
lock.unlock();
}
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//主线程给予满足的条件
lock.lock();
try {
//条件准备完成
conditionOk = true;
//唤醒condition等待
condition.signalAll();
} finally {
lock.unlock();
}
}
}
运行结果:
- 两个线程一开始都不满足条件,都进入了等待集合,并挂起进入 等待状态(WAITING);
- 主线程将条件准备完成后,唤醒两线程,两线程重新争抢锁;
- 线程 t1 先争抢到锁,消费掉条件后,线程 t2 后争抢到锁,但条件不满足,又进入了等待集合。
t1 条件不满足,等待条件
t2 条件不满足,等待条件
t1: 条件满足,开始工作!
t2 条件不满足,等待条件
2. condition.await() 源码分析
condition.await() 由 AQS 中的 ConditionObject类 实现,它主要做了几件事:
-
将当前线程的 Node 加到 ConditionObject 等待集合的队尾
-
释放锁资源。
- 【重点 1 】如果没有获得锁资源,到这里,就会抛出异常,并且刚加入队尾的 node 的状态将会改为 Node.CANCELLED ,该 node 后续将被 ConditionObject 剔除
-
【重点 2 】只要不在 AQS 双向队列中,就进入等待状态,等待唤醒。(通过 signal() 中的LockSupport.unpark() 或者 interrupt() 都可以唤醒线程,并将 node 放入到 AQS 的队列尾部等待锁资源)
-
如果不是被 signal() 中的 unpark() 唤醒的,不作处理,再次进入循环,重新被 park() 挂起进入到 等待状态(WAITING)
因为仅仅通过 unpark(),而没有通过 signal() 的 enq() 操作,将不被视为被合法唤醒。
合法唤醒:需要将 node 的 waitStatus 变为 0 ,同时加入到 AQS 的双向队列中等待锁资源。
-
-
重新 acquireQueued() 重新争锁。
-
acquireQueued()获取锁 return 之后,才准备退出 await() 方法。
-
在结束 await() 方法之前,需要做两件事:
- 清理 ConditionObject 的链表
- 【重点 3】根据中断情况,抛出异常,或者再次中断线程,或者什么都不做。
- 如果线程是通过 interrupt() 唤醒的,抛出异常
- 如果线程在 signal()唤醒后,还被 interrupt() 过,再次interrupt() 以确保此时中断标记为 true;
- 如果线程 signal() 之后没有被 interrupt() 过,什么也不做。
public final void await() throws InterruptedException {
//处理中断
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程添加到ConditionObject中去————见2.1
Node node = addConditionWaiter();
//释放当前线程持有的锁资源,并保存当前重入次数————见2.3
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在 AQS 双向链表中,进入循环
while (!isOnSyncQueue(node)) {
// 将当前线程挂起
LockSupport.park(this);
//直到被interrupt()中断,或者被唤醒————见2.4
//如果仅仅是被 interrupt() 中断,interruptMode = THROW_IE,从而进入if方法体,break退出循环;
//如果signal()之后紧接着调用了 interrupt(),interruptMode = REINTERRUPT,从而进入if方法体,break退出循环;
//如果仅仅是 signal() 则会因为不满足while条件而退出循环,interruptMode仍为 0
//不论是 interrupt() 还是 signal() ,node都会进入到 AQS 队列中等待锁资源。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//如果线程不是被 signal()的unpark() 的其他 单独的 unpark()方法 唤醒的,不作处理!再次进入循环将再次被 park() 进入等待状态(WAITING)
}
//退出循环
//线程被唤醒,并进入到 AQS 队列中。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//直到获取到锁资源,acquireQueued()才会返回,返回的是中断标记。
//进入到这里,说明调用了 signal() 后。在 acquireQueued()中还被 interrupt() 过
interruptMode = REINTERRUPT;
//如果不处在 ConditionObject 的队尾,尝试清空 ConditionObject 中不为 Node.CONDITION 节点。
if (node.nextWaiter != null) // clean up if cancelled
//清理 ConditionObject 链表 ———— 见 2.2
unlinkCancelledWaiters();
if (interruptMode != 0)
//如果过程中被中断过
//视情况:抛出异常,或者再次中断线程,或者什么都不做。----见2.5
reportInterruptAfterWait(interruptMode);
}
2.1 addConditionWaiter()
说明:将当前线程添加到 ConditionObject 的末尾。在添加之前,先 unlinkCancelledWaiters() 清理链表。
ConditionObject 中 firstWaiter 指向单链表头结点,lastWaiter 指向单链表尾结点。
源码
private Node addConditionWaiter() {
//获取尾结点
Node t = lastWaiter;
//如果尾结点不为空,并且尾结点的状态不是 Node.CONDITION,说明要清理链表————见2.2
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程封装到 Node 中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
//如果 ConditionObject 的链表为空,firstWaiter 指向它
firstWaiter = node;
else
//如果链表有元素,则加在末尾元素的最后
t.nextWaiter = node;
//更新链表末尾元素指针
lastWaiter = node;
return node;
}
2.2 unlinkCancelledWaiters()
说明:从前往后清理 ConditionObject 链表中 waitStatus 不为 Node.CONDITION 的节点。
源码:
private void unlinkCancelledWaiters() {
//t指向当前判断的节点,trail指向上一个可用节点(是 CONDITION 的节点)
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
//一旦发现需要清除的节点
//1. 断开引用
//2. 如果第一个就不可用,更新 firstWaiter
//3. 假设下一个可用,更新 trail(上一个可用节点) 的nextWaiter 指向
//4. 如果到了末尾,更新 lastWaiter 指向
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
//遇到了可用的,更新 trail 指向
trail = t;
t = next;
}
}
2.3 fullyRelease(node)
说明:如果当前线程确实是在获取锁的情况下,调用的 await() 方法,将会释放锁资源,并将之前的重入次数,即 AQS 的 state 变量,记录下来。以备后续唤醒时,直接设置重入次数。
如果当前线程并没有持有锁,却调用了 await() 方法,来到了 fullyRelease(),将会抛出 IllegalMonitorStateException()
异常。同时将刚加入到 ConditionObject 链表的节点设置状态为 CALCELLED ,即 取消状态。
这个异常也在 Object#wait()方法中出现。如果一个对象调用 wait() 方法,却不在 Synchronized 修饰的代码块中,也会抛出 IllegalMonitorStateException 异常
源码
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取 AQS 中的state,即当前ReentrantLock的重入次数
long savedState = getState();
//release做了两件事:
//1. 判断线程是否持有锁,没有就抛出异常
//2. 将ReentrantLock的AQS中的owner设为空
//3. 如果AQS双向队列中有 node 在等待,唤醒一个
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//抛出异常后,无论外界是否捕获异常,这里都会将刚加入到 ConditionObject 链表的 node 状态设置为 CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
2.4 checkInterruptWhileWaiting(node)
说明:
当线程被唤醒,判断是否为合法唤醒。
-
如果当前线程被 interrupt() 唤醒过,且还未signal(),返回 THROW_IE:-1;(interrupt() 抢先将 node 加入到 AQS 等待队列中)
-
如果在被唤醒 signal() 后被 interrupt() 中断,返回 REINTERRUPT:1;(signal() 抢先将 node 加入到 AQS 等待队列中)
这个情况也被视为正常地 signal() ,不过需要把中断标记改为 true
-
如果不是合法唤醒,返回 0。
合法唤醒:需要将 node 的 waitStatus 变为 0 ,同时加入到 AQS 的双向队列中等待锁资源。(只能通过 signal()的unpark() 或者 interrupt()后进入到 transferAfterCancelledWait() 才可以合法唤醒 )
源码
private int checkInterruptWhileWaiting(Node node) {
//如果当前线程是因为中断而退出的LockSupport.park()方法,将会进入到 transferAfterCancelledWait() 方法
//如果当前线程正常被唤醒,返回 0
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
如果当前线程是因为中断导致的唤醒,将会进入到 trnasferAfterCalcceldWait(node) :
- CAS 更新node的waitStatus状态
- 如果成功,将该 node 通过 enq() 放回到 AQS 队尾等待锁资源
- 如果失败,说明 signal() 方法抢先进行 CAS 成功,signal() 也会将 node 通过 enq() 放到 AQS 双向队列的尾部
- 如果是 interrupt() 抢先将 node 放入AQS等待队列,返回 true
- 如果是 signal() 抢先将 node 放入AQS等待队列,返回 false
final boolean transferAfterCancelledWait(Node node) {
//被中断唤醒后的线程,将状态变为 0 ,尝试入队到AQS中
//注意此时是在释放锁之后,当前线程没有持有锁资源
// signal() 方法也会有对node 的 CAS 操作。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//不论是由于interrupt()后进入该方法,并 enq()成功,还是由于signal()使得node enq()成功,最后都会进入到 AQS 中!
//而且cas失败的情况很少见,即使失败了,由signal()做enq()速度也是很快的,所以这里只需要自旋等待
while (!isOnSyncQueue(node))
//为避免自旋空转,浪费CPU资源,这里可以通过 yield() 交出 CPU 控制权,放弃时间片,等待下一次调度。
Thread.yield();
return false;
}
2.5 reportInterruptAfterWait(interruptMode);
说明:判断 signal() 和 interrupt() 的执行情况,返回中断处理
- 如果线程是被 interrupt() 唤醒的,并通过 transferAfterCancelledWait() 进入到 AQS 队尾,则在 acquireQueued() 获取到锁资源后,且在退出 await() 之前,抛出
InterruptException
异常 - 如果线程是被 signal() 唤醒的
- 如果 signal() 之后,该线程还被 interrupt() 过,则在该方法中再次调用 interrupt() 确保在 await() 退出时,中断标记为 true。
- 如果 signal() 之后,线程没有被中断过,则什么也不做
源码:
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
//如果线程是通过 interrupt() 唤醒的,抛出异常
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//如果线程在 signal()唤醒后,还被 interrupt() 过,再次interrupt() 确保此时中断标记为 true
selfInterrupt();
//如果线程 signal() 之后没有被 interrupt() 过,什么也不做。
}
3. condition.signal()
唤醒在该条件变量中等待的第一个是Node.CONDITION 状态的节点,并让其进入AQS等待队列继续争锁。
如果没有持有 ReentrantLock 锁,这个唤醒是不合法的,会抛出 IllegalMonitorStateException()
异常。
public final void signal() {
//判断当前线程是否持有锁
if (!isHeldExclusively())
//如果没有持有锁就调用 signal(),抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//如果条件变量中,有等待的线程,那么唤醒第一个。
doSignal(first);
}
doSignal(first) 将 ConditionObject 的等待链表中第一个还是 Node.CONDITION 状态的节点唤醒。
private void doSignal(Node first) {
do {
//将 first 从链表中剔除出来
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果first之前被 interrupt() 唤醒,已经进入了 AQS 等待队列,那么就要再次循环,拿到 ConditionObject 的下一个 first进行唤醒。
//如果 transferForSignal() 正常唤醒,则退出循环,完成 doSignal()
} while (!transferForSignal(first) &&
//如果 ConditionObject 的链表已经没有节点了,就结束。
(first = firstWaiter) != null);
}
transferForSignal(node)
- 先尝试 CAS 将 node 的 waitStatus 改为 0
- 如果修改成功,将该 node 加入到 AQS 的等待队列中去。
- 如果修改失败,
- 有可能线程已经被 interrupt() 后的 transferAfterCancelledWait() 唤醒进入 AQS 等待队列了,return false,让 doSignal() 寻找 ConditionObject 中链表的下一个节点进行唤醒。
- 也有可能该线程没拿到锁资源,就进行了 condition.await(),从而挂在 ConditionObject 链表最后,但其状态为 Node.CANCELLED
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
//要么线程已经被 interrupt() 后的 transferAfterCancelledWait() 节点 waitStatus 设为 0
//要么就是节点 waitStatus 是 Node.CANCELLED
return false;
//如果signal成功,将 node 加入到 AQS 队列中去。
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
4. condition.signalAll()
唤醒在该条件变量中等待的所有是Node.CONDITION 状态的节点,并让它们进入AQS等待队列继续争锁。**。
如果没有持有 ReentrantLock 锁,这个唤醒是不合法的,会抛出 IllegalMonitorStateException()
异常。
public final void signalAll() {
//判断当前线程是否持有锁
if (!isHeldExclusively())
//如果没有持有锁就调用 signal(),抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
doSignalAll() 将 ConditionObject 链表中 所有 Node.CONDITION 的节点都唤醒
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}