这里写目录标题
- 回顾
- 前缀知识
- 一、Condition的概念
- 二、Condition底层结构
- 三、Condition源码解析
- 3.1 newCondition()
- 3.2 await()
- 总结
- 主要方法:
回顾
如果你还没熟悉 AQS 中的独占锁,可以先看这篇文章的前导篇。上一篇文章是以
ReentrantLock
里面的加锁、解锁源码进行分析的。
文章链接:(一)从底层源码剖析AQS的来龙去脉!
回顾上文,其实公平锁和非公平锁的实现区别无非两点。
首先,非公平锁在调用lock
后,直接使用CAS
进行抢锁操作,如果锁未被占用,则直接获取锁并返回。
然后,非公平锁在CAS
失败后,会进入tryAcquire
方法,如果发现锁被释放了(state == 0)
,非公平锁会再次尝试通过CAS抢锁;而公平锁则会检查等待队列是否有线程处于等待状态,如果有等待线程,公平锁不会抢锁,而是将自己加入队列尾部,遵循先来先得的原则。
其实大体上就是这两点区别,而且如果这两次的 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都需要进入到阻塞队列等待唤醒。
tips:如果想要更好的理解此篇文章,希望大家可以好好看一下这篇文章,否则这篇文章看着应该会很痛苦的。
文章链接:(一)从底层源码剖析AQS的来龙去脉!
前缀知识
AQS 内部存在的两种类型的队列:
1. 同步队列:这是线程在等待锁时所处的队列,即当线程获取锁资源发现已经被其他线程占有而加入的队列;
2. 等待队列(可能有多个等待队列):这是由 ConditionObject
维护的队列,用于存放调用 await()
方法而释放锁并等待信号的线程。当线程被 signal()或signalAll()
方法唤醒时,它将从等待队列中移除,并重新加入同步队列以竞争锁。
两种队列的概念:
1. 同步队列是线程等待锁的队列,其中的线程处于阻塞状态,等待锁的释放以便获取锁。
2. 等待队列是一个特定于每个 ConditionObject
的队列,用于存放调用 await() 方法被阻塞的线程。
一、Condition的概念
Condition
是一个接口类,具体实现者为AQS内部的ConditionObject
类。
AQS使用内部类ConditionObject
构建等待队列,当Condition
调用await()
方法后,等待获取锁资源的线程将会加入等待队列中,而当Condition
调用signal()
方法后,线程将从等待队列转移到同步队列中进行锁资源的竞争。
Condition 经常可以用在生产者-消费者
的场景中,请先看下面这个例子。
场景描述:
模拟短信发送和接收的过程。在这个场景中,生产者
是短信发送者,消费者
是短信接收者。现在通过一个队列来存储待发送和接收的短信。
这其中包含一个队列来保存消息,一把锁 ReentrantLock 用于线程同步,以及两个条件变量,分别用于短信发送者和短信接收者。
代码实现
public class MessageQueue {
private LinkedList<String> queue = new LinkedList<>();
private final int capacity = 10;
private final ReentrantLock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
private final Condition notFull = lock.newCondition(); // 生产者条件
private final Condition notEmpty = lock.newCondition(); // 消费者条件
public void put(String message) throws InterruptedException {
lock.lock();
try {
// 如果队列已满,则生产者等待
while (queue.size() == capacity) {
notFull.await();
}
queue.addLast(message);
System.out.println("放入消息: " + message);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
}
public String take() throws InterruptedException {
lock.lock();
try {
// 如果队列为空,则消费者等待
while (queue.isEmpty()) {
notEmpty.await();
}
String message = queue.removeFirst();
System.out.println("取出消息: " + message);
notFull.signal(); // 唤醒生产者
return message;
} finally {
lock.unlock();
}
}
}
通过上面的例子其实可以得到结论:使用 Condition 时,必须先持有相应的锁。
作用
每个Object对象都会在被创建的时候与一个监视器对象产生“羁绊”,而每个对象也会有一组监视器的方法,即wait()
/notify() 或 notifyAll()
方法, 通过这些可以实现线程之间的通信机制,也就是等待/唤醒机制。
但这些有个前提,那就是需要持有对象的监视器锁才可以调用wait()
/notify() 或 notifyAll()
方法,但是相对于notify()
的随机唤醒等待锁的一个线程,和 notifyAll()
直接唤醒所有等待锁的线程,Condition的await()
/signal()
无疑更加准确,他们可以精确唤醒某个等待锁的线程。
并且他们等待、唤醒机制不同,在monitor监视器中,一个对象有一个同步队列和一个等待队列,但是在AQS中一个锁对象则是可以拥有一个同步队列和多个等待队列!
注意:
wait()
,notify()
这些方案是基于对象监视器锁的,而Condition
是基于ReentrantLock
实现的。所以不管是调用await
进入等待还是signal
唤醒,必须获取到锁才能进行操作!
二、Condition底层结构
2.1 AQS底层基本变量
我们先看一下AQS底层基本变量:
private transient volatile Node head; // 头结点, 可以理解为当前当前持有锁的线程
private transient volatile Node tail; // 尾节点,可以理解为被阻塞的线程节点
private volatile int state; // 表示共享资源(可以理解为是否获取到锁的标志)
private transient Thread exclusiveOwnerThread; // 表示当前占据锁的线程
2.2 Node节点结构
abstract static class Node {
volatile Node prev; // 表示前驱指针
volatile Node next; // 表示后继指针
Node nextWaiter; // 等待队列里下一个等待条件的结点
volatile Thread thread; // 线程本程
/* 共享还是独占模式的标识 */
// 共享模式时的节点标识
static final Node SHARED = new Node();
// 独占模式时的节点标识
static final Node EXCLUSIVE = null;
/* 下列四个变量表示Node在队列中的状态 waitStatus专用 */
volatile int waitStatus; // 节点的等待状态
static final int CANCELLED = 1; // 表示线程获取锁请求已经取消(线程等待超时或被中断~)
static final int SIGNAL = -1; // 表示线程需要被唤醒,等待资源释放
static final int CONDITION = -2; // 表示节点在条件队列中,等待某个条件的满足
static final int PROPAGATE = -3; // 在共享模式下,表示后续节点需要被唤醒并继续执行
}
2.3 FIFO同步队列
注意:阻塞队列不包含 head,同步队列则是包含head!!!
PS:上面结构如果不是太清楚是作用是什么,可以去我的上一篇文章理清楚。
文章链接:(一)从底层源码剖析AQS的来龙去脉!
2.4 ConditionObject
Condition只是一个接口,具体实现是AQS内部类的ConditionObject
// 条件队列头节点
private transient Node firstWaiter;
// 条件队列尾节点
private transient Node lastWaiter;
2.5 条件队列
2.6 队列之间的关系
图片解释:
1、可以看到不论是同步队列还是条件队列,他们的节点都是Node节点,这个其实是因为最终条件队列的节点还是要去转移到同步队列中的。
2、一个 ReentrantLock
实例可以有一个同步队列+N个条件队列,至于等待队列的N具体是多少,那么则是看我们ReentrantLock
实例调用多少次调用 newCondition()
了,其中N >= 0。
3、每个 condition
有对应的条件队列,如线程 A 调用 conditionA.await()
方法可以将当前线程 A 封装成 Node
,然后添加到条件队列,继而阻塞在这,等待着唤醒才回去重新进入同步队列,然后获取锁。
4、当调用conditionA.signal()
触发唤醒时,唤醒的是队头,会将conditionA
对应的条件队列的 firstWaiter
移到同步队列的队尾,然后等待获取锁。只有当获取锁后 await()
方法才能返回,继续执行await()
方法之后的代码。
为什么只有当获取到锁,才从
await()
方法返回,并继续往下执行?
因为调用await()的目的就是等待某个条件满足,而这个条件通常是可能会被持有相同锁的其他线程中被改变的。所以为了线程在条件满足时能安全地继续执行,它必须重新获得之前释放的锁。这是为了确保对共享资源的操作是线程安全的,防止竞态条件的发生。
为什么要继续执行
await()
方法之后的代码?
因为我们既然之前调用了 await() ,那么就证明肯定是由于某些条件的确实,不得不把它阻塞住,从而它后面的代码也执行。所以当我们调用 signal() 的时候,那么证明我们此时的条件满足了,所以也可以执行后面的方法了,所以线程需要返回,然后继续执行 await() 方法之后的代码。
在这个过程中,条件队列的firstWaiter不会立即变成条件队列的下一个节点。只有当被移到同步队列的线程成功获取锁并且await()方法返回之后,条件队列的下一个节点才会成为新的firstWaiter。
所以才能够满足了互斥、顺序以及正确这三个特性:
互斥:重新获取锁可以确保线程在访问共享资源时的互斥性,避免多个线程同时访问共享资源而导致的不一致性。
顺序:重新获取锁后,线程才能确定条件确实已经满足,可以安全地继续执行。
正确:线程在等待条件和重新检查条件之间是互斥的,防止在检查条件时其他线程改变了条件状态。
5、条件队列是一个单向链表,而同步队列则是双向链表,因为它需要通过前后指针来进行唤醒和得知waitStatus
状态
注意:队列中的Node节点的waitStatus为
CONDITION
状态。
上面的 2 -> 3 -> 4 只是最简单的流程,没有其他的因素。比如中断、signalAll()
以及await(long time, TimeUnit unit)
这种带超时方法
具体为什么可以看上篇文章:(一)从底层源码剖析AQS的来龙去脉!。
三、Condition源码解析
3.1 newCondition()
因为Condition
是基于 ReentrantLock
实现的。所以不管是调用 await
进入等待还是 signal
唤醒,必须获取到锁才能进行操作,所以我们先看看它们之间是如何实现的。
之前说过,Condition
只是一个接口,它的实现类为 ConditionObject
,而每个 ReentrantLock
实例可以通过调用多次 newCondition
产生多个 ConditionObject
的实例。
所以我们首先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer
类中的 ConditionObject
是如何创建的的?
// 先获取锁的类型【这点无关紧要,因为不会根据锁的类型而创建不同的obj】,然后去调用重载方法
public Condition newCondition() {
return sync.newCondition();
}
// 获取ConditionObject实例
final ConditionObject newCondition() {
return new ConditionObject();
}
很简单啊很简单,就只是创建一个ReentrantLock
,然后通过它调用newCondition()
就可以了。
ReentrantLock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
3.2 await()
接下来,我们分析await()
方法,源码如下:
// 调用await方法会进行阻塞,直到调用signal方法,而且因为它也可以响应中断,所以被中断的时候,它直接抛出异常。
public final void await() throws InterruptedException {
// 既然可以响应中断,所以直接判断线程中断状态,如果响应中断直接抛异常中断
if (Thread.interrupted())
throw new InterruptedException();
// 把线程封装进节点。然后添加到condition条件队列中
Node node = addConditionWaiter();
// 释放当前线程持有锁资源,保存 释放锁之前 的state值
int savedState = fullyRelease(node);
// 用来存储线程中断检查的结果。
int interruptMode = 0;
// 判断节点是否在同步队列(SyncQueue)中,即是否被唤醒
// 情况1:isOnSyncQueue(node)为true,也就是说当前node节点已经转移到同步队列了,已被唤醒。
// 情况2:(interruptMode = checkInterruptWhileWaiting(node)) != 0,表示线程中断
while (!isOnSyncQueue(node)) {
// 如果当前线程不在同步队列中,park阻塞当前线程,直到被unpark方法唤醒,或线程被中断。
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 线程被唤醒后尝试获取锁要。
// 是获取锁成功后并且没有抛中断异常,那么就 interruptMode = REINTERRUPT; 表示需要重新抛出中断。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 检查条件队列中是否还有其他节点,要是有的话,那么就把他们移除。
// 也就是说:把条件队列中不是CONDITION状态的节点clear
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
总结
主要方法:
public interface Condition {
/**
* 调用当前方法会使当前线程处于等待状态直到被通知(signal)或中断
* 当其他线程调用singal()或singalAll()方法时,当前线程将被唤醒
* 当其他线程调用interrupt()方法中断当前线程等待状态
* await()相当于synchronized等待唤醒机制中的wait()方法
*/
void await() throws InterruptedException;
/**
* 作用与await()相同,但是该方法不响应线程中断操作
*/
void awaitUninterruptibly();
/**
* 作用与await()相同,但是该方法支持超时中断(单位:纳秒)
* 当线程等待时间超出nanosTimeout时则中断等待状态
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 作用与awaitNanos(long nanosTimeout)相同,但是该方法可以声明时间单位
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 作用与await()相同,在deadline时间内被唤醒返回true,其他情况则返回false
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 当有线程调用该方法时,唤醒等待队列中的一个线程节点
* 并将该线程从等待队列移动同步队列阻塞等待锁资源获取
* signal()相当于synchronized等待唤醒机制中的notify()方法
*/
void signal();
/**
* 作用与signal()相同,不过该方法的作用是唤醒该等待队列中的所有线程节点
* signalAll()相当于synchronized等待唤醒机制中的notifyAll()方法
*/
void signalAll();
}