本文深入解读了高频面试点——ReentrantLock的条件队列使用方法及其原理。源码有详细注释,建议收藏阅读。
点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达
Jdk中独占锁的实现除了使用关键字synchronized
外,还可以使用ReentrantLock
。虽然在性能上两者没有什么区别,但ReentrantLock
相比synchronized
而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景,其原理之前已经介绍过,请自行阅读。
重点,一文掌握ReentrantLock加解锁原理!|原创
使用synchronized
结合Object
上的wait
和notify
方法可以实现线程间的等待通知机制。ReentrantLock
的Condition
同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。
Condition与Object的wait/notify区别
Condition能够支持不响应中断,而通过使用 Object 方式不支持
Condition能够支持多个等待队列(new 多个Condition对象),而 Object 方式只能支持一个
Condition能够支持超时时间的设置,而 Object 不支持
使用示例
为了方便理解源码,我们先用一个Demo展示一下ReentrantLock的线程停止和通知是如何使用的。这里使用的是一个生产者和消费者的模型,一个线程负责加,另一个线程负责减。
static volatile int i = 0;
static final ReentrantLock LOCK = new ReentrantLock();
static final Condition condition = LOCK.newCondition();
public static void add() throws InterruptedException {
LOCK.lock();
try {
while (i == 0) {
Thread.sleep(1000);
System.out.print("add\t");
System.out.println(++i);
condition.signal();
condition.await();
}
} finally {
LOCK.unlock();
}
}
public static void sub() throws InterruptedException {
LOCK.lock();
try {
while (i == 1) {
Thread.sleep(1000);
System.out.print("sub\t");
System.out.println(--i);
condition.signal();
condition.await();
}
} finally {
LOCK.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (true) {
try {
add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
sub();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。
condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的等待队列,所有调用condition.await
方法的线程会加入到等待队列中,并且线程状态转换为等待状态。

ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,同步队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点,源码如下。
Condition condition = LOCK.newCondition();
//ReentrantLock内部类Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
// AQS内部类 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
//真正的创建Condition对象
public ConditionObject() { }
}
static final class Node {
Node nextWaiter;
}
用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。

下面开始解读await()
和signal()
方法。
Await方法原理
阻塞前:
1.在条件队列尾部添加新节点(状态CONDITION=-2),如果头节点为空则把当前节点设为头节点。
2.获取当前线程占有的state,无论state是几,都清空为0,代表完全释放锁。并且在释放当前线程所占用的锁之后,会唤醒同步队列中的下一个节点。
3.进入自旋判断逻辑:如果当前节点状态是 CONDITION(-2)或者 prev 节点(表示在同步队列中有前驱节点)为空,返回false,进入while逻辑,阻塞当前线程;如果有继承者,表示肯定在同步队列中,直接跳出循环;如果从同步队列队尾开始寻找,找到当前节点,同样表示在队列中,跳出循环。

注意!! 是先添加到条件队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park。此时的A线程可能已经状态不再是CONDITION,说明已经进入同步队列,那就可以跳过Park再次直接争夺锁,所以这里需要自旋锁去不断尝试判断。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当不在同步队列中(处于condition状态或者前一个节点为null)
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//删除无效的等待节点
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
阻塞后:
恢复执行后,检查是否中断。然后自旋再次判断是否已经进入同步队列,返回true,跳出循环继续执行。
调用acquireQueued,尝试去争夺锁,这里逻辑和
lock
一样,已经是同步队列去竞争锁的逻辑。并且会将之前清空的state值按照原来的大小设置。最后都是一些中断标记的处理,主流程已经结束。
注意:退出await
方法一定表明当前线程已经获得了与condition
关联的锁资源。

具体请看代码:
// AQS
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
//是先添加到等待队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park,此时的A就可以跳过Park再次直接争夺锁。
while (!isOnSyncQueue(node)) {
// 3. 关键节点!!!:当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果节点线程被取消才会进入这里的逻辑。正常不会
if (node.nextWaiter != null) // clean up if cancelled
//删除无效的等待节点
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 添加新的条件队列节点
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清除被取消的尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
//解除关联
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程保存在Node中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//队尾插入
t.nextWaiter = node;
//更新lastWaiter (如果是第一次插入节点,头尾节点都是同一个)
lastWaiter = node;
return node;
}
//完全释放锁状态
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 这里会释放锁,并且唤醒后继节点
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
Signal
检查本线程是否持有锁,正常是持有锁,如果不符合就抛出异常。
从等待队列中拿到第一个节点。如果头节点为空代表条件队列为空,谁也不通知直接结束。
将头节点从条件队列中移除,并且把nextWaiter置为null。然后把节点状态设为0,转移进入同步队列。如果队列为空则初始化同步队列。
如果前驱节点不是 signal 状态或者前一个节点已经被取消,直接对头节点线程解除阻塞。返回true跳出循环。
至此本线程方法执行结束。依旧持有锁,但是转移了条件队列的头节点到同步队列中,就做了这一件事。
//AQS
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//ReentrantLock
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
//AQS
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//1. 将头结点从等待队列中移除
first.nextWaiter = null;
//2. while中transferForSignal方法对头结点做真正的处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//1. 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//2.将该节点移入到同步队列中去
// 这里的处理和同步队列的生成用的同一个方法
// node p 为前驱节点(原尾节点)
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱节点不是signal状态或者前一个节点已经被取消,直接对头节点解除阻塞。返回true跳出循环
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
具体原理图如下:

SignalAll
signalAll与signal方法的区别体现在doSignalAll
方法上,前面我们已经知道doSignal
方法只会对等待队列的头节点进行操作,而doSignalAll
将条件队列中的所有Node都转移到了同步队列中,即“通知”当前调用condition.await()方法的每一个线程,代码如下。
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
最后,欢迎大家提问和交流。
如果对你有帮助,欢迎点赞、评论或分享,感谢阅读!
update在MySQL中是怎样执行的,一张图牢记|原创
2022-11-19

讲真,这篇最全HashMap你不能错过!|原创
2022-11-17

MySQL主从数据不一致,怎么办?
2022-11-15
