文章目录
- 一、Semaphore基础概念
- 二、接口限流场景模拟
- 三、Semaphore源码解析
- 3.1、Semaphore结构解析
- 3.1.1、Sync源码
- 3.1.2、NonfairSync源码
- 3.1.3、FairSync源码
- 3.2、Semaphore重点方法源码解析
本章节将对Semaphore类中所有方法进行源码解析并提供部分代码案例。可以让读者全面了解该类提供的核心功能和该类使用的场景。在阅读本章之前希望您对AQS(AbstractQueuedSynchronizer)有一定了解,如果您曾阅读过AbstractQueuedSynchronizer源码或者是ReentrantLock源码,那本章节内容阅读起来就很轻车熟路。
一、Semaphore基础概念
Semaphore翻译过来的意思是信号、信号量。该类的主要作用是用于限制线程访问共享资源的数量。通俗一点来说,就是起到限流的作用。比如我们最常见的接口限流:当我们对外暴露一个接口时,应该对该接口的最大并行数进行控制。而Semaphore正好可以控制线程访问共享资源的数量,因此在单机部署的情况下,可以使用Semaphore来处理接口限流。synchronized关键字想必大家并不会感到陌生,synchronized可以控制共享资源同时只能被一个线程访问,而Semaphore更像是synchronized的升级版,Semaphore允许用户自定义共享资源同时被多少个线程访问。这个值可以是1也可是任意整数,但主要这个值不能大于int类型的最大值,也就是最大值不能超过2147483647。
二、接口限流场景模拟
案例中初始化一个Semaphore对象,设置了2个凭证,这就代表该类控制最多两个线程可以同时访问共享资源。构建了10个模拟请求线程,同时对共享资源访问,通过允许结果可以看到,10个线程同时访问时,只有2个线程获取到了凭证从而能访问共享资源,而剩余的8个线程都被接口拒绝,而无法访问共享资源。
package uct;
import java.util.concurrent.Semaphore;
/**
* @Author: DI.YIN
* @Date: 2024/6/14 17:22
* @Version:
* @Description:
**/
public class SemaphoreDemo {
private static Semaphore semaphore = new Semaphore(2); //限制最多2个线程可以访问共享资源
public static void main(String[] args) {
for (int i = 0; i < 10; i++) { //构建10个线程,模拟外部线程调用接口
ThreadRequest threadRequest = new ThreadRequest(semaphore, "线程[" + i + i + "]");
threadRequest.start();
}
}
/**
* 外部线程具体实现
*/
public static class ThreadRequest extends Thread {
private Semaphore semaphore;
ThreadRequest(Semaphore semaphore, String threadName) {
this.semaphore = semaphore;
super.setName(threadName);
}
@Override
public void run() {
if (semaphore.tryAcquire()) { //尝试获取凭证,如果获取失败,则返回false
System.out.println("获取凭证成功,可以进行访问");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("访问完成,释放凭证");
semaphore.release(); //释放凭证,可供后续线程继续获取
} else {
System.out.println("接口限流,本次请求被拒绝");
}
}
}
}
三、Semaphore源码解析
从第二小节可以看出,Semaphore通过凭证数来判断是否允许线程访问共享资源,当线程来临时,通过调用acquire方法或者是tryAcquire来尝试或者凭证。两个方法最大的差异在于,acquire方法是会一直阻塞线程,直至能够从Semaphore中获取到凭证,而tryAcquire只是尝试获取,如果获取失败,则会立马返回结果,并不会阻塞线程。当线程获取到凭证时,Semaphore中的凭证数就会随之减少,如果凭证数小于0时,Semaphore就会阻塞线程(调用acquire方法
)或者是立马返回false结果(调用tryAcquire方法
)。当线程访问完共享资源后,应该调用release方法释放所拥有的凭证,否则其他等待线程将一直被阻塞。
3.1、Semaphore结构解析
Semaphore类主要由三个内部类构成,其结构与ReentrantLock类十分相似。Semaphore类中自定义了Sync、NonfairSync、FairSync类。Sync类继承于AbstractQueuedSynchronizer,AbstractQueuedSynchronizer为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个基础的框架,AQS在JAVA的锁实现中有大量的使用。NonfairSync又叫做非公平锁,而FairSync称之为公平锁。NonfairSync和FairSync又继承了Sync,因此这两个类具有AQS的特性。这里不再对AbstractQueuedSynchronizer做过多阐述,因为AbstractQueuedSynchronizer自身设计较为复杂并且源码理解有一定难度,本章节无法一次性说完。所以需要您先对AbstractQueuedSynchronizer的设计有一定的了解。
3.1.1、Sync源码
Sync类继承于AQS,Sync作为NonfairSync和FairSync的父类,其自身内部也有一定的实现。构造方法Sync(int permits) 用于设置最大同步访问共享资源数。在AbstractQueuedSynchronizer 中有一个state的int变量,用于维护这里传入的凭证数量。Sync类的nonfairTryAcquireShared方法用于线程尝试获取acquires个凭证,该方法默认作为非公平锁NonfairSync获取凭证数的实现。for循环加上ACS从而规避多线程数据不一致问题。
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
* 信号量的同步实现。使用AQS状态代表许可证。子类分为公平和非公平版本。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits); //设置共享资源最大可以同步访问的线程数。
}
final int getPermits() {
return getState();//获取许可证数量
}
//尝试获取许可证,只有获取到许可证的才能访问共享资源
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();//获取当前可用的许可证数
int remaining = available - acquires;//计算扣减后剩余的许可数
if (remaining < 0 || compareAndSetState(available, remaining)) //如果许可数小于0,则直接返回,或者是通过CAS重置许可证数成功,则返回
return remaining;
}
}
//尝试释放release个许可证
protected final boolean tryReleaseShared(int releases) { //state值加releases个,通过ACS通知多线程
for (; ; ) {
int current = getState(); //获取当前许可证数
int next = current + releases;//修改许可证数量
if (next < current) // overflow 超过int类型最大值,则值为负数
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //通过CAS更新许可证数量
return true;
}
}
final void reducePermits(int reductions) {
for (; ; ) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//将许可证数量设置为0,表示不允许任何线程访问共享资源
final int drainPermits() {
for (; ; ) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
3.1.2、NonfairSync源码
NonfairSync基本上没有对Sync 进行扩展,其实现也是基于Sync
/**
* NonFair version 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//尝试获取acquires个凭证,其底层调用Sync的nonfairTryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
3.1.3、FairSync源码
FairSync继承于Sync,其tryAcquireShared方法并不像NonfairSync一样基于Sync来实现,因为公平锁和非公平锁最大的区别在于,公平锁讲究先进先出,也就是最先进入等待队列的线程,应该优先被唤醒尝试去获取凭证。而非公平锁自身随机的。
/**
* 公平锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//获取acquires个凭证,hasQueuedPredecessors用于判断链表中是否还存在等待的线程
protected int tryAcquireShared(int acquires) {
for (; ; ) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
3.2、Semaphore重点方法源码解析
Semaphore其自身实现方法并不多,更多的实现是基于AbstractQueuedSynchronizer,现在我们从Semaphore源码从上往下进行解析。
构造函数Semaphore(int permits) 和 Semaphore(int permits, boolean fair) 都需要传入一个int类型的permits,这个值将保存到AbstractQueuedSynchronizer的state中,用于控制多线程最大并行数量。而布尔值fair用于指明底层采用公平锁FairSync还是非公平锁NonfairSync。默认采用非公平锁。
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits); //设置许可证数量,默认采用不公平锁
}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits); //根据布尔值判断底层采用哪种模式的锁
}
acquire方法用于尝试获取一个凭证,如果获取失败(当前不存在可用凭证
)则当前线程将会被阻塞,直至有线程调用release方法释放凭证,唤醒阻塞线程。
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
* 从该信号量获取许可,阻塞直到可用,或者线程为被中断{@linkplain thread#interrupt interrupt}
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.获得许可证(如果有)并立即返回,将可用许可证的数量减少一个
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* 如果没有可用的许可,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到发生以下两种情况之一:
* 1. 某些线程调用release方法释放了许可,或者是当前线程被中断
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法来自于AbstractQueuedSynchronizer。tryAcquireShared方法的实现在NonfairSync或者FairSync中,尝试获取n个凭证,如果返回负数,表示当前没有可用的凭证,则进入doAcquireSharedInterruptibly方法,doAcquireSharedInterruptibly方法将当前线程构建成一个Node对象放入链表中。调用LockSupport.park阻塞线程。当然如果当前存在可用凭证,则线程无需阻塞。Semaphore通过判断线程是否获取凭证来判断是否应该让线程阻塞。从而实现线程并发控制。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //如果获取许可证失败,那么进入doAcquireSharedInterruptibly方法
doAcquireSharedInterruptibly(arg); //将线程放入同步队列,阻塞
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //将线程构建成Node节点,放入队列
boolean failed = true;
try {
for (; ; ) {
final Node p = node.predecessor(); //当前节点的前一个节点
if (p == head) { //如果他的前一个节点是头节点
int r = tryAcquireShared(arg); //再次尝试获取凭证
if (r >= 0) {
setHeadAndPropagate(node, r); //将当前节点设置为头节点
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞线程
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node); //如果获取到凭证,那么将线程移出队列
}
}
tryAcquire方法当比于acquire方法少了阻塞线程那一步,当调用tryAcquire时,线程将尝试获取一个凭证,如果获取失败,则直接返回flase结果,而不像acquire那样放入链表中阻塞线程。nonfairTryAcquireShared方法的具体实现在Sync中,如果该方法返回的值大于0,则表示线程获取到凭证,否则获取凭证失败。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();//获取当前可用的许可证数
int remaining = available - acquires;//计算扣减后剩余的许可数
if (remaining < 0 || compareAndSetState(available, remaining)) //如果许可数小于0,则直接返回,或者是通过CAS重置许可证数成功,则返回
return remaining;
}
}
tryAcquire(long timeout, TimeUnit unit) 方法 与 acquire 方法的实现十分相近,只是设置一个等待时间,如果超过该时间如果线程还未获取当凭证,则返回false。acquire 方法是一直等待,而tryAcquire(long timeout, TimeUnit unit) 是给定等待时间,超时则返回结果。
//在指定的时间尝试获取凭证
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);//如果tryAcquireShared返回负数,则进入doAcquireSharedNanos方法进行阻塞休眠,调用LockSupport.parkNanos进行睡眠
}
release方法用于释放一个凭证,释放的凭证将加到state上,以便于其他线程进行获取。tryReleaseShared方法的具体实现在Sync类。通过CAS来重置AQS上state的数量。
public void release() {
sync.releaseShared(1);
}
//尝试释放release个许可证
protected final boolean tryReleaseShared(int releases) { //state值加releases个,通过ACS通知多线程
for (; ; ) {
int current = getState(); //获取当前许可证数
int next = current + releases;//修改许可证数量
if (next < current) // overflow 超过int类型最大值,则值为负数
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //通过CAS更新许可证数量
return true;
}
}
availablePermits 发放用于获取当前可用凭证数,其底层也就是调用AbstractQueuedSynchronizer的getstate方法获取state属性的当前值
public int availablePermits() {
return sync.getPermits();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState(); //调用AbstractQueuedSynchronizer的方法
}
}
drainPermits方法用于获取当前可用许可证数,并将许可证数设置为0,这样就会导致接下来的所有线程无法访问共享资源,通过for加CAS来规避多线程的影响。
//将许可证数量设置为0,表示不允许任何线程访问共享资源
final int drainPermits() {
for (; ; ) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
hasQueuedThreads则是判断当前链表中是否还存在阻塞等待的线程,判断条件则是判断当前表头和表尾是否一致,如果是一致的,则表示当前链表为空链表,不存在阻塞的线程。
public final boolean hasQueuedThreads() {
return head != tail;
}
getQueuedThreads则是获取当前链表中所有阻塞的线程,其核心操作就是遍历链表,然后将链表的所有线程放入集合中,一次性返回
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) { //遍历链表
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}