一、Semaphore
Semaphore
通过设置一个固定数值的信号量,并发时线程通过 acquire()
获取一个信号量,如果能成功获得则可以继续执行,否则将阻塞等待,当某个线程使用 release()
释放一个信号量时,被等待的线程如果可以成功抢到信号量则继续执行。根据该特征可以有效控制线程的并发数。
那 Semaphore
是如何控制并发的呢,本篇文章带领大家一起解读下 Semaphore
的源码。
在进行源码分析前,先回顾下 Semaphore
是如何使用的,例如下面一个案例:
public class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程:" + Thread.currentThread().getName() + " 执行, 当前时间:" + LocalDateTime.now().toString());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
运行之后,可以看到下面日志:
可以看到每次都是 3
个并发。
下面我们通过源码看下 Semaphore
是如何实现的等待唤醒。
二、Semaphore 源码解读
2.1. 对象的创建过程
如果需要一个Semaphore
对象,可以直接通过 new
创建对象,并传入一个信号量的值,那这个值到底给到哪去了呢?
我们可以点到 Semaphore
的构造函数中,构造函数中可以通过 fair
参数控制 Sync
的类型,其实就是控制后面所使用的锁是公平锁还是非公平锁,如果使用的是只有一个 permits
参数的构造函数,则 Sync
的类型是 NonfairSync
也就是非公平锁:
这里以 NonfairSync
为例,可以点到 NonfairSync
的构造函数中,可以看到信号量值给了父类 Sync
的构造函数:
Sync
则继承了 AQS
,因此 Semaphore
是依赖于 AQS
阻塞队列的,这里将信号量的大小给到了 AQS
中的 state
变量。
2.2. acquire() 获取信号量过程
点到 acquire()
方法中,可以看到又调用了 sync.acquireSharedInterruptibly(1)
也就是 AQS
下的 acquireSharedInterruptibly
方法中:
在该方法中首先使用了 tryAcquireShared
方法,可以理解为获取锁的操作,不过不同的是,这里获取的是信号量的剩余,如果剩余小于0
了,则表示获取锁失败,需要进行阻塞,下面来看下是如果获取锁的。
这里的 tryAcquireShared
会调用到子类的 tryAcquireShared
,这里以 NonfairSync
为例:
下面继续进到 nonfairTryAcquireShared
方法中,这里的逻辑很明了,使用乐观锁自旋的方式,对 state
也就是记录信号量值的变量,进行减去 acquires
, acquires
在前面就已经看到,写死的 1
,因此这里将 state
进行了 -1
并赋值,返回的值也就是state - 1
之后的值:
看到这之后再回到前面的 AQS
下的 acquireSharedInterruptibly
方法中,如果 tryAcquireShared
获取锁成功,也就是信号量余额大于等于 0
,那这种情况下也无需进行阻塞,但如果小于 0
了此时则表示信号量已经被使用完了,该线程就需要阻塞等待了,下面继续进到 doAcquireSharedInterruptibly
方法中,看是如何实现阻塞等待的:
进到 doAcquireSharedInterruptibly
方法中,主要使用了 AQS
中的几个常用方法,在该方法中首先使用 addWaiter
将当前线程加入到了 AQS
的阻塞队列中:
然后使用自旋的方式,找到当前节点的前驱节点,如果前置节点已经是 head
了,那就可以尝试获取锁了,也就是信号量的剩余,如果获取到了锁,也就是信号量大于等于0
,则可以唤醒node
节点,释放p
:
如果前面没有成功获取到锁,下面则需要进行阻塞等待,当被唤醒时,又会接着进行自旋直到获取到锁后结束:
2.3. release() 释放信号量过程
首先进入到 release()
方法中,可以看到调用了 AQS
下的 releaseShared
方法,并传入了固定值 1
:
这里 tryReleaseShared
,其实也是一种获取锁操作,下面可以进到该方法中看下逻辑:
在该方法中同样和之前 nonfairTryAcquireShared
方法类似,都是使用乐观锁自旋的方式,不过这里做的是 + releases
也就是 +1
,并将 +1
后的值进行替换,并返回 true
:
下面再回到 AQS
下的 releaseShared
方法,下面会进入到 doReleaseShared
方法中,该方法则是对已经阻塞的线程进行唤醒:
三、总结
阅读了 Semaphore
的源码,可以发现内部基于AQS
共享方式实现,因此在并发量大的场景下可以起到较好的性能。