AQS源码解析—共享模式_Semaphore信号量
简介
synchronized 可以起到锁的作用,但某个时间段内,只能有一个线程允许执行。
Semaphore(信号量)用来限制能同时访问共享资源的线程上限,非重入锁。
- Semaphore 是什么,可以理解为是一个通行证的管理池。
- Semaphore 信号量常用来做
限流
使用,在高并发场景下用于控制同一时刻对共享资源的访问次数。可以用于商品的秒杀等场景。 - Semaphore 信号量,它保存了一系列的许可(permits),每次调用
acquire()
都将消耗一个许可,每次调用release()
都将归还一个许可。- acquire() 和 release() 方法之间的代码为同步代码。
- Semaphore 的内部是基于 AQS 的共享锁来实现的。
- 使用了类似 ReentrantLock 的内部结构,
使用了内部类Sync(继承AQS)
,两个实现类NonfairSync(非公平逻辑)和 FairSync(公平的逻辑)- 如果是 公平逻辑,则先看一下队列中是否有线程在等待,如果没有则直接去拿通行证,如果有则需要排队等待(跟获取锁差不多)。
- 内部的资源使用的还是 AQS 的 state
(volatile int state)
属性 - 当 acquire() 时,state + 1,当 release() 时,state - 1(常用的就是 1,也可以指定)
- 使用了类似 ReentrantLock 的内部结构,
举例场景:操作磁盘文件非常耗时,这个时候我要限制同时不能有太多的线程去做文件读取或者写入的事情,这个时候就可以使用 Semaphore。每个磁盘做业务之前,需要先拿到 Semaphore 管理池中的一个通行证,做完业务之后,需要归还 Semaphore 的通行证。
如果当前线程没有拿到通行证,则会挂起并进入 AQS 阻塞队列中,等待被唤醒。
Semaphore 信号量 获取通行证流程图
Semaphore 与 CountDownLatch 对比区别
- CountDownLatch 中挂起的线程是在等待 CountDownLatch 中的状态值归 0,而后被唤醒。
- Semaphore 中挂起的线程是在等待状态大于 0,才去获取锁。
入门案例分析
案例1
- 服务并发控制
public class SemaphoreDemo {
private Semaphore permits = new Semaphore(10, true);
/**
* 服务内部的业务需要控制最高并发数为10
*/
private void service() {
try {
// 获取通行证
permits.acquire();
// do something...
// 业务...
} catch (InterruptedException e) {
} finally {
// 归还通行证
permits.release();
}
}
}
案例2
- Oracle 官方案例,连接池 Pool 源码分析
public class Pool {
/** 可同时访问资源的最大线程数*/
private static final int MAX_AVAILABLE = 100;
/** 信号量 表示:可获取的对象通行证*/
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/** 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是链接池*/
protected Object[] items = new Object[MAX_AVAILABLE];
/** 共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false*/
protected boolean[] used = new boolean[MAX_AVAILABLE];
/**
* 获取一个空闲对象
* 如果当前池中无空闲对象,则等待..直到有空闲对象为止
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* 归还对象到池中
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
/**
* 获取池内一个空闲对象,获取成功则返回Object,失败返回Null
* 成功后将对应的 used[i] = true
*/
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
/**
* 归还对象到池中,归还成功返回true
* 归还失败:
* 1.池中不存在该对象引用,返回false
* 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false
*/
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
SemaphoreDemo
public class SemaphoreTest02 {
public static void main(String[] args) throws InterruptedException {
// 声明信号量,初始的许可(permits)为2
// 公平模式:fair为true
final Semaphore semaphore = new Semaphore(2, true);
Thread tA = new Thread(() -> {
try {
// 每次调用acquire()都将消耗一个许可(permits)
semaphore.acquire();
System.out.println("线程A获取通行证成功");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
} finally {
// 每次调用release()都将归还一个许可(permits)
semaphore.release();
}
});
tA.start();
// 确保线程A已经执行
TimeUnit.MILLISECONDS.sleep(200);
Thread tB = new Thread(() -> {
try {
// 调用acquire(2)都将消耗2个许可(permits)
// 带参数的acquire(permits)基本上不用
semaphore.acquire(2);
System.out.println("线程B获取通行证成功");
} catch (InterruptedException e) {
} finally {
// 调用release(2)都将归还2个许可(permits)
semaphore.release(2);
}
});
tB.start();
// 确保线程B已经执行
TimeUnit.MILLISECONDS.sleep(200);
Thread tC = new Thread(() -> {
try {
// 每次调用acquire()都将消耗一个许可(permits)
semaphore.acquire();
System.out.println("线程C获取通行证成功");
} catch (InterruptedException e) {
} finally {
// 每次调用release()都将归还一个许可(permits)
semaphore.release();
}
});
tC.start();
}
}
执行结果:
线程A获取通行证成功
线程B获取通行证成功
线程C获取通行证成功
源码解析
属性及构造方法
- 创建 Semaphore 时需要传入许可次数。Semaphore 默认也是非公平模式,可以调用第二个构造方法声明其为公平模式。
// 只有这一个属性,继承自AQS。内部有2个实现类,公平和非公平
private final Sync sync;
// 构造方法,创建时要传入许可次数,默认使用非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 构造方法,需要传入许可次数,以及是否公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
内部类 Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造方法 传入int值,赋值给内部的state
Sync(int permits) {
setState(permits);
}
// 获取state
final int getPermits() {
return getState();
}
// 非公平模式下获取信号量 上来直接抢占锁,不会判断队列中是否有节点排队
final int nonfairTryAcquireShared(int acquires) {
// 自旋 + CAS
for (;;) {
// 获取当前的state
int available = getState();
// state - acquires 赋值给remaining
int remaining = available - acquires;
// 判断减完之后的state < 0 那么直接返回一个负数,代表获取state失败
// 条件2:state > 0 使用CAS方式尝试获取state,获取成功返回state的值,失败继续自旋获取。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// CAS + 自旋 尝试将state的值加上releases
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 拿到当前的state
int current = getState();
// 释放通行证之后的数量
int next = current + releases;
// 如果next小于当前值,则说明next值溢出int最大值了,抛出异常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS设置state为next
if (compareAndSetState(current, next))
return true; // CAS成功后返回true
}
}
// CAS + 自旋 将state的值减去reductions
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// CAS + 自旋 重置state为 0
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
Sync.NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 构造方法,调用父类的构造方法
NonfairSync(int permits) {
super(permits);
}
/*
* 尝试获取许可,调用父类的nonfairTryAcquireShared()方法
* @return ( > 0 表示获取信号量(锁)成功,< 0表示获取信号量失败)
*/
protected int tryAcquireShared(int acquires) {
// Sync.nonfairTryAcquireShared():非公平模式下获取信号量。此方法在上方的内部类Sync中分析了
return nonfairTryAcquireShared(acquires);
}
}
Sync.FairSync
- 本文以公平模式进行分析。
- 公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新 state 的值。
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
// 构造方法,调用父类的构造方法
FairSync(int permits) {
super(permits);
}
/*
* CAS + 自旋尝试获取通行证,获取成功返回 >= 0 的值,获取失败返回 < 0 的值。
*/
protected int tryAcquireShared(int acquires) {
// 自旋
for (;;) {
/*
* 判断当前AQS阻塞队列内是否有等待者线程,如果有直接返回-1,表示当前acquire的操作需要进入到等待队列
* (因为这里我们解析的是公平锁源码,所以这里会判断,非公平锁直接上来就抢)
*/
if (hasQueuedPredecessors())
return -1;
/*
* 执行到这里,有哪几种情况?
* 1.调用acquire时,AQS同步队列内没有其他等待者节点
* 2.当前节点 在同步队列中是head.next节点
*/
// 获取state的值,这里表示通行证的数量
int available = getState();
// remaining 表示当前线程 获取通行证完成之后,semaphore还剩余数量
int remaining = available - acquires;
/*
* 条件1:remining < 0 表示线程获取通行证失败
* 条件2:前置条件:remaning >= 0,CAS更新state成功,说明线程获取通行证成功,CAS失败,则自旋。
*/
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining; // 返回剩余通行证数量
}
}
}
acquire() 大致流程图
Semaphore.acquire()
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
||
||
\/
AQS.acquireSharedInterruptibly()
// AQS的共享方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 当前线程标志位处于中断状态,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// FairSync.tryAcquireShared()方法:自旋 + CAS获取信号量,获取成功返回直接走业务逻辑,失败则构造节点入队阻塞。
// 该方法在上方的非公平模式Sync.FairSync中分析了
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
AQS.doAcquireSharedInterruptibly()
/*
* 将当前线程构造为一个Node进入同步队列,并挂起当前线程,等待被唤醒。
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 构造节点(共享模式)并入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 当前节点时head.next节点,尝试获取信号量
if (p == head) {
// 返回值表示的是state - arg 后的state值
int r = tryAcquireShared(arg);
// 站在Semaphore角度:r表示还剩余的通行证数量,大于等于0,表示获取成功
if (r >= 0) {
// 设置当前节点为head节点,并且向后传播(依次唤醒)
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
/*
* 为当前Node在同步队列中找到一个状态合法的前驱Node,然后挂起当前线程。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS.setHeadAndPropagate()
/*
* 设置当前节点为head节点,并且向后传播(依次唤醒)
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 将当前节点设置为新的node节点
setHead(node);
// 在CountDownLatch中 propagate值一定为1
// 但是在Semaphore中,这个值可能为0,所以后续的条件就有意义了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取后继节点
Node s = node.next;
/*
* 条件1:
* s == null 当前node已经是tail了,条件一定成立,doReleaseShared()会处理
* 条件2:
* 前置条件 s != null,并且s是共享节点,调用await()时构造节点时就是共享模式了
* 基本上所有的情况都会执行到 doReleaseShared()
*/
if (s == null || s.isShared())
doReleaseShared();
}
}
release() 大致流程图
Semaphore.release()
public void release() {
sync.releaseShared(1);
}
AQS.releaseShared()
public final boolean releaseShared(int arg) {
/*
* Sync.tryReleaseShared():CAS + 自旋方式释放资源,大概率都会成功。此方法在上面的内部类Sync中分析了
*/
if (tryReleaseShared(arg)) {
// 唤醒获取资源失败的线程
doReleaseShared();
return true;
}
return false;
}
AQS.doReleaseShared()
/*
* 唤醒获取资源失败的线程
*
* CountDownLatch 版本
* 都有哪几种路径会调用到doReleaseShared方法呢?
* 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 唤醒当前阻塞队列内的 head.next 对应的线程。
* 2.被唤醒的线程 -> doAcquireSharedInterruptibly()中的parkAndCheckInterrupt() 唤醒 -> setHeadAndPropagate() -> doReleaseShared()
*
* Semaphore 版本
*/
// AQS.doReleaseShared
private void doReleaseShared() {
for (;;) {
// 获取当前AQS内的头结点
Node h = head;
// 条件1:h != null 成立,说明阻塞队列不为空..
// 不成立:h == null 什么时候会是这样呢?
// latch创建出来后,没有任何线程调用过 await() 方法之前,有线程调用latch.countDown()操作 且触发了 唤醒阻塞节点的逻辑..
// 条件2:h != tail 成立,说明当前阻塞队列内,除了head节点以外 还有其他节点。
// h == tail -> head 和 tail 指向的是同一个node对象。什么时候会有这种情况呢?
// 1.正常唤醒情况下,依次获取到 共享锁,当前线程执行到这里时(这个线程就是 tail 节点)
// 2.第一个调用await()方法的线程 与 调用countDown()且触发唤醒阻塞节点的线程 出现并发了..
// 因为await()线程是第一个调用latch.await()的线程,此时队列内什么也没有,它需要补充创建一个head节点,然后再次自旋时入队
// 在await()线程入队完成之前,假设当前队列内 只有 刚刚补充创建的空元素 head。
// 同期,外部有一个调用countDown()的线程,将state值从1,修改为0了,那么这个线程需要做 唤醒 阻塞队列内元素的逻辑..
// 注意:调用await()的线程 因为完全入队完成之后,再次回到上层方法 doAcquireSharedInterruptibly 会进入到自旋中,
// 获取当前元素的前驱,判断自己是head.next,所以接下来该线程又会将自己设置为 head,然后该线程就从await()方法返回了..
if (h != null && h != tail) {
// 执行到if里面,说明当前head 一定有 后继节点!
int ws = h.waitStatus;
// 当前head状态 为 signal 说明 后继节点并没有被唤醒过
if (ws == Node.SIGNAL) {
// 唤醒后继节点前 将head节点的状态改为 0
// 这里为什么,使用CAS呢?
// 当doReleaseShared方法 存在多个线程 唤醒 head.next 逻辑时,
// CAS 可能会失败..
// 案例:
// t3 线程在 if (h == head) 返回false时,t3 会继续自旋. 参与到 唤醒下一个head.next的逻辑..
// t3 此时执行到 CAS WaitStatus(h, Node.SIGNAL, 0) 成功.. t4在t3修改成功之前,也进入到 if (ws == Node.SIGNAL) 里面了
// 但是t4 修改 CAS WaitStatus(h, Node.SIGNAL, 0) 会失败,因为 t3 改过了...
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果当前状态为0,则CAS设置状态为传播状态。对于这个状态可参考博客[https://blog.csdn.net/cq_pf/article/details/113387256]
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 条件成立:
// 1.说明刚刚唤醒的 后继节点,还没执行到 setHeadAndPropagate方法里面的 设置当前唤醒节点为head的逻辑。
// 这个时候,当前线程 直接跳出去..结束了..
// 此时用不用担心,唤醒逻辑 在这里断掉呢?
// 不需要担心,因为被唤醒的线程 早晚会执行到doReleaseShared方法。
// 2.h == null: latch创建出来后,没有任何线程调用过await()方法之前 有线程调用latch.countDown()操作 且触发了唤醒阻塞节点的逻辑..
// 3.h == tail -> head 和 tail 指向的是同一个node对象
// 条件不成立:
// 被唤醒的节点 非常积极,直接将自己设置为了新的head,此时 唤醒它的节点(前驱),执行 h == head 条件会不成立..
// 此时 head节点的前驱,不会跳出 doReleaseShared 方法,会继续唤醒 新head 节点的后继..
if (h == head) // loop if head changed
break;
}
}
总结
- 其实本质上 CountDownLatch 和 Semaphore 使用的 AQS 方法逻辑是一样的。
- 只是在 挂起节点 和 通知节点 稍微有不同:
- CountDownLatch 挂起节点,是当前 state 不为 0
- Semaphore 挂起节点,是当前 state 小于 0
- CountDownLatch 唤醒节点,是当 state 为 0
- Semaphore 唤醒节点,是当前 state > 0
- Semaphore 初始化的时候需要指定许可的次数,许可的次数是存储在 state 中
- 获取一个许可时,state 值减 1
- 释放一个许可时,state 值加 1
参考
- 视频参考
- b站_小刘讲源码付费课
- 文章参考
- shstart7_AQS共享模式之Semaphore源码解析
- 兴趣使然的草帽路飞_AQS源码探究_09 Semaphore源码分析
- 肆华_Semaphore阅读理解