目录
- 简介
- 原理
- 概览
- 资源的共享方式
- 独占(Exclusive)
- 共享(Shared)
- 模板方法模式在AQS中的应用
- 经典应用
- ReentrantLock
- Semaphore
简介
AQS全称AbstractQueuedSynchronizer,位于java.util.concurrent.locks包下,它是一个用来构建锁和同步器的框架,使用AQS能简单且高效的构造出应用广泛的大量同步器,像ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等等都是基于AQS的,我们自己也能利用AQS非常轻松的构造出符合自己需求的同步器。
原理
概览
AQS的核心思想是:如果被请求的资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且修改资源的状态;如果被请求的资源被占用,那么就需要一套线程阻塞等待和被唤醒时锁分配的机制,这个机制AQS使用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagerste)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成CLH队列的一个结点(Node)来实现锁的分配。
AQS使用一个int类型成员变量来表示状态:
private volatile int state;
状态通过如下三个方法进行操作:
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
其中compareAndSetState以CAS的方式实现对状态的原子更新。
资源的共享方式
独占(Exclusive)
当一个线程在独占模式下获取到了资源,其他线程都无法获取该资源,像ReentrantLock就是独占模式。
共享(Shared)
在共享模式下,支持多线程获取资源,但并不一定每个线程都能成功获取,这取决于子类怎么实现(AbstractQueuedSynchronizer是一个抽象类,有些方法需要子类去实现)。像Semaphore、CountDownLatch都是共享模式,而ReentrantReadWriteLock既支持独占模式(写锁),也支持共享模式(读锁)。
不同的自定义同步器争用资源的方式不同,自定义同步器在实现时只需要实现资源的获取与释放方式即可(其实就是state值的维护,比如获取资源state减一,释放资源state加一,state小于等于0时无法获取资源等),至于线程等待队列的维护(如获取资源失败入队、唤醒出队等),AQS已经帮我们实现好了。
模板方法模式在AQS中的应用
AQS的设计是基于模板方法模式的,如果需要自定义同步器,一般的方式是这样:
- 继承AbstractQueuedSynchronizer并重写指定的方法(这些重写方法很简单,无非是对state的值进行操作)。
- 将AQS组合在自定义同步组件的实现中,并调用其方法,而这些方法会调用使用者重写的方法。
自定义同步器时需要重写如下几个AQS提供的模板方法:
// 尝试以独占的方式获取资源,这个方法需要判断state的值是否允许获取资源,如果允许,那就获取。
// 在调用acquire方法时会调用这个方法,如果返回false,线程可能会加入CLH队列并阻塞,直到其他线程调用release方法唤醒它
// 返回true代表成功,false代表失败
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试以独占的方式释放资源
// 返回true代表成功,false代表失败
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 尝试以共享的方式获取资源,这个方法需要判断state的值是否允许获取资源,如果允许,那就获取。
// 在调用acquireShared方法时会调用这个方法,如果返回值小于0,线程可能会加入CLH队列并阻塞,
// 直到其他线程调用releaseShared方法唤醒它
// 返回负数代表失败
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 尝试以共享的方式释放资源
// 返回true代表成功,false代表失败
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中其他方法都被final修饰,所以无法重写,只有这几个方法可以自定义。
我们在使用时,直接调用的是这几个方法:
// 以独占的方式获取资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 以独占的方式释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 以共享的方式获取资源
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 以共享的方式释放资源
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
可以看到,这些方法里会调用我们自己实现的方法。
经典应用
ReentrantLock
ReentrantLock又叫可重入锁,它是基于AQS的,实现的是独占模式。它支持公平锁和非公平锁,所以它有两套同步器来支持这两种锁机制,分别是NonfairSync和FairSync,ReentrantLock把这两个同步器的公共代码抽象到内部类Sync中,我们来看下Sync的代码:
// 此时AQS的state代表持有锁的次数(注意,不是持有锁的线程数,同一个线程多次获取锁state也会增加)
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// ReentrantLock的lock最终调用的就是这个方法
abstract void lock();
// 非公平版本的tryAcquire最终就是调用这个方法,AQS的tryAcquire方法放在Sync的子类中重写
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果锁没有被持有
// 使用CAS的方式设置state的值
// state预期为0,设置为acquires(这里是1)
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); // 将当前线程设置为占用资源的线程
return true;
}
}
// 当前线程为占用资源的线程,因为是可重入锁,所以这里可以继续修改state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 重写AQS的tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 锁没有被持有
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
再看下NonfairSync的代码:
// 非公平锁同步器
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 实现Sync的lock方法
final void lock() {
// 直接用CAS的方式抢锁,无需排队,非公平性就体现在这里
// state预期为0(资源没有被占用),将state设置为1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread()); // state修改成功,设置当前线程为占用资源的线程
else // 抢锁失败调用AQS的acquire方法,如果还是抢锁失败,会进入队列排队
acquire(1);
}
// 重写AQS的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 调用父类Sync的nonfairTryAcquire方法
}
}
我们发现,非公平锁的非公平性体现在调用lock方法时会直接使用CAS修改state的值而无需排队,只有在CAS失败之后才会调用AQS的acquire方法,acquire方法包含了入队等待和唤醒那一套机制。
接下来看下FairSync的代码:
// 公平锁同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 实现Sync的lock方法
final void lock() {
acquire(1); // 直接调用AQS的acquire方法
}
// 重写AQS的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 锁没有被持有
if (!hasQueuedPredecessors() && // 在队列里没有排在当前线程前面的线程,公平性就体现在这里
compareAndSetState(0, acquires)) { // 修改state值成功
setExclusiveOwnerThread(current); // 设置当前线程为占用资源的线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
从代码可以看出,公平锁同步器的lock方法直接调用AQS的acquire方法,而acquire方法包含了入队等待和唤醒那一套机制。
ReentrantLock在构造器中指定使用哪个同步器,默认使用非公平同步器,因为它的吞吐量更大:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的lock方法实际上调用的是同步器的lock方法:
public void lock() {
sync.lock();
}
而unlock实际上调用的是同步器的release方法:
public void unlock() {
sync.release(1);
}
Semaphore
Semaphore中文一般叫信号量,它可以控制同时访问共享资源的线程数,所以它最常见的使用场景是用来控制并发(限流)。Semaphore也是基于AQS的,它实现的是共享模式。跟ReentrantLock一样,它也支持公平和非公平两种锁机制,分别对应FairSync和NonfairSync这两个同步器。
NonfairSync和FairSync的公共代码被提取到内部类Sync中,首先我们看下Sync的代码:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// permits是许可数,每当有一个线程访问共享资源,permits就减一
// 当permits小于等于0时,线程访问共享资源会被阻塞
Sync(int permits) {
setState(permits); // permits即是AQS中的state
}
final int getPermits() {
return getState();
}
// 非公平的方式获取共享资源
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
// 直接使用CAS修改state值,不用管前面有没有线程排队,非公平性就体现在这里
compareAndSetState(available, remaining))
return remaining; // 返回负数代表异常
}
}
// 重写AQS的tryReleaseShared方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
再看下NonfairSync的代码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 重写AQS的tryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); // 调用父类Sync的nonfairTryAcquireShared方法
}
}
最后看下FairSync的代码:
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 在队列里有排在当前线程前面的线程
return -1; // 返回失败,因为公平锁需要排队
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
Semaphore可以在构造器中指定许可数和公平性:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
它对外暴露的主要方法中,直接或间接调用的都是我们在同步器中定义的方法。