AQS抽象同步器核心原理
在争用激烈的场景下使用基于CAS自旋实现的轻量级锁有两个大的问题:
- CAS恶性空自旋会浪费大量的CPU资源。
- 在SMP架构的CPU上会导致“总线风暴”。
解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点、使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS的性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(抽象同步器类,简称为AQS)。
锁与队列的关系
无论是单体服务应用内部的锁,还是分布式环境下多体服务应用所使用的分布式锁,为了减少由于无效争夺导致的资源浪费和性能恶化,一般都基于队列进行排队与削峰。
- CLH 锁的内部队列
CLH自旋锁使用的CLH是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;队列的
队首节点(队列的头部)表示占有锁的节点,新加入的抢锁线程则需要等待,会插入到队列的尾部。CLH锁的内部结构如图所示:
-
分布式锁的内部队列
在分布式锁的实现中,比较常见的也是基于队列的方式进行不同节点中“等锁线程”的统一调度和管理。以基于ZooKeeper的分布式锁为例,其等待队列的结构大致如图所示:
-
AQS 的内部队列
AQS是JUC提供的一个用于构建锁和同步容器的基础类。JUC包内的许多类都是基于AQS构建,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题,主要原理和CLH队列差不多。AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后驱节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后驱节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成Node加入到AQS队列中去;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。AQS的内部结构如图所示:
AQS 的核心成员
AQS出于“分离变与不变”的原则,基于模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各种锁的子类(或者其内部类)去实现。
状态标志位
AQS中维持了一个单一的volatile修饰的状态信息state,AQS使用int类型的state标示锁的状态,可以理解为锁的同步状态。
//同步状态,使用volatile保证线程可见
private volatile int state;
state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都是可以得到最新值。AQS提供了getState()、setState()来获取和设置同步状态。由于setState()无法保证原子性,因此AQS给我们提供了compareAndSetState()方法利用底层
UnSafe 的 CAS 机 制 来 实 现 原 子 性 。 compareAndSetState() 方 法 实 际 上 调 用 的 是 unsafe 成 员 的compareAndSwapInt()方法。以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程执行该锁的lock()操作时,会调用tryAcquire()独占该锁并将state加1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态。
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量叫exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的get()和set()方法。
队列节点类
AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:
static final class Node {
/**节点等待状态值1:取消状态*/
static final int CANCELLED =1;
/**节点等待状态值-1:标识后继线程处于等待状态*/
static final int SIGNAL = -1;
/**节点等待状态值-2:标识当前线程正在进行条件等待*/
static final int CONDITION = -2;
/**节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播*/
static final int PROPAGATE = -3;
//节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
//普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)
volatile int waitStatus;
//节点所对应的线程,为抢锁线程或者条件等待线程
volatile Thread thread;
//前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
volatile Node prev;
//后驱节点
volatile Node next;
//如果当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上
//此属性指向下一个条件等待节点,即其条件队列上的后驱节点
Node nextWaiter;
...
}
waitStatus 属性
每个节点与等待线程关联,每个节点维护一个状态waitStatus,waitStatus的各种值以常量的形式进行定义。waitStatus的各常量值具体如下:
-
static final int CANCELLED = 1
waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞。表示线程因为中断或者等待超时,需要从等待队列中取消等待。由于该节点线程等待超时或者被中断,需要从同步队列中取消等待,因此该线程被置1。节点进入了取消状态,该类型节点不会参与竞争,且会一直保持取消状态。 -
static final int SIGNAL = ‒1
waitStatus为SIGNAL(‒1)时表示其后驱节点处于等待状态,当前节点对应的线程如果释放了同步状态或者被取消,将会通知后驱节点,使后驱节点的线程得以运行。 -
static final int CONDITION =‒2
waitStatus为‒2时,表示该线程在条件队列中阻塞(Condition有使用),表示节点在等待队列中(这里指的是等待在某个锁的CONDITION上),当持有锁的线程调用了CONDITION的signal()方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上去竞争锁(注意:这里的同步队列就是我们说的AQS维护的FIFO队列,等待队列则是每个CONDITION关联的队列)。
节点处于等待队列中,节点线程等待在CONDITION上,当其他线程对CONDITION调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中。 -
static final int PROPAGATE = ‒3
waitStatus为‒3时,表示下一个线程获取共享锁后,自己的共享状态会被无条件地传播下去,因为共享锁可能出现同时有N个锁可以用,这时直接让后面的N个节点都来工作。这种状态在CountDownLatch中使用到了。
为什么当一个节点的线程获取共享锁后,要唤醒后继共享节点?共享锁是可以多个线程共有的,当一个节点的线程获取共享锁后,必然要通知后继共享节点的线程也可以获取锁了,这样就不会让其他等待的线程等很久,这种向后通知(传播)的目的也是尽快通知其他等待的线程尽快获取锁。 -
waitStatus为0
waitStatus为0时,表示当前节点处于初始状态。Node节点的waitStatus状态为以上5种状态的一种。
thread 成员
Node的thread成员用来存放进入AQS队列中的线程引用;Node的nextWaiter成员用来指向自己的后继等待节点,此成员只有线程处于条件等待队列中的时候使用。
抢占类型常量标识
Node节点还定义了两个抢占类型常量标识:SHARED和EXCLUSIVE,具体的代码如下:
static final class Node {
//标识节点在抢占共享锁
static final Node SHARED = new Node();
//标识节点在抢占独占锁
static final Node EXCLUSIVE = null;
}
SHARED表示线程是因为获取共享资源时阻塞而被添加到队列中的;EXCLUSIVE表示线程因为获取独占资源时阻塞而被添加到队列中的。
FIFO 双向同步队列
AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队首的后驱节点占用锁。
AQS是一个通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点head和tail记录队首和队尾元素,元素的节点类型为Node类型,具体的代码如下:
/*首节点的引用*/
private transient volatile Node head;
/*尾节点的引用*/
private transient volatile Node tail;
AQS的队首节点和队尾节点都是懒加载的。懒加载的意思是在需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。head节点只能被setHead()方法修改,并且节点的waitStatus不能为CANCELLED。队尾节点只在有新线程阻塞时才被创建。一个包含5个节点的AQS同步队列的基本结构如图所示:
JUC 显式锁与 AQS 的关系
AQS是java.util.concurrent包的一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁两种方式。该类使用模板模式来实现的,成为构建锁和同步器的框架,使用该类可以简单且高效地构造出应用广泛的同步器(或者等待队列)。
java.util.concurrent.locks包中的显式锁如ReentrantLock、ReentrantReadWriteLock,线程同步工具如Semaphore,异步回调工具如FutureTask等,内部都使用了AQS作为等待队列。通过开发工具进行AQS的子类导航会发现大量的AQS子类以内部类的形式使用。同样,我们也能继承AQS类去实现自己需求的同步器(或锁)。
ReentrantLock 与 AQS 的组合关系
ReentrantLock是一个可重入的互斥锁,又称为“可重入独占锁”。ReentrantLock锁在同一个时间点只能被一个线程锁持有,而可重入的意思是,ReentrantLock锁可以被单个线程多次获取。经过观察,ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:
static abstract class Sync extends AbstractQueuedSynchronizer
ReentrantLock为了支持公平锁和非公平锁两种模式,为Sync又定义了两个子类,具体如下:
final static class NonfairSync extends Sync {...}
final static class FairSync extends Sync
NonfairSync为非公平(或者不公平)同步器,FairSync为公平同步器。ReentrantLock提供了两个构造器,具体如下:
public ReentrantLock() {
//默认的构造器
sync = new NonfairSync();
//内部使用非公平同步器
}
public ReentrantLock(boolean fair) {
//true 为公平锁,否则为非公平锁
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的默认构造器(无参数构造器)被初始化为一个NonfairSync对象,即使用非公平同步器,所以,默认情况下ReentrantLock为非公平锁。带参数的构造器可以根据fair参数的值具体指定ReentrantLock的内部同步器使用FairSync还是NonfairSync。
由ReentrantLock的lock()和unlock()的源码可以看到,它们只是分别调用了sync对象的lock()和release()方法。
public void lock() {
//抢占显式锁
sync.lock();
}
public void unlock() {
//释放显式锁
sync.release(1);
}
通过以上的委托代码可以看出,ReentrantLock的显式锁操作是委托(或委派)给一个Sync内部类的实例完成的。而Sync内部类只是AQS的一个子类,所以本质上ReentrantLock的显式锁操作是委托(或委派)给AQS完成的。一个ReentrantLock对象的内部一定有一个AQS类型的组合实例,二者之间是组合关系。
组合和聚合比较类似,二者都表示整体和部分之间的关系。
聚合关系的特点是:整体由部分构成,但是整体和部分之间并不是强依赖的关系,而是弱依赖的关系,也就是说,即使整体不存在了,部分仍然存在。例如一个部门由多个员工组成,如果部门撤销了,人员不会消失,人员依然存在。
组合关系的特点是:整体由部分构成,但是整体和部分之间是强依赖的关系,如果整体不存在了,部分也随之消失。例如一个公司由多个部门组成,如果公司不存在了,部门也将不存在。可以说,组合关系是一种强依赖的、特殊的聚合关系。
由于显式锁与AQS之间是一种强依赖的聚合关系,如果显式锁的实例销毁,其聚合的AQS子类实例也被销毁,因此显式锁与AQS之间是组合关系。
AQS 中的模板模式
AQS同步器是基于模板模式设计的,并且是模板模式经典的一个运用,模板模式是很容易理解的设计模式之一。如果需要自定义同步器,一般的方法是继承AQS,并重写指定方法(钩子方法),按照自己定义的规则对state(锁的状态信息)进行获取与释放;将AQS组合在自定义同步组件的实现中,自定义同步器去调用AQS的模板方法,而这些模板方法会调用重写的钩子方法。
模板模式
模板模式是类的行为模式。准备一个抽象类,将部分逻辑以具体方法的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑。不同的子类提供不同的方式实现这些抽象方法,从而对剩余的逻辑有不同的实现。模板模式的关键在于:父类提供框架性的公共逻辑,子类提供个性化的定制逻辑。
模板模式的定义
在模板模式中,由抽象类定义模板方法和钩子方法,模板方法定义一套业务算法框架,算法框架中的某些步骤由钩子方法负责完成。具体的子类可以按需要重写钩子方法。模板方法的调用将通过抽象类的实例来完成。模板模式所包含的角色有抽象类和具体类,二者之间的关系如图所示:
模板方法和钩子方法
模板方法(Template Method)也常常被称为骨架方法,主要定义了整个方法需要实现的业务操作的算法框架。其中,调用不同方法的顺序因人而异,而且这个方法也可以做成一个抽象方法,要求子类自行定义逻辑流程。
钩子方法(Hook Method)是被模板方法的算法框架所调用的,而由子类提供具体的实现方法。在抽象父类中,钩子方法常被定为一个空方法或者抽象方法,需要由子类去实现。钩子方法的存在可以让子类提供算法框架中的某个细分操作,从而让子类实现算法中可选的、需要变动的部分。
模板模式的优点
分离变与不变是软件设计的一个基本原则。模板模式将不变的部分封装在基类的骨架方法中,而将变化的部分通过钩子方法进行封装,交给子类去提供具体的实现,在一定程度上优美地阐述了“分离变与不变”这一软件设计原则。
模板模式的优点如下:
- 通过算法骨架最大程度地进行了代码复用,减少重复代码。
- 模板模式提取了公共部分代码,便于统一维护。
- 钩子方法是由子类实现的,因此子类可以通过拓展增加复杂的功能,符合开放封闭原则。
开放封闭原则是面向对象设计的五大原则之一,其核心思想是:对扩展开放,对修改关闭。面向对象设计的五大原则:单一职责原则、依赖倒置原则、接口隔离原则、里氏替换原则和开放封闭原则。
AQS 的模板流程
AQS定义了两种资源共享方式:
- Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。
- Share(共享锁):多个线程可同时占有锁资源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read锁。
AQS为不同的资源共享方式提供了不同的模板流程,包括共享锁、独享锁模板流程。这些模板流程完成了具体线程进出等待队列的基础(如获取资源失败入队/唤醒出队等)、通用逻辑。基于基础、通用逻辑,AQS提供一种实现阻塞锁和依赖FIFO等待队列的同步器的框架,AQS模板为ReentrantLock、CountDownLatch、Semaphore提供了优秀的解决方案。
自定义的同步器只需要实现共享资源state的获取与释放方式即可,这些逻辑都编写在钩子方法中。无论是共享锁还是独享锁,AQS在执行模板流程时会回调自定义的钩子方法。
AQS 中的钩子方法
自定义同步器时,AQS中需要重写的钩子方法大致如下:
- tryAcquire(int):独占锁钩子,尝试获取资源。若成功则返回true,若失败则返回false。
- tryRelease(int):独占锁钩子,尝试释放资源。若成功则返回true,若失败则返回false。
- tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享锁钩子,尝试释放资源。若成功则返回true,若失败则返回false。
- isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。
以上钩子方法的默认实现会抛出UnsupportedOperationException异常。除了这些钩子方法外,AQS类中的其他方法都是final类型的方法,所以无法被其他类继承,只有这几个方法可以被其他类继承。
对钩子方法的具体介绍如下:
-
tryAcquire 独占式获取锁
顾名思义,就是尝试获取锁,AQS在这里没有对tryAcquire()进行功能的实现,只有一个抛出异常的语句,我们需要自己对其进行实现,可以对其重写实现公平锁、不公平锁、可重入锁、不可重入锁。protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
-
tryRelease 独占式释放锁
tryRelease尝试释放独占锁,需要子类来实现。
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
- tryAcquireShared 共享式获取
tryAcquireShared尝试进行共享锁的获得,需要子类来实现。
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}
- tryReleaseShared 共享式释放
tryReleaseShared尝试进行共享锁的释放,需要子类来实现。
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}
- 查询是否处于独占模式
isHeldExclusively的功能是查询线程是否正在独占资源。在独占锁的条件队列中用到。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
通过 AQS 实现一把简单的独占锁
由于ReentrantLock的实现比较复杂,为了降低学习难度,首先模拟ReentrantLock的源码并且基于AQS实现一把非常简单的独占锁:
基于AQS实现一把非常简单的独占锁的类为SimpleMockLock,它的UML类图如图所示:
其中Lock是锁的接口,定义了锁的操作的抽象方法,具体实现由子类实现,而其实现类大多都使用AQS作为其实现的等待队列(线程等待获取锁的队列),所以可以说AQS是对锁真正的实现。
SimpleMockLock是一个基于AQS的、简单的非公平独占锁实现,代码如下:
public class SimpleMockLock implements Lock {
//同步器实例
private final Sync sync = new Sync();
// 自定义的内部类:同步器
// 直接使用 AbstractQueuedSynchronizer.state 值表示锁的状态
// AbstractQueuedSynchronizer.state=1 表示锁没有被占用
// AbstractQueuedSynchronizer.state=0 表示锁已经被占用
private static class Sync extends AbstractQueuedSynchronizer {
//钩子方法
protected boolean tryAcquire(int arg) {
//CAS更新状态值为1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//钩子方法
protected boolean tryRelease(int arg) {
//如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//如果锁的状态为没有占用
if (getState() == 0) {
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
//设置状态
setState(0);
return true;
}
//显式锁的抢占方法
@Override
public void lock() {
//委托给同步器的acquire()抢占方法
sync.acquire(1);
}
//显式锁的释放方法
@Override
public void unlock() {
//委托给同步器的release()释放方法
sync.release(1);
}
//省略其他未实现的方法
}
}
和ReentrantLock相比,SimpleMockLock的代码非常简单,这也是为了大家不被ReentrantLock的复杂代码锁困扰,能去更好地聚焦于AQS原理的学习。
SimpleMockLock仅仅实现了Lock接口的以下两种方法:
- lock()方法:完成显式锁的抢占。
- unlock()方法:完成显式锁的释放。
SimpleMockLock的锁抢占和锁释放是委托给Sync实例的acquire()方法和release()方法完成的。SimpleMockLock的内部类Sync继承了AQS类,实际上acquire()、release()是AQS的两个模板方法。在抢占锁时,AQS的模板方法acquire()会调用tryAcquire(int arg)钩子方法;在释放锁时,AQS的模板方法release()会调用tryRelease(int arg)钩子方法。
内部类Sync继承AQS类时提供了以下两个钩子方法的实现:
- protected boolean tryAcquire(int arg):抢占锁的钩子实现。此方法将锁的状态设置为1,表示互斥锁已经被占用,并保存当前线程。
- protected boolean tryRelease(int arg):释放锁的钩子实现。此方法将锁的状态设置为0,表示互斥锁已经被释放。
AQS 锁抢占的原理
下面基于SimpleMockLock公平独占锁的抢占过程详细说明AQS锁抢占的原理。
显式锁抢占的总体流程
这里先介绍一下SimpleMockLock锁抢占的总体流程,具体如图所示:
流程的第一步,显式锁的lock()方法会去调用同步器基类AQS的模板方法acquire(arg)。
AQS 模板方法:acquire(arg)
acquire是AQS封装好的获取资源的公共入口,它是AQS提供的利用独占方式获取资源的方法,源码实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
通过源码可以发现,acquire(arg)至少执行一次tryAcquire(arg)钩子方法。tryAcquire(arg)方法默认是抛出一个异常,具体的获取独占资源state的逻辑需要钩子方法去实现。在模板方法acquire中,若调用tryAcquire(arg)尝试成功,则acquire()将直接返回,表示已经抢到锁;若不成功,则将线程加入等待队列。
钩子实现:tryAcquire(arg)
SimpleMockLock的tryAcquire()的流程是:CAS操作state字段,将其值从0改为1,若成功,则表示锁未被占用,可成功占用,并且返回true;若失败,则获取锁失败,返回false。
SimpleMockLock的实现非常简单,是不可以重入的,仅仅为了学习AQS而编写。如果是可以重入的锁,在重复抢锁时会累计state字段值,表示重入锁的次数,具体可参考ReentrantLock源码。
直接入队:addWaiter
在acquire模板方法中,如果钩子方法tryAcquire尝试获取同步状态失败,则构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter(Node node, int args)方法将该节点加入到同步队列的队尾。
private Node addWaiter(Node mode) {
//创建新节点
Node node = new Node(Thread.currentThread(), mode);
// 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
Node pred = tail;
// 队列不为空的时候
if (pred != null) {
node.prev = pred;
// 先尝试通过CAS方式修改尾节点为最新的节点
// 如果修改成功,将节点加入到队列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
enq(node);
return node;
}
在addWaiter()方法中,首先需要构造一个Node对象,具体的代码如下:
Node node = new Node(Thread.currentThread(), mode);
构造Node对象所用到的两个参数如下:
-
当前线程
构造Node对象时,将通过Thread.currentThread()获取到当前线程作为第一个参数,该线程会被赋值给Node对象的thread成员属性,相当于将线程与Node节点进行绑定。在后续轮到此Node节点去占用锁时,就需要其thread属性获得需要唤醒的线程。 -
Node共享类型
mode是一个表示Node类型的参数,用于标识新节点是独占地还是共享地去抢占锁。mode虽然为Node类型,但是仅仅起到类型标识的作用。mode可能的值有两个,以常量的形式定义在Node类中,具体的代码如下:static final class Node { /** 常量标识:标识当前的队列节点类型为共享型抢占 */ static final Node SHARED = new Node(); /** 常量标识:标识当前的队列节点类型为独占型抢占 */ static final Node EXCLUSIVE = null; //省略其他代码 }
如果抢占独占锁,那么mode值为EXCLUSIVE;如果抢占共享锁,那么mode值为SHARED。
自旋入队:enq
addWaiter()第一次尝试在尾部添加节点失败,意味着有并发抢锁发生,需要进行自旋。enq()方法通过CAS自旋将节点的添加到队列尾部。
/**
* 这里进行了循环,如果此时存在tail,就执行添加队尾的操作
* 如果依然不存在,就把当前线程作为head节点
* 插入节点后,调用acquireQueued()进行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//队列为空,初始化队尾节点和队首节点为新节点
if (compareAndSetHead(new Node())) tail = head;
} else {
// 队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**CAS操作head指针,仅仅被enq()调用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**CAS操作tail指针,仅仅被enq()调用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
自旋抢占:acquireQueued()
在节点入队之后(tryAcquire方法没有拿到锁时),启动自旋抢锁的流程。acquireQueued()方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是队首节点才能尝试获取锁,原因是:
- 队首节点是成功获取同步状态(锁)的节点,而队首节点的线程释放了同步状态以后,将会唤醒其后驱节点,后驱节点的线程被唤醒后要检查自己的前驱节点是否为队首节点。
- 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点都在不断地执行for死循环。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋检查当前节点的前驱节点是否为队首节点,是才能获取锁
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 节点中的线程循环的检查自己的前驱节点是否为head节点
// 只有前驱节点是head时,进一步调用子类的tryAcquire(…)实现
if (p == head && tryAcquire(arg)) {
// tryAcquire成功后,将当前节点设置为队首节点,移除之前的队首节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
// 如果需要挂起,调用parkAndCheckInterrupt方法挂起当前线程,直到被唤醒
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; // 若两个操作都是true,则为true
}
} finally {
//如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了)那么取消节点在队列中的等待
if (failed)
//取消请求,将当前节点从队列中移除
cancelAcquire(node);
}
}
为了不浪费资源,acquireQueued()自旋过程中会阻塞线程,等待前驱节点唤醒后才启动循环。如果成功就返回,否则执行shouldParkAfterFailedAcquire()、parkAndCheckInterrupt()来达到阻塞效果。调用acquireQueued()方法的线程一定是node所绑定的线程(由它的thread属性所引用),该线程也是最开始调用lock()方法抢锁的那个线程,在acquireQueued()的死循环中,该线程可能重复进行阻塞和被唤醒。
AQS队列上每一个节点所绑定的线程在抢锁过程中都会自旋,即执行acquireQueued()方法的死循环,也就是说,AQS队列上每个节点的线程都不断自旋,具体如图所示:
如果队首节点获取了锁,那么该节点绑定的线程会终止acquireQueued()自旋,线程会去执行临界区代码。此时,其余的节点处于自旋状态,处于自旋状态的线程当然也不会执行无效的空循环而导致CPU资源浪费,而是被挂起(Park)进入阻塞状态。AQS队列的节点自旋不像CLH节点那样在空自旋而耗费资源。
挂起预判:shouldParkAfterFailedAcquire
acquireQueued()自旋在阻塞自己的线程之前会进行挂起预判。shouldParkAfterFailedAcquire()方法的主要功能是:找到当前节点的有效前驱节点(是指有效节点不是CANCELLED类型的节点),并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。具体可以分为三种情况:
- 如果前驱节点的状态为‒1(SIGNAL),说明前驱的等待标志已设好,返回true表示设置完毕。
- 如果前驱节点的状态为1(CANCELLED),说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好:调整前驱节点的next指针为自己。
- 如果是其他情况:‒3(PROPAGATE、共享锁等待)、‒2(CONDITION、条件等待)、0(初始状态),那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱释放锁(就会通知其后继节点),当前节点就可以抢占锁了。
其源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 获得前驱节点的状态:如果前驱节点状态为SIGNAL(值为-1)就直接返回
if (ws == Node.SIGNAL) return true;
if (ws > 0) {
// 前驱节点以及取消CANCELLED(1)
do {
// 不断地循环,找到有效前驱节点,即非CANCELLED(值为1)类型节点
//将pred记录前驱的前驱
pred = pred.prev;
//调整当前节点的prev指针,保持为前驱的前驱
node.prev = pred;
} while (pred.waitStatus > 0);
//调整前驱节点的next指针
pred.next = node;
} else {
//如果前驱状态不是CANCELLED,也不是SIGNAL,就设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//设置前驱状态之后,此方法返回值还是为false,表示线程不可用,第二次调用时被阻塞
return false;
}
在独占锁的场景中,此方法shouldParkAfterFailedAcquire()是在acquireQueued()方法的死循环中被调用的,由于此方法返回false时acquireQueued()不会阻塞当前线程,只有此方法返回true时当前线程才阻塞。因此在一般情况下,此方法至少需执行两次,当前线程才会被阻塞。
在第一次进入此方法时,首先会进入后一个if判断的else分支,通过CAS设置pred前驱的waitStatus为SIGNAL,然后返回false。此方法返回false之后,获取独占锁的acquireQueued()方法会继续进行for循环去抢锁:
- 假设node的前驱节点是队首节点,tryAcquire()抢锁成功,则获取到锁。
- 假设node的前驱节点仍然不是队首节点,或tryAcquire()抢锁失败,仍会再次调用此方法。
第二次进入此方法时,由于上一次进入时已经将pred.waitStatus设置为-1(SIGNAL)了,因此这次会进入第一个判断条件,直接返回true,表示应该调用parkAndCheckInterrupt阻塞当前线程了,等待前一个节点执行完成之后唤醒。
waitStatus 等于‒3
什么时候遇到前驱节点状态waitStatus等于‒3(PROPAGATE)的场景呢? PROPAGATE只能在使用共享锁的时候出现,并且只可能设置在head上。所以,对于非队尾节点,如果它的状态为0或PROPAGATE,那么它肯定是head。当等待队列中有多个节点时,如果head的状态为0或PROPAGATE,说明head处于一种中间状态,且此时有线程刚才释放锁了。而对于抢锁线程来说,如果检测到这种状态,说明再次执行acquire()是极有可能获得锁的。
waitStatus 大于 0
什么时候会遇到前驱节点的状态waitStatus大于0的场景呢?当pred前驱节点的抢锁请求被取消后期状态为CANCELLED(值为1)时,当前节点(如果被唤醒)就会循环移除所有被取消的前驱节点,直到找到未被取消的前驱。在移除所有被取消的前驱节点后,此方法将返回false,再一次去执行acquireQueued()的自旋抢占。
waitStatus 等于 0
什么时候遇到前驱节点状态waitStatus等于0(初始状态)的场景呢?分为两种情况:
- node节点刚成为新队尾,但还没有将旧队尾的状态设置为SIGNAL。
- node节点的前驱节点为head。
前驱节点为waitStatus等于0的情况是最常见的。比如现在AQS的等待队列中有很多节点正在等待,当前线程刚执行完毕addWaiter(节点刚成为新队尾),然后开始执行获取锁的死循环(独占锁对应的是acquireQueued()里的死循环,共享锁对应的是doAcquireShared()里的死循环),此时节点的前驱(也就是旧队尾的状态)肯定还是0(也就是默认初始化的值),然后死循环执行两次,第一次执行shouldParkAfterFailedAcquire()自然会检测到前驱状态为0,然后将0设置为SIGNAL;第二次执行shouldParkAfterFailedAcquire(),由于前驱节点为SIGNAL,当前线程直接返回true,去执行自我阻塞。
线程挂起:parkAndCheckInterrupt()
parkAndCheckInterrupt()主要任务是暂停当前线程,具体如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 调用park()使线程进入waiting状态
return Thread.interrupted();
// 如果被唤醒,查看自己是否已经被中断
}
AbstractQueuedSynchronizer会把所有的等待线程构成一个阻塞等待队列,当一个线程执行完lock.unlock()时,会激活其后驱节点,通过调用LockSupport.unpark(postThread)完成后继线程的唤醒。
AQS 两个关键点:节点的入队和出队
由于AQS的实现非常精妙,因此理解AQS的原理还是比较困难的。理解AQS的原理一个比较重要的关键点在于掌握节点的入队和出队。
节点的自旋入队
节点在第一次入队失败后,就会开始自旋入队,分为以下两种情况:
- 如果AQS的队列非空,新节点通过CAS插入队列尾部,并且是通过CAS方式插入,插入之后AQS的tail将指向新的尾节点。
- 如果AQS的队列为空,新节点入队时,AQS通过CAS方法将新节点设置为队首节点,并且将tail指针指向新节点。然后自旋,进入CAS插入操作,直到插入成功,自旋才结束。
节点的入队的代码在enq()方法中,因为enq()非常重要,所以将其代码重复如下:
private Node enq(final Node node) {
for (;;) { //自旋入队
Node t = tail;
if (t == null) {
//队列为空,初始化队尾节点和队首节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
队列初始化创建了一个空的队首节点,这个空的队首节点没有对应的线程,只占用一个位置,等到后面的节点抢到锁,这个节点就被移除。
节点的出队
节点出队的算法在acquireQueued()方法中,这是一个非常重要的模板方法。acquireQueued()方法通过不断在前驱节点上自旋(for死循环),如果前驱节点是队首节点并且当前线程使用钩子方法tryAcquire(arg)获得了锁,则移除队首节点,将当前节点设置为队首节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 在前驱节点上自旋
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// (1)前驱节点是队首节点
// (2)通过子类的tryAcquire()钩子实现抢占成功
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为队首节点,之前的队首节点出队
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 省略park(无限期阻塞)线程的代码
}
} finally {
// 省略其他
}
}
节点加入到队列尾部后,如果其前驱节点就不是队首节点,通常情况下,该新节点所绑定的线程会被无限期阻塞,而不会去执行无效循环,从而导致CPU资源的浪费。
问题来了:被无限期阻塞的抢锁线程,是什么时候被唤醒的呢?
对于公平锁而言,队首节点就是占用锁的节点,在释放锁时,将会唤醒其后驱节点所绑定的线程。后驱节点的线程被唤醒后会重新执行以上acquireQueued()的自旋(for死循环)抢锁逻辑,检查自己的前驱节点是否为队首节点,如果是,在抢锁成功之后会移除旧的队首节点。
**AQS释放锁时是如何唤醒后继线程的呢?**AQS释放锁的核心代码如下:
public final boolean release(long arg) {
if (tryRelease(arg)) {
// 释放锁的钩子实现
Node h = head; //队列的队首节点
if (h != null && h.waitStatus != 0) unparkSuccessor(h);
//唤醒后驱线程
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 省略不相关代码
Node s = node.next; //后驱节点
// 省略不相关代码
if (s != null) LockSupport.unpark(s.thread);
//唤醒后驱的线程
}
通过以上分析可以看出:无效节点的出队操作是在唤醒后驱节点的线程之后,其后驱节点的线程在抢锁过程中完成的。
AQS 锁释放的原理
下面基于SimpleMockLock公平独占锁的释放过程详细说明AQS锁释放的原理。
SimpleMockLock 独占锁的释放流程
SimpleMockLock独占锁的释放流程如图所示:
AQS 模板方法:release()
SimpleMockLock的unlock()方法被调用时,会调用AQS的release(…)的模板方法。AQS的release(…)的模板方法代码如下:
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这段代码逻辑比较简单,如果同步状态的钩子方法执行成功(tryRelease返回true),就会执行if块中的代码,当 head 指向的队首节点不为null ,并且该节点的状态值不为0时才会执行unparkSuccessor()方法。
钩子方法tryRelease()方法尝试释放当前线程持有的资源,由子类提供具体的实现。
钩子实现:tryRelease()
tryRelease()方法是需要子类提供实现的一个钩子方法,需要子类根据具体业务去实现。SimpleMockLock的钩子实现如下:
//钩子方法
protected boolean tryRelease(int arg) {
//如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//如果锁的状态为没有占用
if (getState() == 0) {
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
//设置状态
setState(0);
return true;
}
核心逻辑是设置同步状态state的值为0,方便后驱节点执行抢占。
唤醒后驱:unparkSuccessor()
release()钩子执行了tryRelease()钩子成功之后,使用unparkSuccessor()唤醒后驱节点,具体的代码如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 获得节点状态,释放锁的节点,也就是队首节点
//CANCELLED(1)、SIGNAL(-1)、CONDITION (-2)、PROPAGATE(-3)
//如果队首节点状态小于0,则将其置为0,表示初始状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到后面的一个节点
if (s == null || s.waitStatus > 0) {
// 如果新节点已经被取消CANCELLED(1)
s = null;
//从队列尾部开始,往前去找最前面的一个waitStatus小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
//唤醒后驱节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor()唤醒后驱节点的线程后,后驱节点的线程重新执行方法acquireQueued()中的自旋抢占逻辑。
当AQS队首节点释放锁之后,队首节点的状态变成初始状态,此节点理论上需要从队列中移除,但是此时该无效节点并没有立即被移除,unparkSuccessor()方法并没有立即从队列中删除该无效节点,仅仅唤醒了后驱节点的线程,重启了后驱节点的自旋抢锁。
ReentrantLock 的抢锁流程
下 面 结 合AbstractQueuedSynchronizer()的 模 板 方 法 详细 说 明 ReentrantLock 的 实 现 过 程 。ReentrantLock有两种模式:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁。
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。
ReentrantLock在同一个时间点只能被一个线程获取,ReentrantLock是通过一个FIFO的等待队列(AQS队列)来管理获取该锁所有线程的。ReentrantLock是继承自Lock接口实现的独占式可重入锁,并且ReentrantLock组合一个AQS内部实例完成同步操作。
ReentrantLock 非公平锁的抢占流程
ReentrantLock非公平锁的抢占的总体流程如图所示:
非公平锁的同步器子类
ReentrantLock为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法lock()的源码如下:
static final class NonfairSync extends Sync {
//非公平锁抢占
final void lock() {
if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());
else acquire(1);
}
//省略其他
}
首先用一个CAS操作,判断state是否是0(表示当前锁未被占用),如果是0就把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,剩下的只能乖乖去排队。
ReentrantLock“非公平”性即体现在这里:如果占用锁的线程刚释放锁,state置为0,而排队等待锁的线程还未唤醒,新来的线程就直接抢占了该锁,那么就“插队”了。举一个例子:当前有三个线程A、B、C去竞争锁,假设线程A、B在排队,但是后来的C直接进行CAS操作成功了,拿到了锁开开心心地返回了,那么线程A、B只能乖乖看着。
非公平抢占的钩子方法:tryAcquire(arg)
如果非公平抢占没有成功,非公平锁的lock会执行模板方法acquire(),首先会调用到钩子方法tryAcquire(arg)。非公平抢占的钩子方法实现如下:
static final class NonfairSync extends Sync {
//非公平锁抢占的钩子方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//省略其他
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先直接获得锁的状态
int c = getState();
if (c == 0) {
// 如果任务队列首节点的线程完了,它会将锁的state设置为0
// 当前抢锁线程的下一步就是直接进行抢占,不管不顾
// 发现state是空的,就直接拿来加锁使用,根本不考虑后面后驱者的存在
if (compareAndSetState(0, acquires)) {
// 1. 利用CAS自旋方式判断当前state确实为0,然后设置成acquire(1)
// 这是原子性的操作,可以保证线程安全
setExclusiveOwnerThread(current);
// 设置当前执行的线程,直接返回true
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 2. 当前的线程和执行中的线程是同一个,也就意味着可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 表示当前锁被1个线程重复获取了nextc次
return true;
}
// 否则就是返回false,表示没有尝试成功获取当前锁,进入排队过程
return false;
}
//省略其他
}
非公平同步器ReentrantLock.NonfairSync的核心思想就是当前进程尝试获取锁的时候,如果发现锁的状态位是0,就直接尝试将锁拿过来,然后执行setExclusiveOwnerThread(),根本不管同步队列中的排队节点。
ReentrantLock 公平锁的抢占流程
ReentrantLock公平锁的抢占流程如图所示:
公平锁的同步器子类
ReentrantLock为公平锁实现了一个内部的同步器——FairSync,其显式锁获取方法lock的源码如下:
static final class FairSync extends Sync {
//公平锁抢占的钩子方法
final void lock() {
acquire(1);
}
//省略其他
}
公平同步器ReentrantLock.FairSync的核心思想是通过AQS模板方法去进行队列入队操作。
公平抢占的钩子方法:tryAcquire(arg)
公平锁的lock会执行模板方法acquire,该方法首先会调用钩子方法tryAcquire(arg)。公平抢占的钩子方法实现如下:
static final class FairSync extends Sync {
//公平抢占的钩子方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//锁状态
if (c == 0) {
//有前驱节点就返回,足够讲义气
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平抢占的钩子方法中,首先判断是否有后驱节点,如果有后驱节点,并且当前线程不是锁的占有线程,钩子方法就返回false,模板方法会进入排队的执行流程,可见公平锁是真正公平的。
是否有后驱节点的判断
FairSync进行是否有后驱节点的判断代码如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors的执行场景大致如下:
- 当h!=t不成立的时候,说明h队首节点、t尾节点要么是同一个节点,要么都是null,此时hasQueuedPredecessors()返回false,表示没有后驱节点。
- 当h!=t成立的时候,进一步检查head.next是否为null,如果为null,就返回true。什么情况下h!=t同时h.next==null呢?有其他线程第一次正在入队时可能会出现。其他线程执行AQS的enq()方法,compareAndSetHead(node) 完 成 , 还 没 执 行 tail=head 语 句 时 , 此 时 t=null 、 head=new Node() 、head.next=null。
- 如果h!=t成立,head.next != null,判断head.next是不是当前线程,如果是就返回false,否则返回true。
head节点是获取到锁的节点,但是任意时刻head节点可能占用着锁,也可能释放了锁,如果释放了锁,那么此时state=0,未被阻塞的head.next节点对应的线程在任意时刻都是在自旋地尝试获取锁。
AQS 条件队列
Condition是JUC用来替代传统的Object的wait()/notify()线程间通信与协作机制的新组件,相比使用Object的wait()/notify(),使用Condition的await()/signal()这种方式实现线程间协作更加高效。
Condition 基本原理
Condition与Object的wait()/notify()作用是相似的,都是使得一个线程等待某个条件(Condition),只有当该条件具备signal()或者signalAll()方法被调用时等待线程才会被唤醒,从而重新争夺锁。不同的是,Object的wait()/notify()由JVM底层实现,而Condition接口与实现类完全使用Java代码实现。当需要进行线程间的通信时,建议结合使用ReentrantLock与Condition,通过Condition的await()和
signal()方法进行线程间的阻塞与唤醒。
ConditionObject类是实现条件队列的关键,每个ConditionObject对象都维护一个单独的条件等待对列。每个ConditionObject对应一个条件队列,它记录该队列的队首节点和尾节点。
public class ConditionObject implements Condition, java.io.Serializable {
//记录该队列的队首节点
private transient Node firstWaiter;
//记录该队列的尾节点
private transient Node lastWaiter;
}
一个Condition对象是一个单条件的等待队列,具体如图所示:
在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以使用newCondition()创建两个等待队列,具体如下:
private Lock lock = new ReentrantLock();
//创建第一个等待队列
private Condition firstCond = lock.newCondition();
//创建第二个等待队列
private Condition secondCond = lock.newCondition();
Condition条件队列与AQS同步队列的关系如图所示:
Condition条件队列是单向的,而AQS同步队列是双向的,AQS节点会有前驱指针。一个AQS实例可以有多个条件队列,是聚合关系;但是一个AQS实例只有一个同步队列,是逻辑上的组合关系。
await()等待方法原理
当线程调用await()方法时,说明当前线程的节点为当前AQS队列的队首节点,正好处于占有锁的状态,await()方法需要把该线程从AQS队列挪到Condition等待队列里,如图所示:
在await()方法中将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。await()方法的整体流程如下:
- 执行await()时,会新创建一个节点并放入到Condition队列尾部。
- 然后释放锁,并唤醒AQS同步队列中的队首节点的后一个节点。
- 然后执行while循环,将该节点的线程阻塞,直到该节点离开等待队列,重新回到同步队列成为同步节点后,线程才退出while循环。
- 退出循环后,开始调用acquireQueued()不断尝试拿锁。
- 拿到锁后,会清空Condition队列中被取消的节点。
创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()方法完成,该方法具体如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点取消,重新定位尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个新Node,作为等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//将新Node加入等待队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal()唤醒方法原理
线程在某个ConditionObject对象上调用signal()方法后,等待队列中的firstWaiter会被加入到同步队列中,等待节点被唤醒,流程如图所示:
signal()方法的整体流程如下:
- 通过enq()方法自旋(该方法已经介绍过),将条件队列中的队首节点放入到AQS同步队列尾部,并获取它在AQS队列中的前驱节点。
- 如果前驱节点的状态是取消状态,或者设置前驱节点为Signal状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队。
- 同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行condition.await()语句后面的临界区代码。
节点入队的时机
在介绍完AQS之后总结一下,节点入队AQS的时机,这里暂时梳理了两个时机和三种细分场景。
-
时机一:在模板方法acquire()中,如果调用tryAcquire(arg)尝试成功,acquire()将直接返回,表示已经抢到锁;如果不成功,则开始将线程加入等待队列。这里分为三种场景:
-
模板方法acquire(arg)通过addWaiter(Node node, int args)方法,尝试将该节点加入到同步队
列的队尾,在存在竞争的场景时一般会成功。当然,如果加入失败,或者同步队列为空,就开始调
用enq(final Node node)自旋入队。 -
enq()方法通过CAS自旋将新节点插入队列尾部。具体来说,如果AQS的队列非空,新节点
入队的插入位置在队列的尾部,并且是通过CAS方式插入的,插入之后AQS的tail将指向新节点,
新节点作为尾节点。 -
enq()方法初始化AQS队列再执行CAS自旋。如果AQS的队列为空,新节点入队时首先进行
队列初始化,AQS通过CAS方法创建队首节点,并且将tail指针指向队首节点。然后自旋,进入CAS
自旋插入操作,直到插入成功,自旋才结束。
-
-
时机二:Condition等待队列上的节点被signal()唤醒,会通过enq(final Node node)自旋入队,插入AQS的尾部。
AQS 的实际应用
首先介绍一下JUC的总体架构,如图所示:
AQS建立在CAS原子操作和volatile可见性变量的基础之上,为上层的显式锁、同步工具类、阻塞队列、线程池、并发容器、Future异步工具提供线程之间同步的基础设施。所以,AQS在JUC框架中的使用是非常广泛的。
感谢耐心看到这里的同学,觉得文章对您有帮助的话希望同学们不要吝啬您手中的赞,动动您智慧的小手,您的认可就是我创作的动力!
之后还会勤更自己的学习笔记,感兴趣的朋友点点关注哦。