AQS介绍
AQS是一个抽象类,主要用来构建锁和同步器。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
AQS为构建锁和同步器提供了一些通用功能的实现,因此,使用AQS能简单且高效的构造出应用广泛的大量的同步器,比如常用的ReentrantLock,Seamphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue等都是基于AQS的。
AQS原理
AQS核心思想
AQS核心思想是,如果被请求的共享资源空闲, 则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要阻塞线程并且在资源释放时唤醒并未等待的线程分配锁。这个机制AQS是基于CLH锁实现的。
CLH锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中,AQS将每个请求共享资源的线程封装成一个CLH队列锁的一个结点(Node)来实现锁的分配。在CLH队列锁中,一个节点表示一个线程,他保存着线程的引用、当前节点在队列中的状态、前驱节点、后继节点。
CLH队列结构如图所示
AQS核心原理图
AQS使用int成员变量state表示同步状态,通过内置的FIFO线程等待队列来完成获取资源线程的排队工作。
state变量由volatile修饰,用于展示当前临界资源的获锁情况。
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
另外,状态信息state可以通过protected类型的getState()、setState()、和compareAndSetState()进行操作。并且,这几个方法都是final修饰的,在子类中无法被重写。
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
以可重入的互斥锁ReentrantLock为例,他的内部维护了一个state变量,用来表示锁的占用状态,state的初始值为0,表示锁处于未锁定状态。当线程A调用lock()方法时,会尝试通过tryAcquire()方法独占该锁,并让state的值加1.如果成功,那么线程A就获取到了锁,如果失败,那么线程A就会被加入到一个等待队列(CLH)中,直到其他线程释放该锁。假设线程A获取锁成功了,释放锁之前,A线程自己可以重复获取此锁(state的值会累加)。这就是可重入性的表现。
一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让state的值回到0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
示意图如下:
再以CountDownLatch为例,任务分为N个子线程去执行,state初始化为N(N要与线程个数一致)。这N个子线程开始执行任务,每执行完一个子线程,就调用一次countDown()方法,该方法会尝试使用CAS操作,让state的值减1,当所有的子线程都执行完毕(即state的值为0),CountDownLatch会调用unpack()方法,唤醒主线程,这时,主线程就可以从await()方法返回,继续执行后续操作。
AQS资源共享方式
AQS定义两种资源共享方式:Exclusive(独占式,只有一个线程能执行,如ReentrantLock)和Share(共享式,多个线程同时执行 如Seamphore,CountDownLatch)。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式ReentrantReadWriteLock。
自定义同步器
同步器的设计是基于模版方法模式的,如果需要自定义同步器的一般方式如下(模版方法模式的经典应用):
- 使用者继承AbstractQueuedSychronized并重写指定的方法。
- 将AQS组合在自定义同步组件的实现中,并调用其模版方法,而这些模版方法会调用使用者重写的方法。
AQS使用了模版方法模式,自定义同步器时需要重写下面几个AQS提供的钩子方法:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
什么是钩子方法:钩子方法是一种被声明在抽象类中的方法,一般使用protected关键字修饰,它可以是空方法(由子类实现),也可以是默认实现方法,模版设计模式通过钩子方法控制固定步骤的实现。
常见的同步工具类(AQS子类)
Seamphore(信号量)
介绍
Sychrnoized和ReentrantLock都是一次只允许一个线程访问某个资源,而Seamphore(信号量)可以用来控制同时访问特定资源的线程数量
。
Seamphore的使用很简单,假如有N(N>5)个线程来获取Seamphore中的共享资源,下面代码表示同一时刻N个线程中只有5个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行,等到有线程释放了共享资源,其他阻塞的线程才能获取到。
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();
当初始的资源个数为1的时候,Seamphore退化为排他锁
。
Seamphore有两种模式
- 公平模式:调用acquire()方法的顺序就是获取许可证的顺序,遵循FIFO。
- 非公平模式:抢占式的。
对应的两个构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
这两个构造方法,都必须提供许可(permits)的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。
Seamphore通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用Redis+Lua脚本实现)。
原理
Seamphore是共享锁的一种实现,默认构造AQS的state值为permits
,其中permits为许可证数量,拿到许可证的线程才能继续执行。
以无参acquire为例,调用seamphore.acquire(),线程尝试获取许可证,如果state>0.则表示可以获取成功,如果state<=0的话,则表示许可证数量不足,获取失败。
如果可以获取成功的话(state>0),会尝试使用CAS操作去修改state的值,如果获取失败则会创建一个Node节点加入等待队列,挂起当前线程。
// 获取1个许可证
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 获取一个或者多个许可证
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
acquireSharedInterruptibly方法是AbstractQueuedSychronizer的默认实现。
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// 非公平的共享模式获取许可证
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 当前可用许可证数量
int available = getState();
/*
* 尝试获取许可证,当前可用许可证数量小于等于0时,返回负值,表示获取失败,
*
*/
/重要!!!:当前可用许可证大于0时才可能获取成功,CAS失败了会循环重新获取最新的值尝试获取
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
以无参release方法为例,调用seamphore.release(); 线程尝试释放许可证,并使用CAS操作去修改state的值state=state+1;释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改state的值state=state-1;如果state>0则令牌获取成功,否则重新进入等待队列,挂起线程。
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}
// 释放一个或者多个许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
releaseShared方法是AbstractQueuedSychronized中的默认实现
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法是Seamphore内部类Sync重写的一个方法。
// 内部类 Sync 中重写的一个方法
// 尝试释放资源
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 可用许可证+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS修改state的值
if (compareAndSetState(current, next))
return true;
}
}
代码示例:
public class SemaphoreExample {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 初始许可证数量
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release();// 释放一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
Seamphore获取凭证的两个核心方法acquire()、tryAcquire();其中acquire()方法是阻塞方法,获取不到凭证会加入CLH等待队列,tryAcquire()方法,如果获取不到凭证会立即返回false。
Seamphore与CountDownLatch一样,也是共享锁的一种实现。它默认构造AQS的state为permits,当执行任务的线程数量超出permits,那么多余的线程将会被放入等待队列park,并自旋判断state是否大于0,只有当state大于0的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行release()方法,release()方法使得state的变量加1,那么自旋的线程便会判断成功,如此,每次只有最多不超过permits数量的线程能自旋成功,便限制了执行任务线程的数量。
CountDownLatch(倒计数器)
介绍
CountDownLatch允许count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,他不能再次被使用。
原理
CountDownLatch是共享锁的一种实现,他默认构造AQS的state值为count,这个通过构造方法可以看出。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
//...
}
当线程调用countDown()时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0.当state为0时,表示所有线程都调用了countDown方法,那么在CountDownLatch上等待的线程就会被唤醒并继续执行。
public void countDown() {
// Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer
sync.releaseShared(1);
}
releaseShared方法是AbstractQueuedSychronized中的默认实现
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法是CountDownLatch内部类Sync重写的一个方法,AbstractQueuedSychronizer中的默认实现仅仅抛出UnsupportedOperationException异常。
// 对 state 进行递减,直到 state 变成 0;
// 只有 count 递减到 0 时,countDown 才会返回 true
protected boolean tryReleaseShared(int releases) {
// 自选检查 state 是否为 0
for (;;) {
int c = getState();
// 如果 state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
// CAS 操作更新 state 的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
当主线程调用await()的时候【实际上就是尝试获取state】,如果state不为0,那就证明任务还没有执行完毕,await()就会一直阻塞,也就是说await()之后的语句不会被执行(主线程被加入到等待队列即CLH中了)。然后CountDownLatch会自旋CAS判断state==0,如果state==0的话,就会释放所有等待的线程,await()方法之后的语句就可以得到执行。
// 等待(也可以叫做加锁)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
CountDownLatch典型用法
- 某个线程在开始运行之前等待n个线程执行完毕:将CountDownLatch的计数器初始化为n【new CountDownLatch(n)】;每当一个线程执行完毕,将计数器减1【countDown()】; 当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 多个线程开始执行任务的最大并行性:注意是并行性、不是并发,强调的是多个线程在某一个时刻同时开始执行,类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch对象,将其计数器初始化为1,多个线程在开始执行任务之前首先countdownLatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
!!! 注意
:不管任务有没有抛出异常,都需要调用countDown()将计数器最终归0,否则很容易造成死锁。当然也可以等待一定的时间如await(30,TimeUnit.Seconds).
CyclicBarrier(循环栅栏)
介绍
先通俗理解:N个运动一起赛跑。但出发时间随机,在终点设置一个裁判负责计时,只有当最后一个运动员也到了终点,裁判才会公布所有人的成绩。
CyclicBarrier和CountDownLatch非常类似,他也可以实现线程间的技术等待,但是他的功能比CountDownLatch更加复杂和强 大,主要应用场景和CountDownLatch类似。
CountDownLatch是基于AQS实现的,而CyclicBarrier是基于ReentrantLock(ReentrantLock也属于AQS同步器)和Condition的。
CyclicBarrier字面意思是可循环使用(Cyclic)和屏障(Barrier)。他要做的事情就是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
原理
CyclicBarrier内部通过一个count变量作为计数器,count的初始值为parties属性的初始化值,每当一个线程到了栅栏这里,那么就将计数器减1.如果count值为0了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
//每次拦截的线程数
private final int parties;
//计数器
private int count;
结合源码来看
- CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
//这个Runnble类型参数,表示所有的任务都到了屏障以后,要做的一个操作。
this.barrierCommand = barrierAction;
}
- 当调用CyclicBarrier对象的await方法时,实际上调用的是dowait(false,0L)方法。await()方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到parties的值时,栅栏才会打开,线程得以通行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
//。。。。。。。缩略
// count 减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 所有线程都到了,栅栏打开,执行构造函数中的方法
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
//线程还未全部到达,就开始循环,除非发生线程阻塞、超时等待时间到、打断等行为,否则始终循环等待index==0
}
} finally {
lock.unlock();
}
}
示例
public class CyclicBarrierExample1 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
运行结果:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
//cyclicBarrier为5,计数器到
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
//开始新的一轮
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
//cyclicBarrier为5,计数器到
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......