Semaphore的基本使用场景是限制一定数量的线程能够去执行.
举个简单的例子: 一个单向隧道能同时容纳10个小汽车或5个卡车通过(1个卡车等效与2个小汽车), 而隧道入口记录着当前已经在隧道内的汽车等效比重. 比如1个小汽车和1个卡车, 则隧道入口显示3. 若隧道入口显示10表示已经满了. 当汽车驶出隧道之后, 隧道入口显示的数字则会相应的减小. 于这个示例相符合场景非常适合用信号量.
Semaphore在构造的时候, 可以传入一个int. 表示有多少许可(permit). 线程获取锁的时候, 要告诉信号量使用多少许可(类比与小汽车和卡车), 当线程要使用的许可不足时, 则调用的线程则会被阻塞. 可以和上面简单的举例进行初步理解.
Semaphore - 信号量
下面是简单代码示范
-
public static void main(String[] args) {
-
// 表示有2个许可.
-
Semaphore sem = new Semaphore(2);
-
for (int i = 0; i < 3; i++) {
-
new Thread(() -> {
-
try {
-
// 默认使用一个许可.
-
sem.acquire();
-
System.out.println(Thread.currentThread() + " I get it.");
-
TimeUnit.SECONDS.sleep(3);
-
System.out.println(Thread.currentThread() + " I release it.");
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} finally {
-
sem.release();
-
}
-
}).start();
-
}
-
}
代码输出入下:
-
Thread[Thread-0,5,main] I get it.
-
Thread[Thread-1,5,main] I get it.
-
Thread[Thread-1,5,main] I release it.
-
Thread[Thread-0,5,main] I release it.
-
Thread[Thread-2,5,main] I get it.
-
Thread[Thread-2,5,main] I release it.
上述大致可以分为以下三步:
- 第一步: 首先线程0和1, 获取锁. 线程3被被阻塞.
- 第二步: 3秒过后, 线程0和线程1分别释放锁,
- 第三步: 线程2可以获得到锁.
Semaphore获取锁流程
Semaphore可以有4个方式获得锁.
- acquire() 线程占用一个许可.
- acquire(int) 线程占用int个许可
- acquireUninterruptibly() 线程占用一个许可, 调用不可以打断
- acquireUninterruptibliy(int) 线程占用int个许可,调用并且不可打断
4个方法只有细微的不同, 这里用 acquire()
用来分析, 其他的可以自行分析.
acquire 方法
调用Semaphore#acquire()
方法, 它本质上是调用的AQS#acquireSharedInterruptibly(int)
, 参数为1.
-
// arg 等于 1
-
public final void acquireSharedInterruptibly(int arg)
-
throws InterruptedException {
-
if (Thread.interrupted())
-
throw new InterruptedException();
-
// 关于tryAcquireShared,Semaphore有两种实现
-
// 一种是公平锁,另一种是非公平锁. 这分析非公平锁.
-
if (tryAcquireShared(arg) < 0)
-
// 调用 AQS#doAcquireSharedInterruptibly(1) 方法
-
doAcquireSharedInterruptibly(arg);
-
}
上面代码中, 因为AQS规定tryAcquireShared
方法要由实现方覆写. 所以在Semaphore中存在两个覆写, 一个是公平锁的覆写, 另一个是非公平锁的覆写. 这里选择以非公平锁来阅读. 因为日常使用较多(可能是无意识的,构造方法只需要传入一个int).
-
// NonfairSync#tryAcquireShared 方法.
-
// 注意: NonfairSync extends Sync !!!
-
protected int tryAcquireShared(int acquires) {
-
return nonfairTryAcquireShared(acquires);
-
}
-
// Sync#nonfairTryAcquireShared 方法
-
int nonfairTryAcquireShared(int acquires) {
-
// 当多线程竞争比较激烈, 该for循环会进行多次.
-
for (;;) {
-
// 获取当前状态
-
int available = getState();
-
// 判断剩余允许线程
-
int remaining = available - acquires;
-
// 通过CAS保证多线程操作.
-
// 最后返回剩余. 假设当前剩余2个. 要使用1个.
-
// if执行(没有其他线程竞争)完成, 则最后返回1个.
-
if (remaining < 0 ||
-
compareAndSetState(available, remaining))
-
return remaining;
-
}
-
}
doAcquireSharedInterruptibly 方法
假设上面方法 getState()
方法返回0, 期望使用1个, 则计算得到remaining = -1
, 则最后返回-1. 因此会进入到下面的方法doAcquireSharedInterruptibly(int)
-
// 假设传入的参数为1.
-
private void doAcquireSharedInterruptibly(int arg)
-
throws InterruptedException {
-
// 将调用线程封装了共享型Node, 加入到双向链表的队尾
-
final Node node = addWaiter(Node.SHARED);
-
boolean failed = true;
-
try {
-
for (;;) {
-
// 记录node的前任
-
final Node p = node.predecessor();
-
// 前任是头节点, 则尝试去获锁
-
if (p == head) {
-
int r = tryAcquireShared(arg);
-
// 获锁成功,设置头节点,并且进行传播
-
if (r >= 0) {
-
setHeadAndPropagate(node, r);
-
p.next = null; // help GC
-
failed = false;
-
return;
-
}
-
}
-
// 获锁失败, 判断是否进行睡眠, 若不睡眠就进行下次循环.
-
if (shouldParkAfterFailedAcquire(p, node) &&
-
parkAndCheckInterrupt())
-
throw new InterruptedException();
-
}
-
} finally {
-
if (failed)
-
cancelAcquire(node);
-
}
-
}
获取锁流程总结
Semaphore获取锁的过程总结为如下:
- 判断是否满足获取锁条件, 关键方法
nonfairTryAcquireShared
. - 若获取锁成功,则也会修改
state
. - 若获取锁失败,关键方法
doAcquireSharedInterruptibly
阻塞的获取锁.- 添加到双向链表
- 若是头节点后继, 则尝试获取锁, 否者则判断进入睡眠等待唤醒, 唤醒后继续执行3.2
- 若不进入睡眠,则直接运行到3.2步
Semaphore释放锁流程
Semaphore释放锁两个方法.
- release() 释放一个许可
- release(int) 释放int个许可
该两个方法都会调用AQS#releaseShared(int)
方法, 使用release()
方法,则参数为1, 使用release(int)
方法, 则参数为int.
releaseShared 方法
-
// 释放共享锁
-
public final boolean releaseShared(int arg) {
-
// 调用Semaphore#tryReleaseShared方法.
-
if (tryReleaseShared(arg)) {
-
// tryReleaseShared释放成功, 则释放双向链表中head的后继
-
doReleaseShared();
-
return true;
-
}
-
return false;
-
}
tryReleaseShared 方法
-
// Semaphore#tryReleaseShared
-
protected final boolean tryReleaseShared(int releases) {
-
for (;;) {
-
// 获取当前的许可, 有并发问题
-
int current = getState();
-
// 计算释放之后的许可数量
-
int next = current + releases;
-
if (next < current) // overflow
-
throw new Error("Maximum permit count exceeded");
-
// 自旋(通过CAS)设置状态. 设置成功则返回true.
-
if (compareAndSetState(current, next))
-
return true;
-
}
-
}
doReleaseShared 方法
-
private void doReleaseShared() {
-
for (;;) {
-
// 记录当前head
-
Node h = head;
-
// 队列中含有等待的节点
-
if (h != null && h != tail) {
-
// 记录头节点等待状态
-
int ws = h.waitStatus;
-
// 有下一个节点需要唤醒
-
if (ws == Node.SIGNAL) {
-
// CAS 设置状态, 若没有成功, 则是并发导致失败.
-
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
-
continue;
-
// 唤醒后继.
-
unparkSuccessor(h);
-
}
-
// 并发情况下,可能会出现wa为0,需要状态为PROPAGATE,保证唤醒
-
else if (ws == 0 &&
-
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
-
continue;
-
}
-
if (h == head)
-
break;
-
}
-
}
释放锁总结
Semaphore释放锁的过程总结为如下:
- 释放N个许可, 因为存在并发释放, 需要CAS确保设置更新后的值.
- 唤醒双向链表中有效的等待节点. (可能存在并发问题,引入PROPAGATE状态)
- 被唤醒的节点调用获取锁的流程.
图解Semaphore
-
public static void main(String[] args) throws InterruptedException {
-
Semaphore sem = new Semaphore(2);
-
for (int i = 0; i < 5; i++) {
-
Thread thread = new Thread(() -> {
-
try {
-
sem.acquire();
-
TimeUnit.SECONDS.sleep(5);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} finally {
-
sem.release();
-
}
-
});
-
thread.start();
-
}
-
}
上面的程序, 通过添加不暂停断点输出日志信息来查看全部流程. 将输出拆分, 方便查看.
日志的第一部分:
节点中的数字代表线程标识, 为x表示没有记录线程. 最右边为抢到锁的节点, 里面记录的是线程号.
日志第二部分:
- 由于 线程0 和 线程1 开始释放锁, 并且都更新state的状态
- 线程0 和 线程1 同时去唤醒队列中下一个有效节点, 存在并发问题
- 线程0成功唤醒, 设置线程2的节点的waitStatus为0, 线程1去唤醒, 发现有人已经设置过, 所以设置线程2的节点的waitStatus为PROPAGATE(传播).
- 线程0唤醒完毕,退出释放锁方法. 线程2抢锁成功, 并且线程1也随之退出释放锁的方法.
线程2抢锁成功之后的图:
日志第三部分:
- 线程2唤醒线程3的节点, 线程2唤醒任务结束.
- 线程3成功获取锁, 线程3去唤醒线程4. 线程3唤醒任务结束.
日志第四部分:
- 线程4尝试获得锁, 最后失败.
- 判断能否进入睡眠, 发现前任的waitStatus没有设置成SIGNAL, 因此不能睡眠, 再次尝试.
- 尝试失败, 进入睡眠.
日志第五部分:
- 线程2和线程3依次释放锁, 并且唤醒队列中下一个线程.
- 线程2 唤醒 线程4, 线程4去抢锁, 线程2唤醒任务结束,退出释放锁方法.
- 线程4尝试抢锁, 发现抢锁成功(后续还需要设置在队列中的状态等,所以并不是最终完成).
- 线程3由于线程2修改了头节点, 因此线程3设置头节点状态为PROPAGATE.
- 线程4和3唤醒任务结束.
第五部分执行结束后:
- 线程4获取锁.
- 头节点因此线程3和线程2并发唤醒队列中的线程,导致线程3第一次失败, 而第二次修改的时候,线程4已经将头节头改变, 但是碰巧列表中已经没有等待的节点,所以头节点的waitStatus为0, 因此线程3将头节点的waitStatus设置为PROPAGATE.
- 线程4获取锁后, 会将封装线程4的节点中的线程置为null, 方便为GC回收.
日志第六层部分:
- 线程4释放锁, 进入
doReleaseShared
方法, 发现队列中已经没有节点.
因此AQS中最后的队列就下图所示:
结束语
- 了解Semaphore锁的释放和获取流程
- 了解Semaphore的底层逻辑
- 了解AQS底层的共享锁模式