AQS
AQS
是 AbstractQueuedSynchronizer
的缩写,即 抽象队列同步器
,是 Java.util.concurrent
中的一个基础工具类,用于实现同步器(Synchronizer)的开发。
AQS 提供了一种实现锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,使得开发者能够更方便地编写线程安全的代码。
AQS 的使用涉及到复杂的同步机制和多线程协调,需要仔细考虑并发控制的细节。通常情况下,直接使用基于 AQS 实现的高级同步器(如 ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
、CountDownLatch
等)更为常见,而不需要直接使用 AQS。
AQS 的数据结构
AQS 内部使用了一个先进先出(FIFO)的 双端队列
,被称为,被称为 CLH 队列
(Craig, Landin, and Hagersten queues)。它并不直接储存线程,而是储存拥有线程的 Node 节点。
使用了两个虚拟的引用 head
和 tail
用于标识队列的头部和尾部,管理等待线程。
它的实现方式和普通的双端队列略有不同,主要涉及到以下两个方面:
-
节点的处理方式:在 CLH 队列中,每个 Node 都代表了一个等待线程,同时它还包含一个
state
属性。当一个线程尝试获取同步状态时,它会创建一个节点插入到 CLH 队列的尾部,然后自旋等待获取同步状态。当同步状态可用时,AQS 会使用 CAS 操作将当前头结点改为自己,并返回原头结点,从而唤醒自己。
-
队列的管理方式:CLH 队列采用的是轻量级锁的思路,当一个线程到达尾节点时它只需自旋等待即可。此时,节点的
state
属性会发挥重要作用,它能够指示当前节点是处于等待还是已经被唤醒出队并不再使用,从而避免了队列中节点的浪费。
AQS 内部使用了一个 [[JMM Java内存模型#volatile|volatile]] 的变量 state
来作为资源的标识。同时定义了几个获取和改变 state
的 [[Protected]] 方法 getState()
、setState()
、compareAndSetState()
,均为原子操作,子类可以覆盖这些方法来实现自己的逻辑。
compareAndSetState
的实现依赖于 [[实现线程安全#Unsafe 类|Unsafe]] 的 compareAndSwapInt()
方法
线程的 Node 节点
waitStatus
用来标记当前节点的状态:
-
CANCELLED
:表示当前节点(对应的线程)已被取消。当等待超时或被中断,会触发进入为此状态,进入该状态后节点状态不再变化; -
SIGNAL
:后面节点等待当前节点唤醒; -
CONDITION
:Condition
类 中使用,当前线程阻塞在Condition
,如果其他线程调用了 Condition 的signal
方法,这个节点将从等待队列转移到同步队列队尾,等待获取同步锁; -
PROPAGATE
:共享模式,前置节点唤醒后面节点后,唤醒操作无条件传播下去; -
0
:中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。
实现队列
通过 Node 可以实现两种队列:
-
通过 prev 和 next 实现 CLH 队列。
在 CLH 锁中,每个等待的线程都会有一个关联的 Node,每个 Node 有一个 prev 和 next 指针。当一个线程尝试获取锁并失败时,它会将自己添加到队列的尾部并自旋,等待前一个节点的线程释放锁。
- 通过
nextWaiter
实现Condition
上的等待线程队列(单向队列),这个 Condition 主要用在ReentrantLock
类中
常用方法
AQS 主要提供了以下几个重要方法(均为 protected):
-
isHeldExclusively()
:该线程是否正在独占资源。只有用到Condition
类才需要去实现它。 -
acquire(int arg)
:尝试获取资源,如果获取失败则等待,直到成功获取同步状态或被中断。 -
release(int arg)
:释放资源,唤醒等待队列中的其他线程。 -
tryAcquire(int arg)
:独占方式,尝试获取资源,成功返回 true,失败返回 false。 -
tryRelease(int arg)
:独占方式,尝试释放资源,成功返回 true,失败返回 false。 -
tryAcquireShared(int arg)
:共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 -
tryReleaseShared(int arg)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
arg
为要 获取/释放 的资源个数,在独占模式下始终为 1
这些方法虽然都是 protected
的,但是它们并没有在 AQS 具体实现,而是直接抛出异常:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,如 信号 Semaphoreopen
只需要实现 tryAcquire
方法而不用实现其余不需要用到的模版方法
设计思想
AQS 的设计思想是基于模板方法模式,它将同步器的核心操作定义为抽象方法,由子类去实现具体细节。AQS 内部维护了一个等待队列,用于管理等待获取同步状态的线程。
AQS 通过 CAS(Compare and Swap)操作、队列等待、线程阻塞等机制来实现线程的协作和同步。
获取资源
获取资源的入口是 acquire(int arg)
方法
首先调用 tryAcquire
尝试去获取资源,如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE)
方法把这个线程插入到等待队列(在队列的尾部插入新的 Node 节点)中。其中传入的参数代表要插入的 Node 是独占式的。通过 CAS 自旋的方式保证操作的线程安全性。
之后,处于等待队列的 Node 从头结点一个一个去获取资源。
此外,还可以通过如下方法获取资源:
-
acquireInterruptibly
:申请可中断(在线程中断时可能会抛出InterruptedException
)的资源(独占模式) -
acquireShared
:申请共享模式的资源 -
acquireSharedInterruptibly
:申请可中断的资源(共享模式)
释放资源
释放资源相比于获取资源要简单得多
在 java.util.concurrent.locks.ReentrantLock
的实现中,tryRelease(arg)
会减少持有锁的数量,如果持有锁的数量变为0,释放锁并返回 true。
如果 tryRelease(arg)
成功释放了锁,那么接下来会检查队列的头结点。如果头结点存在并且 waitStatus
不为0(这意味着有线程在等待),那么会调用 unparkSuccessor(Node h)
方法来唤醒等待的线程。
AQS实现自定义锁
下面是一个简单的示例,展示如何使用 AQS
来实现一个自定义的可重入锁。
这个简单的锁实现没有考虑重入性和公平性等问题,实际应用中可能还需要更复杂的逻辑来满足更高级的需求。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class CustomReentrantLock implements Lock {
private final Sync sync = new Sync();
private static final class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
代码说明:
-
CustomReentrantLock 类实现了
Lock
接口,提供了锁的基本操作。 -
Sync 类继承自
AbstractQueuedSynchronizer
,并重写了tryAcquire
和tryRelease
方法来控制锁的状态。tryAcquire
: 尝试获取锁,通过compareAndSetState
方法原子地更新状态值。如果当前状态为0(表示锁未被持有),则设置状态为1并返回true
,否则返回false
。tryRelease
: 释放锁,简单地将状态设置为0。
-
lock/unlock 方法: 使用
sync
实例来调用AbstractQueuedSynchronizer
中定义的acquire
和release
方法。 -
Condition: 通过
newCondition
方法创建条件变量。
AQLS
AQS 里面的“资源”是用一个 int
类型的数据来表示的,有时候业务需求的资源数超出了 int
的范围,所以在 JDK 1.6 中,多了一个 AQLS
(AbstractQueuedLongSynchronizer)。它的代码跟 AQS 几乎一样,只是把资源的类型变成了 long
类型。
AOS
AQS
和 AQLS
都继承了一个类 AOS
(AbstractOwnableSynchronizer)。这个类于 JDK 1.6 引入。用于表示锁与持有者之间的关系(独占模式)
-
字段:
private transient Thread exclusiveOwnerThread;
:这个字段用于存储当前独占访问权的线程。由于它是transient
的,因此在序列化时不会被保存。
-
方法:
-
protected final void setExclusiveOwnerThread(Thread thread);
:此方法用于设置当前拥有独占访问权的线程。如果传入null
,则表示没有线程拥有访问权。 -
protected final Thread getExclusiveOwnerThread();
:此方法返回最后通过setExclusiveOwnerThread
方法设置的线程,如果没有设置过,则返回null
。
-
AbstractOwnableSynchronizer
提供了一种机制来允许线程独占某个资源或同步器,通常被用作创建锁和其他同步器的基类。例如,在Java的 ReentrantLock
类中,就使用了 AbstractOwnableSynchronizer
来跟踪哪个线程当前拥有锁。这使得 ReentrantLock
能够支持重入性,即同一个线程可以多次获得同一个锁。