同步工具类:Semaphore
- 介绍
- 源码分析
- 构造函数
- acquire 获取信号量
- release 释放信号量
- 业务场景
- 代码
- 测试结果
- 总结
介绍
官方说明:
Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
通俗说明:
Semaphore 中文俗称信号量,主要用于控制流量,比如:数据库连接池给你分配10个链接,那么让你来一个连一个,连到10个还没有人释放,那你就等等。
源码分析
构造函数
permits 可以理解为许可证,默认情况下只需要传入permits即可。也就是一次运行放行几个线程。如果你需要使用Semaphore 共享锁中的公平锁,那么可以传入第二个构造函数fair= false/true.
public Semaphore(int permits) {
//permits 为许可数量,默认构造非公平版本
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
//permits 为许可数量,fair 来决定构造公平版本还是非公平版本
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire 获取信号量
如下所示,其实获取信号量的这四个方法,主要就是,一次获取几个和是否响应中断的组合。
是否响应中断 可以理解为 其他线程调用interrupt方法时,该线程是否做出响应(一般是抛出异常)。
下面举例了 acquire()方法。调用了AQS的模板方法 acquireSharedInterruptibly();其中Semaphore 实现了公平锁和非公平锁的tryAcquireShared()方法。公平锁的是首先判断队列里面有没有排队的。如果有的话,就需要等待阻塞。非公平锁 是一上来就死循环 去拿资源。
方法 | 描述 |
---|---|
semaphore.acquire() | 一次获取一个信号量,响应中断 |
semaphore.acquire(2) | 一次获取n个信号量,响应中断(一次占2个坑) |
semaphore.acquireUninterruptibly() | 一次获取一个信号量,不响应中断 |
semaphore.acquireUninterruptibly(2) | 一次获取n个信号量,不响应中断 |
public void acquire() throws InterruptedException {
//调用AQS 的模板方法
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//此处响应中断,抛出InterruptedException()
if (Thread.interrupted())
throw new InterruptedException();
//Semaphore 的公平锁和非公平锁都实现了tryAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
公平锁:
protected int tryAcquireShared(int acquires) {
for (;;) {
//调用AQS的 hasQueuedPredecessors方法,查看是否
//有其他线程排队,有的话就返回-1,不获取资源,直接进入阻塞
if (hasQueuedPredecessors())
return -1;
int available = getState();
//获取的资源需要小于剩余的资源
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平锁:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
//判断资源是否够用
int remaining = available - acquires;
//够用的话直接 cas 修改返回
//不够用的话也是cas 修改
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
release 释放信号量
有获取就得有释放,获取了几个信号量就要释放几个信号量
方法 | 描述 |
---|---|
semaphore.release() | 一次释放一个信号量 |
semaphore.release(2) | 一次获取n个信号量 |
public void release() {
//调用AQS的模板方法 releaseShared()
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//Semaphore Sync 实现了tryAcquireShared方法
//公平锁和非公平锁都是一样的逻辑
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
//对参数进行校验,首先不能是负数
//其次不能两数相加超过了Integer.MaxValue 从而溢出了
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//在循环里面将资源释放回去即可
if (compareAndSetState(current, next))
return true;
}
}
业务场景
此处模拟8个人同时去上厕所,但是测试只有两个。所以需要排队同步,此时可以利用Semaphore。
代码
/**
* @author :echo_黄诗
* @description:Semaphore 的业务模拟
* @date :2023/3/1 17:11
*/
public class Demo {
public static void main(String[] args) {
//定义两个资源(此时是厕所)
Semaphore semaphore=new Semaphore(2);
//定义核心线程池个数为8
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(8,20,20,
TimeUnit.SECONDS,new ArrayBlockingQueue(15));
//定义8个人来上厕所
for (int i=0;i<8;i++){
threadPoolExecutor.execute(()->{
try {
semaphore.acquire();
System.out.println(new SimpleDateFormat("YYYY-MM-DD HH:mm:ss").format(new Date()));
//模拟上厕所耗时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//上完厕所后,需要离开(释放资源)
semaphore.release();
}
System.out.println(Thread.currentThread().getName()+": 来上厕所了,");
});
}
}
}
测试结果
总结
一:Semaphore 类的核心是Sync,它继承了AQS类,并重写了几个关键方法来实现自己的特殊功能。
二:Semaphore 类中实现了公平锁和非公平锁,默认是非公平。和 ReentrantLock 类似。
三:Semaphore 类中的资源是使用了AQS中的state属性。
四: Semaphore 和CountDownLatch 的实现方式比较类似。大家可以比较学习。
CountDownLatch 学习可以参考:
https://blog.csdn.net/echohuangshihuxue/article/details/129280219
大家有什么补充的,随时欢迎。