目录
- AQS详解
- 1、AQS简介
- `AbstractQueuedSynchronizer`的继承结构和类属性
- AQS的静态内部类`Node`
- 总结AQS的实现思想
- 总结AQS的实现原理
- AQS和锁的关系
- 2、AQS的核心方法
- AQS管理共享资源的方式
- 独占方式下,AQS获取资源的流程详解
- 独占方式下,AQS释放资源的流程详解
- 共享方式下,AQS获取资源的流程详解
- 共享方式下,AQS释放资源的流程详解
- 四种AQS管理共享资源的流程图示
- 3、AQS的模版方法设计模式
- 4、AQS对条件变量的支持
- Condition接口的定义
- AQS的ConditionObject内部类
- ConditionObject 中的`await()`方法
- ConditionObject 中的`signal()`方法
- 利用Condition实现线程通信举例
- 简单总结Condition 条件变量的作用和原理:
- 5、利用AQS实现自己的锁
- 总结:
AQS详解
1、AQS简介
为什么要先总结AQS而不是Lock?
原计划是先把Java中的锁总结下,然后详细的总结下Lock。结果Lock总结一半,发现还是先把AQS给总结下吧,要不然Lock的实现用了太多AQS的方法和思想。如果Lock里面混着AQS的总结,篇幅太大,容易乱。
AQS是AbstractQueuedSynchronizer(抽象队列同步器)的缩写,是Java并发包(java.util.concurrent)中用于构建锁和同步器的一个基础框架。
它提供了一个基于FIFO等待队列的机制,用于管理多线程之间的同步操作。
AQS是许多锁和同步器(如ReentrantLock、ReadWriteLock、CountDownLatch、Semaphore等)的核心实现基础。
AbstractQueuedSynchronizer
的继承结构和类属性
类继承结构:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
AbstractQueuedSynchronizer
抽象类继承自AbstractOwnableSynchronizer
。
其中AbstractOwnableSynchronizer
主要用于提供管理独占模式下锁的所有者线程的功能,包含设置和获取锁所有者线程的方法。
可以看下AbstractOwnableSynchronizer
的源码:
/**
* AbstractOwnableSynchronizer 是一个抽象类,用于维护独占模式下的同步器的所有者线程。
* 这个类的主要作用是提供一个基础设施来记录哪个线程当前持有独占访问权。
*/
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 空构造函数供子类使用。
* 子类可以调用这个构造函数来初始化基类的状态。
*/
protected AbstractOwnableSynchronizer() { }
/**
* 当前拥有独占模式同步的线程。
* 这个字段被声明为 transient,意味着它不会被序列化。
* 这个字段用于记录当前持有独占访问权的线程。
*/
private transient Thread exclusiveOwnerThread;
/**
* 设置当前拥有独占访问权的线程。
* 传入一个 null 参数表示没有线程拥有访问权。
*
* @param thread 拥有访问权的线程
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* 返回最近一次通过 setExclusiveOwnerThread 方法设置的线程,
* 如果从未设置过,则返回 null。
* @return 拥有访问权的线程
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueuedSynchronizer
的类属性:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
/**
* 队列的头节点。
* 该节点是一个哑节点,不持有实际线程,但其后继节点表示等待获取锁的线程。
* head 是一个 transient 类型的 volatile 字段,表示它不会被序列化并且对所有线程可见。
* 初始化时,head 为 null,表示等待队列为空。
*/
private transient volatile Node head;
/**
* 队列的尾节点。
* tail 指向队列的最后一个节点,即最新加入等待队列的线程。
* tail 是一个 transient 类型的 volatile 字段,表示它不会被序列化并且对所有线程可见。
* 初始化时,tail 为 null,表示等待队列为空。
*/
private transient volatile Node tail;
/**
* 同步状态。
* state 是一个 volatile 类型的整数,用于表示当前的同步状态。
* 具体含义由子类定义,比如表示锁的重入次数或剩余的许可数。
* 使用 volatile 确保对 state 的修改对所有线程立即可见。
*/
private volatile int state;
/**
* 自旋等待的阈值。
* 当线程在获取锁时,如果需要短暂等待,它们可能会进行自旋操作(忙等待)而不是立即挂起。
* 该常量定义了自旋等待的时间阈值(以纳秒为单位)。
* 如果预计等待时间超过这个阈值,线程将被挂起,以避免浪费CPU资源。
*/
static final long spinForTimeoutThreshold = 1000L;
// 其他属性...(UnSafe相关暂时不讲)
}
AQS的静态内部类Node
/**
* Node 是 AQS 的内部静态类,用于表示等待队列中的节点。
* 每个节点都包含了等待的线程以及一些状态和链接信息。
*/
static final class Node {
/** 共享模式的标记节点。 */
static final Node SHARED = new Node();
/** 独占模式的标记节点。 */
static final Node EXCLUSIVE = null;
/** 节点状态值:表示线程已取消。 */
static final int CANCELLED = 1;
/** 节点状态值:表示后继节点的线程需要被唤醒。 */
static final int SIGNAL = -1;
/** 节点状态值:表示线程正在等待条件。 */
static final int CONDITION = -2;
/**
* 节点状态值:表示下一个共享模式的获取操作应该无条件传播。
*/
static final int PROPAGATE = -3;
/**
* 节点的状态字段,只取以下值:
* SIGNAL: 后继节点的线程被阻塞(通过park方法),因此当前节点在释放或取消时必须唤醒其后继节点。
* 为避免竞争,获取操作必须首先表明它们需要一个信号,然后重试原子获取操作,然后在失败时阻塞。
* CANCELLED: 由于超时或中断,此节点被取消。节点一旦进入此状态,将不会离开。
* 特别是,具有取消节点的线程不会再次阻塞。
* CONDITION: 此节点当前在条件队列中。直到转移到同步队列时,其状态才会设置为0。
* (在这里使用此值与字段的其他用途无关,但简化了机制。)
* PROPAGATE: 共享模式的释放操作应该传播到其他节点。这只在doReleaseShared方法中为头节点设置,以确保传播继续,即使其他操作已经介入。
* 0: 无上述任何状态。
*
* 值的排列顺序简化了使用。非负值意味着节点不需要信号。因此,大多数代码不需要检查特定值,只需检查符号。
*
* 对于正常的同步节点,字段初始化为0;对于条件节点,初始化为CONDITION。
* 它使用CAS(或在可能的情况下,无条件的volatile写操作)进行修改。
*/
volatile int waitStatus;
/**
* 链接到前驱节点,当前节点/线程依赖前驱节点来检查waitStatus。
* 在入队时分配,并且只有在出队时才为垃圾收集的目的清空。
* 此外,当前驱节点取消时,我们会在寻找一个非取消的前驱节点时进行短路,这总是存在的,因为头节点从不取消:
* 节点只有在成功获取后才成为头节点。取消的线程永远不会成功获取,并且线程只会取消自己,而不是任何其他节点。
*/
volatile Node prev;
/**
* 链接到后继节点,当前节点/线程在释放时唤醒后继节点。
* 在入队时分配,当绕过取消的前驱节点时调整,并且在出队时为空(为垃圾收集的目的)。
* 入队操作不会在附件之后分配前驱节点的next字段,因此看到null next字段并不一定意味着节点在队列的末尾。
* 但是,如果next字段似乎为null,我们可以从尾节点扫描prev节点以进行双重检查。
* 取消的节点的next字段设置为指向节点自身,而不是null,以使isOnSyncQueue方法更容易处理。
*/
volatile Node next;
/**
* 入队此节点的线程。在构造时初始化并在使用后为空。
*/
volatile Thread thread;
/**
* 链接到下一个等待条件的节点,或特殊值SHARED。
* 因为条件队列只有在持有独占模式时才访问,我们只需要一个简单的链接队列来保存等待条件的节点。
* 然后它们被转移到队列以重新获取。而且因为条件只能是独占的,我们通过使用特殊值来表示共享模式来节省一个字段。
*/
Node nextWaiter;
/**
* 返回true如果节点正在以共享模式等待。
*
* @return 如果节点正在以共享模式等待则返回true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前驱节点,如果为null则抛出NullPointerException。
* 当前驱节点不能为空时使用。空检查可以被省略,但存在是为了帮助虚拟机。
*
* @return 前驱节点
* @throws NullPointerException 如果前驱节点为null
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/**
* 构造一个节点,用于建立初始头节点或共享标记节点。
*/
Node() { // 用于建立初始头或共享标记节点
}
/**
* 构造一个节点,用于添加等待者。
*
* @param thread 关联的线程
* @param mode 节点的模式(独占或共享)
*/
Node(Thread thread, Node mode) { // 用于 addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
/**
* 构造一个节点,用于条件等待。
*
* @param thread 关联的线程
* @param waitStatus 节点的等待状态
*/
Node(Thread thread, int waitStatus) { // 用于 Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
总结AQS的实现思想
至此大概能够了解到AQS内部的一些实现了,Node类用于在AQS的同步队列和条件队列中维护线程的等待状态和链表结构。
通过waitStatus字段和链表结构,AQS能够有效管理线程的阻塞、唤醒和取消操作,保证并发环境下的线程同步。
之前在synchronized关键字详解这篇文章中有说过锁的本质。实际上无论是锁还是别的什么同步器,比如JVM对应synchronized底层锁的实现、亦或是AQS及其子类,它们实现的思想都差不多。
锁和同步器的核心思想可以总结为以下几点:
①、记录当前是否有线程获得锁:
需要有一个机制来判断锁当前是否被占用,以确保同一时间只有一个线程可以进入临界区。
这通常通过一个状态变量来实现,例如ReentrantLock中的state字段。
②、记录当前获得锁的是哪个线程:
需要知道哪个线程当前持有锁,以便支持重入锁(同一个线程可以多次获取同一个锁)和其他高级特性。
这通常通过一个线程变量来实现,例如AbstractOwnableSynchronizer
中的exclusiveOwnerThread
字段。
③、记录还有哪些线程在阻塞等待获取锁:
需要有一个队列或其他数据结构来管理那些尝试获取锁但未能成功的线程。
这些线程会被挂起(阻塞),等待锁被释放后再尝试获取锁。
这通常通过一个等待队列来实现,例如AbstractQueuedSynchronizer中的Node同步队列。只不过AQS这里利用了CLH锁的思想(下面会细说)来设计同步队列从而提高性能。
总结AQS的实现原理
我们就按照上面说的锁或者同步器的实现思想一步一步来分析:
①、状态
AQS利用 private volatile int state;
变量来表示当前的同步状态。由于AQS是一个抽象类,主要是为Java提供一个同步框架。
所以state
的具体含义由其子类定义,比如:
在独占锁(如 ReentrantLock)中,state 表示锁的持有数量(0表示未持有锁,1表示持有锁,大于1表示重入了锁)。
在共享锁(如 CountDownLatch 和 Semaphore)中,state 表示剩余的许可或计数。
②、持有锁的线程
AQS 提供了对持有锁线程的管理,通过 AbstractOwnableSynchronizer 类实现,这个类提供了对独占模式下持有锁的线程的记录和访问方法。
// 持有独占锁的线程
private transient Thread exclusiveOwnerThread;
// 设置持有独占锁的线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 获取持有独占锁的线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
③、队列
AbstractQueuedSynchronizer
的Node
节点中有pre
、next
,说明是个双向链表结构,对应 AbstractQueuedSynchronizer
类属性的private transient volatile Node head;
和private transient volatile Node tail;
。
AQS的具体方法还没有分析,这里先说结论。
AQS的入队操作enq(Node node)
方法确保新的节点总是被添加到队列的尾部。
AQS的出队操作unparkSuccessor(Node node)
方法和 acquire
系列方法确保总是从队列的头部获取锁。
所以AQS 的同步队列是一个双向链表结构的 FIFO(先进先出) 队列。
同时AQS还可以维护条件队列, 利用ConditionObject
内部类实现,ConditionObject
内部类最终也是利用Node
节点保存线程。并且利用Node
节点的nextWaiter
指针来维护一个单向链表。
所以AQS 的条件队列是一个单向链表结构的 FIFO(先进先出) 队列。
队列总结:
同步队列(Sync Queue): 管理那些尝试获取锁但未能成功的线程,通过 head 和 tail 指针以及 Node 节点实现。
当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。(后面会对获取同步状态的方法进行详细分析)
内部类Node
上面已经提到了,这里再总结下Node
节点中属性类型、名称和描述。
属性类型与名称 | 描述 |
---|---|
int waitStatus | 等待状态。包含如下状态: ① CANCELLED,值为 1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后将不会变化 ② SIGNAL,值为 -1,表示当前节点的后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 ③ CONDITION,值为 -2,节点在条件队列中,节点线程等待在 Condition 上,当其他线程对 Condition 调用了 signal() 方法后,该节点将会从条件队列中转移到同步队列中,加入到对同步状态的获取中 ④ PROPAGATE,值为 -3,表示下一次共享式同步状态获取将会无条件地被传播下去 ⑤ INITIAL,值为 0,初始状态 |
Node prev | 前驱节点,当节点加入同步队列时被设置(尾部添加) |
Node next | 后继节点 |
Node nextWaiter | 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段 |
Thread thread | 获取同步状态的线程 |
图示:
条件队列(Condition Queue): 管理那些等待特定条件的线程,通过 ConditionObject
内部类及其 firstWaiter
和 lastWaiter
指针、以及Node
节点实现。
条件队列中 Node节点的waitStatus 是CONDITION值为-2,表示节点线程等待在 Condition 上,当其他线程对 Condition 调用了 signal() 方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
注意:条件队列就是普通的单向链表结构的FIFO队列,不涉及CLH锁。
图示:
在 AbstractQueuedSynchronizer (AQS) 中,同步队列(Sync Queue)并没有明确定义为独立的队列实例,而是通过维护节点及其关系来实现类似队列的行为(可以叫隐式链表)。这种设计类似于LinkedList
的内部实现,通过节点的链接关系来管理队列中的元素,而不是在 AQS
内部声明一个类似 LinkedList
的队列实例。 AQS就是通过这种隐式链表再结合自旋锁加CAS操作来管理线程的等待和唤醒。
上面说的这种机制有一个特定的名称叫: CLH锁,其中CLH(Craig, Landin, and Hagersten)是指其发明者的姓氏,即 Craig, Landin, 和 Hagersten。他们三个人在 1993 年发表了一篇关于自旋锁的论文,介绍了这种基于链表的自旋锁机制。 Java的AQS就是利用了CLH锁的思想来实现高效的线程同步操作。只不过Doug Lee对CLH锁的Java实现进行了优化,增加了节点状态,显式地维护了节点的前驱和后继节点来丰富CLH锁的功能。
CLH 锁的基本原理:
CLH(Craig, Landin, and Hagersten)本质上是一个锁,但它的实现方式是基于队列的,因此常常被称为CLH队列锁或者CLH队列(如果你看到别的资料上有这些命名知道是怎么回事就行了)。在Java的AbstractQueuedSynchronizer (AQS) 中,CLH队列锁的思想被用来管理线程的同步和调度。
CLH锁是一种自旋锁,它的主要特点是每个线程在尝试获取锁时,会形成一个链表(队列),每个线程会在一个独立的节点上自旋,等待前驱节点释放锁。CLH锁的核心是通过队列来管理线程的等待和唤醒,从而实现锁的功能。
节点(Node): 每个线程在进入队列时,会创建一个节点(Node),记录线程的状态。节点的基本结构包含前驱和后继节点,以及线程的状态信息。
隐式链表: 所有节点通过指针连接形成一个隐式链表,维护了线程的等待队列。
自旋: 每个线程在自己的节点上进行自旋,等待其前驱节点释放锁并通知它。
FIFO 顺序: 保证线程按照先来先服务的顺序获取锁。
AQS的实现原理总结:
利用AQS实现的锁或者同步器来实现线程同步,当多个线程并发访问共享资源时,如果被访问的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就利用基于 CLH 锁 实现的同步机制来管理线程的等待和唤醒操作。
AQS和锁的关系
同步器(AQS,AbstractQueuedSynchronizer)是实现锁(以及其他同步组件)的核心机制。
同步器面向的是锁的实现者(现实中可以理解为面向高级开发或者架构师级别的程序员)
面向实现者:同步器为锁的实现者提供了一个框架,使得实现锁的过程更加简便和规范。
简化实现:同步器管理同步状态(如锁的状态)、线程的排队和等待、线程的唤醒等底层操作,使得锁的实现者可以专注于实现锁的具体语义。
屏蔽底层细节:同步器屏蔽了复杂的底层操作细节,比如如何高效地管理等待线程队列、如何处理线程的中断等。
锁面向的是锁的使用者(现实中可以理解为面向所有程序员)
面向使用者:锁提供了一个接口,使用者通过这个接口与锁进行交互,例如获取锁、释放锁。
接口定义:锁定义了使用者可以执行的操作,如 lock() 和 unlock() 方法。
隐藏实现细节:锁屏蔽了具体的实现细节,使得使用者不必关心底层是如何实现线程同步的,只需要使用锁提供的方法来确保线程安全。
最后总结下:
锁是面向使用者的接口,同步器是面向实现者的工具,锁的实现通常通过组合一个同步器实例来实现线程同步的具体行为(例如ReentrantLock
)。锁负责对外提供接口,同步器负责底层的状态管理和线程调度。
2、AQS的核心方法
AQS管理共享资源的方式
对于 AQS 来说,线程同步的关键是对状态值 state 进行操作。根据 state 是否属于单独的一个线程,操作 state 的方式分为独占方式和共享方式。
①、独占方式
定义:
独占方式获取的资源是与具体线程绑定的。如果一个线程获取了资源,就会标记该线程为资源持有者。其他线程在尝试操作 state 获取资源时,若发现资源已被其他线程持有,则会获取失败并被阻塞。
示例:独占锁 ReentrantLock
当一个线程获取了 ReentrantLock 的锁后,AQS 内部会首先使用 CAS 操作将 state 状态值从 0 变为 1,并将当前锁的持有者设置为当前线程。
如果同一线程再次获取锁,则发现它已经是锁的持有者,会将 state 状态值从 1 变为 2(设置可重入次数)。
其他线程在获取锁时,会发现自己不是锁的持有者,于是被放入 AQS 同步队列并挂起。
在独占方式下获取和释放资源使用的方法为 :
void acquire(int arg)
void acquireInterruptibly(int arg)
boolean release(int arg)
②、共享方式
定义:
共享方式的资源与具体线程不相关。多个线程可以通过 CAS 方式竞争获取资源。当一个线程获取资源后,其他线程也可以继续获取,前提是资源数量能满足需求。
示例:信号量 Semaphore
当一个线程通过 acquire() 方法获取信号量时,会先检查当前信号量个数是否满足需要。如果不满足,则将当前线程放入阻塞队列;如果满足,则通过自旋 CAS 获取信号量。
在共享方式下获取和释放资源的方法为:
void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)
boolean releaseShared(int arg)
独占方式下,AQS获取资源的流程详解
当一个线程调用 acquire(int arg)
方法获取独占资源时,会首先使用 tryAcquire 方法尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞队列的尾部,并调用LockSupport.park(this)
方法挂起自己。
void acquire(int arg)
方法
public final void acquire(int arg) {
// 尝试获取资源,如果获取失败,则将线程加入等待队列并阻塞
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// 由子类实现,用于尝试获取资源。成功则返回true,失败则返回false
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 重新设置线程中断标志
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
boolean acquireQueued(final Node node, int arg)
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 标记是否失败
try {
boolean interrupted = false; // 标记是否被中断
for (;;) {
final Node p = node.predecessor(); // 获取前驱节点
if (p == head && tryAcquire(arg)) { // 如果前驱节点是头节点且成功获取资源
setHead(node); // 设置当前节点为头节点
p.next = null; // 帮助GC
failed = false; // 标记成功
return interrupted; // 返回中断状态
}
// 如果前驱节点不允许当前线程获取资源,则阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 如果线程被中断,设置中断标志
}
} finally {
if (failed) // 如果失败,取消获取
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true; // 前驱节点状态为SIGNAL,表示可以安全阻塞
if (ws > 0) {
// 前驱节点已取消,跳过前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞线程
return Thread.interrupted(); // 返回并清除中断状态
}
cancelAcquire
方法
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null; // 清除线程引用
// 跳过所有已取消的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next; // 获取前驱节点的后继节点
node.waitStatus = Node.CANCELLED; // 将当前节点的状态设置为已取消
// 如果当前节点是尾节点,则移除当前节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); // 将前驱节点的next指向null
} else {
// 如果前驱节点需要唤醒后继节点,尝试设置前驱节点的next指向后继节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果前驱节点不需要唤醒后继节点,则直接唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // 帮助GC,避免内存泄漏
}
}
unparkSuccessor
方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 将节点的状态设置为0,表示不需要唤醒
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果后继节点为null或已取消,则从尾节点向前遍历找到未取消的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒后继节点
}
Node addWaiter(Node mode)
方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 创建新的节点
Node pred = tail;
if (pred != null) {
node.prev = pred; // 设置新节点的前驱节点为当前尾节点
if (compareAndSetTail(pred, node)) { // 尝试将新节点设置为尾节点
pred.next = node; // 设置前驱节点的next为新节点
return node;
}
}
enq(node); // 如果快速路径失败,则进入完整的入队过程
return node;
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update); // CAS操作设置尾节点
}
Node enq(final Node node)
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,初始化头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // 设置新节点的前驱节点为当前尾节点
if (compareAndSetTail(t, node)) { // 尝试将新节点设置为尾节点
t.next = node; // 设置前驱节点的next为新节点
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update); // CAS操作设置头节点
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update); // CAS操作设置尾节点
}
总结:
- ①、acquire(int arg):线程尝试获取资源,如果失败,将线程封装成节点并加入等待队列,然后阻塞。
- ②、tryAcquire(int arg):由子类实现,用于尝试获取资源。
- ③、addWaiter(Node mode):将当前线程封装成节点并快速加入等待队列。
- ④、enq(final Node node):如果快速路径失败,进入完整的入队过程。
- ⑤、acquireQueued(final Node node, int arg):在队列中等待,直到成功获取资源或被中断。
- ⑥、shouldParkAfterFailedAcquire(Node pred, Node node):判断是否应该阻塞当前线程。
- ⑦、parkAndCheckInterrupt():阻塞当前线程并返回中断状态。
- ⑧、cancelAcquire(Node node):取消节点的获取操作。
- ⑨、unparkSuccessor(Node node):唤醒后继节点。
独占方式下,AQS释放资源的流程详解
当一个线程调用 release(int arg)
方法时会尝试使用 tryRelease
操作释放资源,这里是设置状态变量 state 的值,然后调用 LockSupport.unpark(thread)
方法激活 AQS 队列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryAcquire
尝试,看当前状态变量 state
的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
boolean release(int arg)
方法
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放资源
Node h = head;
if (h != null && h.waitStatus != 0) // 如果头节点存在且其状态不为0
unparkSuccessor(h); // 唤醒头节点的后继节点
return true; // 释放成功
}
return false; // 释放失败
}
// 由子类实现,尝试释放资源,成功则返回true,失败则返回false
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor(Node node)
方法
private void unparkSuccessor(Node node) {
// 如果节点状态为负数,尝试清除状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 将节点状态设置为0
// 获取后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果后继节点为null或已取消,从尾节点向前遍历找到未取消的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒后继节点的线程
}
// CAS操作设置节点的等待状态
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
-
①、release方法:
尝试使用 tryRelease 释放资源。
如果成功,检查头节点的状态。
如果头节点存在且状态不为0,调用 unparkSuccessor 唤醒后继节点。
返回 true 表示释放成功,否则返回 false。 -
②、tryRelease方法:
由子类实现,尝试释放资源,成功则返回 true,失败则返回 false。 -
③、unparkSuccessor方法:
检查节点的等待状态,如果为负数则设置为0。
获取后继节点,如果后继节点为 null 或已取消,从尾节点向前遍历找到未取消的节点。
唤醒找到的后继节点的线程。
整个流程确保当资源被释放时,等待队列中的下一个线程能够被唤醒并尝试获取资源。如果成功获取资源,该线程会继续运行,否则它将继续在队列中等待。通过这种机制,AQS 保证了资源的独占访问和线程的有序等待。
注意点:
AQS 类的 tryAcquire 和 tryRelease 方法仅抛出一个异常并没有具体的实现,因为 AQS是锁阻塞和同步器的基础框架,tryAcquire 和 tryRelease 需要由具体的子类来实现。
子类在实现 tryAcquire 和 tryRelease 时要根据具体场景使用 CAS 算法尝试修改 state 状态值 ,成功则返回 true, 否则返回 false。子类还需要定义,在调用 acquire 和 release 方法时 state状态值的增减代表什么含义。
共享方式下,AQS获取资源的流程详解
当线程调用 acquireShared(int arg)
获取共享资源时,会首先使用 tryAcquireShared
尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列的尾部,并使用LockSupport.park(this)
方法挂起自己。
acquireShared(int arg)
方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 尝试获取共享资源,如果返回值小于0则表示获取失败
doAcquireShared(arg); // 进入等待队列
}
// 子类实现,尝试获取共享资源,成功返回非负值,失败返回负值
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
doAcquireShared(int arg)
方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 将当前线程封装为共享模式节点并加入等待队列
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 获取前驱节点
if (p == head) { // 如果前驱节点是头节点,尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取资源成功
setHeadAndPropagate(node, r); // 设置头节点并传播
p.next = null; // 帮助GC
if (interrupted)
selfInterrupt(); // 如果在等待过程中被中断过,则重新中断自己
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 如果获取失败,检查是否应该挂起
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 取消获取操作
}
}
setHeadAndPropagate
方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录旧的头节点
setHead(node); // 设置新的头节点
// 如果需要传播信号或者旧头节点状态为负数,尝试唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继节点
}
}
doReleaseShared
方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 循环重新检查
unparkSuccessor(h); // 唤醒后继节点
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 循环重新检查
}
if (h == head) // 如果头节点未改变,跳出循环
break;
}
}
其中有些方法和独占方式下,AQS获取资源的方法相同,可以参考上面的方法源码。 这里就不赘述了。
总结:
①、acquireShared方法:
尝试通过 tryAcquireShared 获取共享资源。
如果获取失败,调用 doAcquireShared 进入等待队列。
②、tryAcquireShared方法:
子类实现,尝试获取共享资源,成功返回非负值,失败返回负值。
③、doAcquireShared方法:
将当前线程封装为共享模式节点并加入等待队列。
在循环中检查前驱节点是否为头节点,尝试获取资源。
如果获取资源成功,设置新的头节点并传播信号。
如果获取资源失败,检查是否应该挂起线程并挂起。
④、setHeadAndPropagate方法:
设置新的头节点。
如果需要传播信号,唤醒后继节点。
⑤、doReleaseShared方法:
释放共享资源并传播信号。
确保信号传播,即使在其他获取/释放操作进行中。
共享方式下,AQS释放资源的流程详解
当一个线程调用 releaseShared(int arg)
时会尝试使用 tryReleaseShared
操作释放资源,这里是设置状态变量 state 的值,然后使用 LockSupport.unpark(thread)
激活 AQS 队列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryReleaseShared
查看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
releaseShared(int arg)
方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放共享资源
doReleaseShared(); // 传播释放信号
return true; // 释放成功
}
return false; // 释放失败
}
// 子类实现,尝试释放共享资源,成功返回true,失败抛出异常
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
doReleaseShared
方法
private void doReleaseShared() {
/*
* 确保释放操作能够传播,即使有其他正在进行的获取/释放操作。
* 通常通过尝试唤醒头节点的后继节点来进行。
* 但如果不能,状态设置为 PROPAGATE 以确保释放时继续传播。
* 同时需要循环以防在执行过程中有新节点被加入。
* 与其他使用 unparkSuccessor 的情况不同,需要知道 CAS 重置状态是否失败,
* 如果失败则重新检查。
*/
for (;;) {
Node h = head; // 获取头节点
if (h != null && h != tail) { // 如果头节点存在且不是尾节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 如果头节点需要信号
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 尝试将头节点状态重置为0
continue; // 如果失败则继续循环重新检查
unparkSuccessor(h); // 唤醒头节点的后继节点
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果头节点状态为0,设置为PROPAGATE
continue; // 如果失败则继续循环重新检查
}
if (h == head) // 如果头节点未改变,跳出循环
break;
}
}
// CAS 操作设置节点的等待状态
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
总结:
①、releaseShared方法:
尝试通过 tryReleaseShared 释放共享资源。
如果成功释放,调用 doReleaseShared 传播释放信号。
返回 true 表示释放成功,否则返回 false。
②、tryReleaseShared方法:
子类实现,尝试释放共享资源,成功返回 true,失败则抛出异常。
③、doReleaseShared方法:
确保释放操作能够传播,即使有其他正在进行的获取/释放操作。
通常通过尝试唤醒头节点的后继节点来进行。
如果头节点需要信号,尝试将其状态重置为0并唤醒其后继节点。
如果头节点状态为0,设置为 PROPAGATE 以确保释放时继续传播。
循环检查直到头节点不再改变。
注意点:
和上面独占模式类似,AQS 类并没有提供可用的 tryAcquireShared
和 tryReleaseShared
方法,tryAcquireShared
和 tryReleaseShared
需要由具体的子类来实现。子类在实现 tryAcquireShared
和 tryReleaseShared
时要根据具体场景使用 CAS 算法尝试修改 state 状态值,成功则返回 true,否则返回 false。
四种AQS管理共享资源的流程图示
①、独占方式下,AQS获取资源的流程图
②、独占方式下,AQS释放资源的流程图
(还是2k屏截的图清楚~)
③、共享方式下,AQS获取资源的流程图
④、共享方式下,AQS释放资源的流程图
总结:
流程图总结的比较粗,建议结合流程图多看源码的细节。
3、AQS的模版方法设计模式
AQS设计的目的是为程序员实现锁或者同步器提供一个框架。
上面我们对AQS管理共享资源的方式进行了分析,其中有几个需要子类实现的方法如下:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回值大于等于0的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否该同步器被当前线程所独占 |
同时AQS提供了一些模板方法来实现独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。
当我们利用AQS实现自己的锁或者同步器的时候可以使用AQS提供的模板方法来实现自己的同步功能。
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg) 方法 |
void acquireInterruptibly(int arg) | 与 acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则抛出 InterruptedException 并返回 |
boolean tryAcquireNanos(int arg, long nanos) | 在 acquireInterruptibly(int arg) 基础上增加了超时限制,如果当前线程在规定时间内没有获取到同步状态,则返回 false,如果获取到了返回 true |
void acquireShared(int arg) | 共享式的获取同步状态,如果当该线程未获取到同步状态,将会进入同步队列等待,当该线程获取到的同步状态释放后,同时可以被多个线程获取同步状态 |
void acquireSharedInterruptibly(int arg) | 与 acquireShared(int arg) 相同,该方法可中断 |
boolean tryAcquireSharedNanos(int arg, long nanos) | 在 acquireSharedInterruptibly(int arg) 基础上增加了超时限制 |
boolean release(int arg) | 独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中的第一个节点唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection getQueuedThreads() | 获取等待在同步队列上的线程集合 |
模版方法模式是一种行为设计模式,它在一个方法中定义一个算法的框架,而将一些步骤的实现延迟到子类中。模版方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。
模版方法设计模式在AQS中的体现
AQS定义了一些模板方法,如acquire(int arg)、release(int arg)、acquireShared(int arg)和releaseShared(int arg)等。这些方法定义了获取和释放同步状态的流程,而具体的同步状态获取和释放的实现则由子类通过实现抽象方法tryAcquire(int arg)
、tryRelease(int arg)
、tryAcquireShared(int arg)
和tryReleaseShared(int arg)
来提供。这种设计允许不同的同步器(如ReentrantLock、Semaphore等)在不改变AQS整体框架的情况下,定义自己的同步逻辑。
4、AQS对条件变量的支持
之前在synchronized详解中说过 synchronized 内置锁可以通过 notify
和 wait
方法来实现不同线程间的同步。
那么AQS也有一套逻辑用来实现不同线程间的同步,AQS是基于Condition接口的 signal
和 await
方法来实现的。并且Condition接口的 signal
和 await
方法也是需要配合锁来实现线程间的同步。
Condition接口的定义
下面列举了一些常用的方法:
public interface Condition {
/**
* 使当前线程等待,直到被通知或中断。
*
* @throws InterruptedException 如果当前线程在等待时被中断
*/
void await() throws InterruptedException;
/**
* 唤醒一个等待的线程。
*/
void signal();
/**
* 使当前线程等待,直到被通知。
* 在等待期间,线程不响应中断。
*/
void awaitUninterruptibly();
/**
* 使当前线程等待,直到被通知、中断或指定的等待时间到期。
*
* @param time 最长等待时间
* @param unit 时间单位
* @return 如果在指定的等待时间内收到通知,则返回true;如果等待时间到期仍未收到通知,则返回false
* @throws InterruptedException 如果当前线程在等待时被中断
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 唤醒所有等待的线程。
*/
void signalAll();
}
AQS的ConditionObject内部类
AQS对于Condition接口的实现ConditionObject。
public class ConditionObject implements Condition, java.io.Serializable
ConditionObject
是 AQS 的内部类,可以访问 AQS 内部的变量(例如状态变量 state)和方法。每个 ConditionObject 实例维护了一个条件队列(condition queue),用于存放调用 await() 方法时被阻塞的线程。
具体的条件队列可以参考上面的 总结AQS的实现原理 部分的第③点 。
AQS 提供了 ConditionObject 的实现,并且提供了一个无参的公共构造方法用来实例化ConditionObject。AQS 的子类一般提供 newCondition
函数来实例化一个ConditionObject,例如 ReentrantLock就提供了newCondition()
方法。
ConditionObject 中的await()
方法
public final void await() throws InterruptedException {
// 如果当前线程被中断,则抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入条件等待队列
Node node = addConditionWaiter();
// 释放当前线程持有的锁,返回释放前的锁状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 自旋等待,直到节点被转移到同步队列
while (!isOnSyncQueue(node)) {
// 挂起当前线程,等待被唤醒
LockSupport.park(this);
// 检查在等待过程中是否被中断,如果是则退出等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前节点的下一个等待者不为空,则清理取消的等待者
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 如果线程在等待过程中被中断,处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await()
方法总结:
- 中断检查:首先检查当前线程是否被中断,如果是,则抛出 InterruptedException。
- 添加到条件队列:将当前线程包装成一个节点,并加入条件队列。
- 释放锁:释放当前线程持有的锁,并保存锁的状态,以便后续重新获取锁。
- 等待被唤醒:在自旋等待中,线程被挂起,直到节点被转移到同步队列。
- 重新获取锁:节点被唤醒后,重新获取锁。
- 清理条件队列:如果当前节点的下一个等待者不为空,则清理取消的等待者。
- 处理中断:如果线程在等待过程中被中断,进行相应的中断处理。
ConditionObject 中的signal()
方法
public final void signal() {
// 检查当前线程是否持有锁,如果没有则抛出IllegalMonitorStateException
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件队列的第一个节点
Node first = firstWaiter;
// 如果第一个节点不为空,执行唤醒操作
if (first != null)
doSignal(first);
}
signal()
方法总结:
- 锁持有检查:检查当前线程是否持有锁,如果没有持有锁,则抛出 IllegalMonitorStateException。
- 获取第一个等待节点:获取条件队列的第一个节点。
- 唤醒操作:如果第一个节点不为空,执行唤醒操作,将该节点从条件队列转移到同步队列。
利用Condition实现线程通信举例
实现两个线程交替打印牛、马
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestA {
private static final Lock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();
private static boolean flag牛 = true; // 控制打印牛的标志
public static void main(String[] args) {
Thread threadA = new Thread(() -> {
for (int i = 0; i < 10; i++) {
print牛();
}
});
Thread threadB = new Thread(() -> {
for (int i = 0; i < 10; i++) {
print马();
}
});
threadA.start();
threadB.start();
try {
threadA.join();
threadB.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void print牛() {
lock.lock();
try {
while (!flag牛) {
condition.await();
}
System.out.print("牛 ");
flag牛 = false;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void print马() {
lock.lock();
try {
while (flag牛) {
condition.await();
}
System.out.print("马 ");
flag牛 = true;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
运行结果:
牛 马 牛 马 牛 马 牛 马 牛 马 牛 马 牛 马 牛 马 牛 马 牛 马
简单总结Condition 条件变量的作用和原理:
-
等待(await):
condition.await()
使当前线程等待并释放锁,直到其他线程调用condition.signal()
或condition.signalAll()
唤醒它。await()
方法会使当前线程进入当前条件变量维护的条件队列,挂起当前线程(挂起后线程状态变为WAITING
)并释放持有的锁。 -
通知(signal):
condition.signal()
唤醒一个等待在该条件上的线程。如果有多个线程等待,则随机选择一个唤醒(这里需要注意,虽然AQS的条件队列是FIFO的,但是并不保证condition.signal()
按照严格的FIFO顺序唤醒挂起的线程)。signalAll()
会唤醒所有等待在该条件上的线程。被唤醒的线程会尝试重新获取锁,然后从await()
方法返回,继续执行。 -
互斥和同步:Condition 与 Lock 配合使用,可以提供比
synchronized
和wait/notify
更加灵活的线程同步机制。Condition 可以有多个,允许在同一锁上有不同的等待集合,从而实现复杂的线程间协调。
5、利用AQS实现自己的锁
ReentrantLock
和synchronized
都是可重入的锁。
下面利用AQS这个框架来实现一个自定义的不可重入的锁。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class TestA {
private static final Lock lock = new MyNonReentrantLock();
private static int count;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
// 上锁
lock.lock();
// 测试锁重入
boolean tryLock = lock.tryLock();
System.out.println(tryLock);
count++;
// 释放锁
lock.unlock();
}, "t1");
t1.start();
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
// 主线程 未获取锁 直接释放锁 测试抛异常
lock.unlock();
}
}
/**
* 自定义 不可重入锁 (利用AQS的 独占模式)
*/
class MyNonReentrantLock extends AbstractQueuedSynchronizer implements Lock {
// 通过组合一个同步器实例 来实现线程同步的具体行为(搞一个内部类 用来做具体的工作)
class InnerSync extends AbstractQueuedSynchronizer {
// 独占模式重写的方法
// 判断锁是否已经被持有 true: 已占用 false: 未占用
@Override
protected boolean isHeldExclusively() {
// 利用AQS的状态变量表示 锁的占用情况 1:表示 已占用 0:表示 未占用
return getState() == 1;
}
// 独占模式重写的方法
// 尝试获取锁
@Override
protected boolean tryAcquire(int args) {
if (getState() == 1) {
System.out.println("锁已被占用!"); // 这里只是为了测试环境打印看一下,生产环境可千万别这么搞
return false; // 获取锁失败
}
// CAS设置 状态为 args
if (compareAndSetState(0, args)) {
// 获取锁成功
// 设置当前线程为持有锁线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// CAS设置 状态失败
System.out.println("锁已被占用!"); // 这里只是为了测试环境打印看一下,生产环境可千万别这么搞
return false;
}
// 独占模式重写的方法
// 尝试释放锁
@Override
protected boolean tryRelease(int args) {
// 这里加个判断 防止没持有锁的线程来释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new RuntimeException("你没锁来释放个毛线!" + ": " + Thread.currentThread().getName());
if (getState() == 1) {
// 释放锁
// 释放锁的操作通常是在当前线程拥有锁的情况下执行的,因此可以确保当前线程是锁的独占线程
// 所以这里就没必要 用 compareAndSetState方法了
setState(args);
// 设置当前占有锁的线程为空
setExclusiveOwnerThread(null);
return true;
}
return false;
}
// 这里也提供一个获取条件变量的方法
Condition newCondition() {
return new ConditionObject();
}
}
// 下面 调用内部类InnerSync的方法 来实现Lock 接口的方法
private final InnerSync innerSync = new InnerSync();
@Override
public void lock() {
// 调用 AQS的模板方法 来获取锁
// 当执行到 tryAcquire 方法的时候 会调用 InnerSync子类的实现
// 设置状态为1 表示加锁
innerSync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
innerSync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
// 设置状态为1 表示加锁
return innerSync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return innerSync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
// 设置状态为0 表示释放锁
innerSync.release(0);
}
@Override
public Condition newCondition() {
return innerSync.newCondition();
}
}
运行结果:
锁已被占用!
false
1
Exception in thread "main" java.lang.RuntimeException: 你没锁来释放个毛线!: main
at MyNonReentrantLock$InnerSync.tryRelease(TestA.java:87)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at MyNonReentrantLock.unlock(TestA.java:139)
at TestA.main(TestA.java:37)
总结:
对于JUC下的并发相关知识点这才刚刚开始~