1、Semaphore介绍
Semaphore(信号量)也是juc包下一个线程同步工具,主要用于 在一个时刻允许多个线程
对共享资源进行并行操作的场景。通常情况下,使用 Semaphore 的过程实际上是多个线程
访问共享资源获取许可证的过程。
Semaphore(信号量)底层也是基于AQS实现的,采用AQS的全局变量属性state作为一个
计数器,state 的值就表示是Semaphore信号量资源个数。
Semaphore 的内部逻辑如下:
1)如果Semaphore 内部的计数器大于0,那么线程将可以获得小于该计数器数量的许可
证数量,同时还会导致 Semaphore 内部的计数器减少所发放的许可证数量。
2)如果此时 Semaphore 内部的计数器等于0,表示没有可用的许可证,那么当前线程
可能被阻塞(使用 tryAcquire 方法时不回阻塞)
3)当前线程不再使用许可证时,需要立即将其释放以供其他线程使用,所以建议将
Semaphore 的获取和释放写在 try...finally...语句块中
2、Semaphore 核心属性&构造函数
Semaphore 的核心属性只有一个Sync 类型的变量,Sync是 Semaphore 的一个内部类继承
自AQS,所以Sync也是一个AQS;Semaphore中的所有功能都是基于Sync实现的。
另外 Semaphore 还有2个Sync的子类,即 非公平模式实现的NonfairSync 和 基于公平模式
实现的FairSync;所以 Semaphore 也是区分 “公平” 和 “非公平“”的。
这里我们需要重点关注下 Semaphore 的构造函数,什么时候是公平的,什么时候是非公平的
Semaphore 构造函数如下:
//默认是非公平的
//创建 Semaphore 时需要指定许可证数量(信号量数量)
public Semaphore(int permits) {
//默认创建非公平锁
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
3、Semaphore 应用场景
3.1、流量控制
Semaphore 尝尝用于限制同时在线的用户数量
如xx系统,最多允许指定数量的用户同时在线,如果超过指定数量,将告诉用户无法登录,
请稍后再试,示例代码如下:
public class SemaphoreExample1 {
public static void main(String[] args) {
//定义允许登录数量
final int MAX_PERMIT_LOGIN_ACCOUNT = 10;
//在这里客户登录逻辑 LoginService 是公共资源
final LoginService loginService = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT);
IntStream.range(0,20).forEach(i -> {
new Thread(() -> {
try{
//客户登录,实际上是获取一次信号量的操作
//boolean login = loginService.login();
//非阻塞的acquire() 方法
boolean login = loginService.loginByAcquire();
if(!login){
System.out.println(currentThread().getName()+" login Error");
}
//模拟登陆后的业务耗时
randomSleep();
}finally {
//释放信号量
loginService.logout();
}
},"User-"+i).start();
});
}
//随机休眠
private static void randomSleep(){
try {
TimeUnit.SECONDS.sleep(current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//定义用户登录类
private static class LoginService{
//定义信号量
private final Semaphore semaphore;
public LoginService(int maxPermitLoginAccount){
//初始化信号量,信号量计数器的数量值不能小于0
this.semaphore = new Semaphore(maxPermitLoginAccount);
}
//登录方法,返回登录结果
public boolean login(){
//获取信号量,true=获取成功,false=获取失败
//非阻塞方法,获取信号量失败不阻塞,直接返回false
boolean login = semaphore.tryAcquire();
if(login){
System.out.println(currentThread().getName()+" login success.");
}
return login;
}
public boolean loginByAcquire(){
try {
//若获取信号量失败,则会阻塞,直到有资源可用
semaphore.acquire();
System.out.println(currentThread().getName()+" login success.");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
public void logout(){
//释放许可证
semaphore.release();
System.out.println(currentThread().getName()+" logout success.");
}
}
}
3.2、使用 Semaphore 模拟一个lock锁
根据 Semaphore 的特性,只要没有可用的信号量(许可证),则当前线程需要挂起等待,
等待其他线程释放信号量。只要new 创建 Semaphore 对象时,传入信号量的数量为1,那么
Semaphore 就是一个lock锁,示例代码如下:
private static class TryLock{
//Semaphore 中的计数器定义为1,同一时刻只能有一个线程获取到信号量
private final Semaphore semaphore = new Semaphore(1);
//能获取到信号量表示成功获取锁
public boolean lock(){
boolean lock = semaphore.tryAcquire();
return lock;
}
//释放锁,即释放信号量
public void unlock(){
semaphore.release();
System.out.println(currentThread().getName()+" release lock");
}
}
4、Semaphore 常用方法解析
4.1、acquire() 、acquire(int permits) 方法
acquire() 和 acquire(int permits) 方法是获取信号量,若获取失败则一直挂起等待,直到获取
信号量成功;若线程被中断则异常退出;不同的地方是 acquire() 获取一个信号量资源,而
acquire(int permits) 获取指定数量permits的信号量资源,如下所示:
4.2、acquireSharedInterruptibly(int arg) 方法
acquireSharedInterruptibly 方法功能是获取可中断的共享锁;在获取锁之前先判断当前线程
是否已经被其他线程中断,若已经被中断,则抛出中断异常并退出。
若当前线程没有被中断,则执行 tryAcquireShared() ,若 tryAcquireShared() 方法执行失败
,即 tryAcquireShared() 返回值小于0,则将当前线程放入AQS双向链表中阻塞。
acquireSharedInterruptibly 方法代码如下:
在这里我们需要关注 tryAcquireShared() 方法在 AQS子类 Semaphore.NonfairSync
和 Semaphore.FairSync 中的实现
4.2.1、tryAcquireShared() 在 Semaphore.NonfairSync 中的实现
4.2.2、tryAcquireShared() 在 Semaphore.FairSync 中的实现
/**
* 以公平的模式 获取指定数量acquires的信号量
* @param acquires
* @return
*/
protected int tryAcquireShared(int acquires) {
//自旋
for (;;) {
//公平实现先判断队列中排队的情况
//如果有排队节点,且当前节点不是头结点的next节点
//则表示当前线程在AQS中的队列中排队,则返回-1,表示当前线程由AQS完成阻塞等待
if (hasQueuedPredecessors())
return -1;
//执行到这里表示当前线程不用排队,可以尝试竞争信号量资源
//state变量用于表示permit值,获取当前资源数量
int available = getState();
int remaining = available - acquires;
//remaining 小于0,表示没有足够的信号量资源,则直接返回remaining
//remaining大于等于0,表示此时有多余的信号量,则进行CAS操作,若CAS操作失败,则进入下一次循环
//若CAS操作成功,表示当前线程获取到了信号量资源,则返回当前的资源数,当前的资源数可能为0或者大于0
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
public final boolean hasQueuedPredecessors() {
/**
* 判断 是否存在比当前线程等待时间长的线程或节点(节点关联线程)
*/
Node t = tail; //按反向初始化顺序读取节点
Node h = head;
Node s;
//若队列不为空,且队列中除了头节点外第一个节点中关联的线程不是当前线程,
//则返回true
//todo 在并发条件下,在判断 h != t 成立后,后面 s = h.next 可能为空,这种情况下也认为存在有比当前线程等待时间长的线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());//todo 注意:在队列不为空的情况下,s一定不为空,
}
4.3、nonfairTryAcquireShared(int acquires) 方法
nonfairTryAcquireShared 方法是 Semaphore.Sync 类中的方法,用于非公平模式下获取
指定数量的信号量,
/**
* 非公平锁获取共享锁(即读锁),直接抢锁
* 这里是获取信号量
*
* @param acquires 要获取信号量的数量
* @return
*/
final int nonfairTryAcquireShared(int acquires) {
//非公平锁直接CAS抢锁即可,直到可用资源小于0 或 CAS 抢到了锁
for (;;) {
int available = getState();
int remaining = available - acquires;
//可用资源数remaining小于0 或 CAS操作成功,即抢到了锁才会结束
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
4.4、acquireUninterruptibly(int permits) 方法
获取指定数量的信号量,若获取失败,则一直阻塞
若线程被中断,则继续等待,即不允许中断
注意:acquireShared(int arg) 方法的解析请参考前边“AQS(二)共享锁的获取”
4.5、tryAcquire()、tryAcquire(int permits) 方法
获取一指定数量的信号量资源,若有资源则返回true,若获取失败则返回false,
注意:这个方法线程不会阻塞
nonfairTryAcquireShared 方法请参考4.3
4.4、tryAcquire(long timeout, TimeUnit unit)、 tryAcquire(int permits, long timeout, TimeUnit unit)
带有超时时间的获取指定数量的信号量资源,若获取成功,则直接返回true,都则线程挂起
等待 ,若等待超过了超时时间 timeout ,则返回false
4.5、release() 、release(int permits)方法
释放指定信号量,release() 默认是释放一个信号量
注意:这里释放信号量的逻辑是在 Semaphore.Sync 中实现的
4.6、tryReleaseShared(int releases) 方法
该方法功能是释放指定数量的信号量,是在 Semaphore.Sync 中实现的
/**
* 释放锁(信号量)
* @param releases
* @return
*/
protected final boolean tryReleaseShared(int releases) {
//直接通过CAS操作对AQS的全局变量 state加上参数releases(releases一般都是1)即可
for (;;) {
//获取当前state,即信号量资源个数
int current = getState();
//机上要归还的信号量个数
int next = current + releases;
if (next < current) // 信号量小于加前的,即信号量溢出(超过int 最大值),则抛出溢出
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS操作成功,表示释放成功,则返回true,直接结束,否则进入下一次循环
return true;
}
}
4.7、availablePermits() 方法
availablePermits 功能是获取当前可用的信号量个数
public int availablePermits() {
return sync.getPermits();
}
//Sync方法
final int getPermits() {
return getState();
}
//AQS方法
protected final int getState() {
//返回同步状态的当前值。该操作具有volatile读的内存语义。
return state;
}