目录
JUC并发编程之Semaphore-应用与深度源码剖析
1. Semaphore 是什么?
2.怎么使用Semaphore?
2.1构造方法
2.2 重要方法
2.3 基本使用
需求场景
基础版代码实现
tryAcquire()引入代码实现
acquireUninterruptibly(),acquire()对比代码实现
3.源码剖析【重点】
底层结构图:
思路总结:
semaphore.acquire():
semaphore.release():
JUC并发编程之Semaphore-应用与深度源码剖析
1. Semaphore 是什么?
Semaphore字面意思是信号量的意思,它的作用就是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。
2.怎么使用Semaphore?
2.1构造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
permits表示许可线程的数量
fair表示公平性,如果这个设为true的话,下一次执行的线程就会是等待最久的线程
2.2 重要方法
public void acquire() throws InterruptedException
public void release()
tryAcquire(int args,long timeout, TimeUnit unit)
- acquire() 表示阻塞并获取许可
- release() 表示释放许可
2.3 基本使用
需求场景
资源访问,服务限流Hystrix里面限流底层就是基于信号量的方式,如图所示:
基础版代码实现
/**
* @Description: TODO
* @Author: etcEriksen
* @Date: 2023/3/7
**/
@Slf4j
@SuppressWarnings({"all"})
public class SemaphoreRunner {
public static void main(String[] args) {
//构造参数为:2,表示的含义为:该Semaphore所带有的总公共资源为2
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(new Task(semaphore,"leomessi:"+i)).start();
}
}
static class Task extends Thread {
Semaphore semaphore ;
public Task(Semaphore semaphore,String tname) {
super(tname) ;
this.semaphore = semaphore ;
}
@Override
public void run() {
try {
//semaphore.acquireUninterruptibly();
semaphore.acquire(2);//获取2个公共资源才可以通过一个线程 【带有中断抛出异常的机制】
log.info(Thread.currentThread().getName()+":aquire at time:" + System.currentTimeMillis()) ;
Thread.sleep(5000) ;
semaphore.release(2) ;//归还公共资源,并且归还的公共资源数量要和一个线程通过时获取的公共资源数量要持平
} catch (Exception e) {
e.printStackTrace();
}
}
public void fallback() {
log.info("降级");
}
}
}
代码分析:
运行结果:
tryAcquire()引入代码实现
/**
* @Description: TODO
* @Author: etcEriksen
* @Date: 2023/3/7
**/
@Slf4j
@SuppressWarnings({"all"})
public class SemaphoreRunner {
public static void main(String[] args) {
//构造参数为:2,表示的含义为:该Semaphore所带有的总公共资源为2
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(new Task(semaphore,"leomessi:"+i)).start();
}
}
static class Task extends Thread {
Semaphore semaphore ;
public Task(Semaphore semaphore,String tname) {
super(tname) ;
this.semaphore = semaphore ;
}
@Override
public void run() {
try {
// //semaphore.acquireUninterruptibly();
// semaphore.acquire();//获取2个公共资源才可以通过一个线程 【带有中断抛出异常的机制】
// Thread.sleep(5000) ;
// semaphore.release(2) ;//归还公共资源,并且归还的公共资源数量要和一个线程通过时获取的公共资源数量要持平
if (semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)) {
log.info(Thread.currentThread().getName()+":aquire at time:" + System.currentTimeMillis()) ;
Thread.sleep(5000);
semaphore.release();//释放公共资源
} else {
//如果500毫秒线程还没有获取到相对应的2个公共资源,那么降级处理
fallback();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void fallback() {
log.info("降级");
}
}
}
分析代码:这里结合了降级处理
运行结果:
acquireUninterruptibly(),acquire()对比代码实现
acquire():当线程被中断后,会抛出InterruptException异常。
acquireUninterruptibly():当线程被中断后,不会抛出异常。
acquireUninterruptibly(): 结合代码分析
运行结果
acquire():结合代码分析
运行结果:
3.源码剖析【重点】
底层结构图:
ProcessOn Flowchart
思路总结:
初始化Semaphore对象时指定总资源数量,多个线程进来时会去竞争该公共资源,但是在公平锁的情况下,会维护一个CLH阻塞队列,该队列为公平队列,从前往后进行唤醒获取公共资源。当公共资源不够当前线程使用时或CLH阻塞队列存在节点时,新进来的线程对象都会被封装为Node节点加入到CLH阻塞队列的尾部,公平等待时机。非公平锁时,与之正好相反。
semaphore.acquire():
1.
2.
3.
acquireSharedInterruptibly()调用的tryAcquireShared解析:
acquireSharedInterruptibly()调用的doAcquireSharedInterruptibly解析:
很多相同的源码在之前的源码分析中都详细介绍了,所以这里只记录新出现的源码思路:
注:doAcquireSharedInterruptibly方法调用的tryAcquireShared方法
4.应用层调用Interrupt()方法
应用层的interrupt()中断方法调用后,底层park阻塞被中断,那么继续向下执行代码:
interrupt()方法给当前线程打上中断标识啦,所以调用interrupted()方法时返回true并且消除中断标识。
semaphore.release():
该方法后之前源码分析的lock.unlock()的思路基本一致。简略记录:
1.
2.
3.