必要掌握技术
阻塞,唤醒与中断
阻塞与唤醒
LockSupport的park使用的是Parker->park()
synchronized的wait,synchronized的monitorenter使用的是ParkEvent->park(),
而LockSupport的unpark,Parker->unpark()
synchronized的signal,synchronized的monitorexit都是使用的ParkEvent->unpark()。
Parker和ParkEvent是两个极为相似的类,都使用pthread_cond_wait这个方法完成阻塞,pthread_cond_signal这个方法来唤醒。
其中monitorexit并没有看到过源码解析,但是monitorenter在阻塞时用的是park,那么monitorexit基本也会是unpark。
中断
中断和唤醒的两大关系
如果中断,那么就无法阻塞(或者会被从阻塞中唤醒),这和各自的实现有关,如Parker->park()会直接返回而避免进入阻塞。
此外,如果使用的是Parker->park()(也就是LockSupport.park(),须知AQS是使用此方法完成阻塞,所以很多并发组件都是这类),如果曾经中断过,就算清除了中断位,也会造成下一次park的豁免。
这是因为interrupt调用了Parker->unpark(),将许可证赋值为1了。
park先判断许可证是否为1,如果有许可证直接许可证归0然后返回,再判断是否中断,如果中断直接返回,如果都不满足,才会继续执行阻塞逻辑,被唤醒后,会将许可证归0,结束方法。
unpark会直接将许可证赋值为1,然后尝试唤醒阻塞线程。
更多源码阅读可以观看参考资料
对于中断报错的思考
为什么synchronized监视器不存在异常报错,而synchronized的wait()存在,这是因为JVM在阻塞唤醒后添加了对中断位的判断,如果判断通过,会爆出一次,拥有这个异常的方法如wait/sleep/join等。
对于中断无法对synchronized监视器锁起作用的思考
interrupt虽然可以唤醒阻塞的线程,但是完成park阻塞的块被死循环了,就算唤醒,也会重新回到park,而中断并没有被设置为break的条件。
参考资料
Parker和ParkEvent
https://blog.csdn.net/qq_31865983/article/details/105184585
monitorenter源码解析
https://blog.csdn.net/u013643074/article/details/125596328
park源码解析
https://blog.csdn.net/fengyuyeguirenenen/article/details/122997847
https://blog.csdn.net/weixin_43406582/article/details/119540260(内含interrupt源码)
wait源码解析
https://blog.csdn.net/qq886666/article/details/124101527
关于中断异常的解析
https://www.cnblogs.com/niuyourou/p/12392942.html
阻塞队列和等待队列
阻塞队列和同步队列是JUC锁的两个基本机制,对于synchronized而言,获取不到监视器锁的线程会进入阻塞队列,对于AQS来说,tryAcquire()竞争失败的节点也会进入阻塞队列。synchronized的wait()会使获取监视器锁的节点放弃锁,而进入唯一的等待队列,AQS中condition的await()会使lock锁的节点放弃锁,进入对应condition的等待队列。当他们被唤醒时,就会执行wait()/await()中的后续逻辑,重新获取锁(依然需要和其他线程竞争),获取到之后便会从wait()/await()返回,执行原先调用wait()/await()点的后续逻辑。
AQS中的Condition
await()
首先会通过addConditionWaiter()向等待队列添加节点,然后尝试释放锁,持续陷入阻塞,直到被中断或者判断在阻塞队列了。
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在阻塞队列
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
//中断的判断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//进入阻塞方法(acquireQueued),这个方法在获取独占锁时有详细说明,这个方法的返回值是中断位,如果被中断了并且没有发生异常,那么interruptMode = REINTERRUPT,后期就按中断处理了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 如果后面添加了其它节点,进行清除,在丢出取消节点有详细描述
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
等待队列添加节点
如果当前尾节点不是Condition状态,会使用unlinkCancelledWaiters()从头到尾清除一遍。然后插入一个状态为CONDITION的尾节点。
为什么阻塞队列的遍历往往是从尾到头,而等待队列可以从头到尾,这是因为等待队列的操作已经默认是在锁中进行了,没有阻塞队列激烈的头节点判断机制。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
释放锁
再次提醒,释放锁是await()的一部分,用于从阻塞队列取出这个节点。
释放锁的逻辑相对简单,实际上就是调用了release(),对于release()的解析可以看释放独占锁章节。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//如果失败,会直接返回取消状态
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
值得注意的是,释放锁的过程中,重写的tryRelease()往往会尝试判断是否拥有独占锁,如果没有,那么就会报错。也就是说,执行await()必定是线程安全的语义是必须自己重写完成的。以下是ReentrantLock的重写示范。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
/**
如果当前线程没有拥有锁,就会报错提示
**/
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果没有拥有锁而报错,线程会进入fullyRelease(Node node)的finally块,将节点置为取消状态,然后在signal执行transferForSignal(Node node)时由于状态不是等待而被直接剔除(见signal()的正式唤醒逻辑)。
是否在阻塞队列的判断
这里有一点容易混淆,prev和next并不是等待队列的指针,而是阻塞队列的,等待队列是一个以nextWaiter为指针的单向队列。
如果node状态为CONDITION(发生了await()),或者前继节点为空(阻塞队列节点必有前指针),会判断不在阻塞队列。如果有后继节点,那么必定在阻塞队列。如果都不符合,可能暂时位于尾部,那会从尾到头尝试从阻塞队列找到该节点,如果找不到,说明还在等待队列,返回false。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
将取消节点丢出
private void unlinkCancelledWaiters() {
//取出头节点
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
//如果当前节点不是等待状态,就将其断开连接,然后让trail连接上下一个。可以将trail理解为上一个状态为CONDITION的节点。
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
signal()
如果没有拥有独占锁,报错。如果等待队列有节点进入唤醒。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
正式唤醒逻辑
将首节点向后移一个。
如果没有下一个了,就把尾指针也清空。
然后清空不需要的节点。
直到唤醒一个正在等待的节点或遍历完为止。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
尝试将状态从CONDITION换成正在运行,如果成功了,就会将其入队(关于enq()逻辑可以看追加尾节点章节),p为node在阻塞队列中的前继节点。如果前继节点是取消状态,或者单次尝试把前继节点状态换成阻塞失败,那么会唤醒当前节点的线程,这时节点的线程会继续执行wait()中的逻辑(见await()章节),它会进行前继节点signal状态更新然后完成自我阻塞(调用的acquiredQueue()逻辑见自我阻塞章节)。
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* yingyufanyi
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
AQS的意义
AQS的作用是减少锁争抢带来的消耗,它阻塞了除获取到锁的头节点外的其它节点的线程,并将阻塞线程以队列的形式保存起来。当头节点的线程释放锁后,会唤醒自己的后继节点,唤醒的节点会将自己设为新的头节点。在AQS抽象类中实现了从队列增删,中断,时间判断的功能。而获取锁和释放锁的逻辑待实现,因为研发者希望让它尽可能地作为更多并发工具的组件。
重写方法
AQS中有五个待重写方法
以下两个用来尝试获取和释放独占锁
tryAcquire(int arg)
tryRelease(int arg)
以下两个用来尝试获取和释放共享锁
tryAcquireShared(int arg)
tryReleaseShared(int arg)
最后一个用来判断当前线程是否占有着独占锁
isHeldExclusively(),在AQS的父类中,有着setExclusiveOwnerThread()和getExclusiveOwnerThread()来判断线程的独占关系
3种获取独占锁的方式
acquire(int arg)
简要介绍
如果获取锁失败,那么会加到尾节点,然后自我阻塞,通过if会加一个中断位。这是因为在acquiredQueue中,会清除中断,避免失败阻塞。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
尝试获取锁
这个方法由继承子类重写,体现了一种多态性,需要明白的是获取锁不仅可以在里面加入一些公平或非公平的争抢,还可以加入多次CAS尝试。所以一次尝试tryAcquire并非一次尝试获取锁,我们可以根据自己业务需求,额外增加尝试次数而减少阻塞带来的上下文切换消耗。
追加尾节点
addWaiter是用于追加到尾节点的,mode只有Node.EXCLUSIVE和Node.SHARED这两种静态成员常量。if判断中,是尝试获取一次尾节点,如果可以成功的话,说明没有竞争,直接返回。
CAS前优先完成prev指针指向,这样,在CAS成功的那一刻,队列就已经连通了,这也和队列遍历顺序有关(队列是从尾到头遍历的),也和AQS经常使用前继节点作为判断条件有关。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
事实上,在AQS构造函数是空的,并没有完成对于AQS队列的初始化,只有在第一次被获取锁时才会开始判断,如果尾节点是null,那么会尝试初始化一个节点作为首尾节点。
之后会一直进入else,陷入尾节点的尝试加入。这和addWaiter中if逻辑一致,这样写的好处是,在低并发的时候可以无视enq方法,给人一种简化的感觉,如注释所言,一条快捷的路径。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
自我阻塞
for循环就是阻塞的逻辑。如果当前节点前继节点为头节点,就会获得一次尝试获取的机会,如果成功获取,那么会将自己设为头节点,然后返回中断位。
如果parkAndCheckInterrupt()为true,也就意味着之前是被中断的,那么会把中断位记录下来,等到返回acquire()层会调用selfInterrupt()实现自我中断。
如果中间出现了错误,那么会直接进入failed,将节点置为取消状态。cancelAcquire源码解析放在删除节点中。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果前一个节点处于等待唤醒状态,那么返回true。
如果前一个节点被取消了(只有这个情况waitStatus>0),那么把它给删除,直到遇到第一个不被取消的节点为止,由于头节点的存在,所以并不会出现空指针。最后把有效的前继节点指向自己,中间的就会因为不可达而被回收。否则把前继改为等待唤醒状态。
node.prev = pred = pred.prev;这句有点绕口,虽然逻辑没错,但一般应该是pred = pred.prev; node.prev =pred ;这是因为取消时候这两种情况都返回了false,这是因为出现这两种情况可能会让节点拥有获得锁的机会,因为它上一个节点可能已经是头节点了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
阻塞当前线程并返回是否清空中断位
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireInterruptibly(int arg)
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
很简单,首先加入队列尾,然后判断是否轮到自己,在第二个if中将自己前面节点设为signal,唯一不同的点在于通过第二个if后并不是保存中断,而是直接抛出异常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireNanos(int arg, long nanosTimeout)
这部分重复代码依旧很多,额外的逻辑如下:
deadline是预计终止时间,在for循环中,每次nanosTimeout都会更新剩余时间,如果小于1秒,那么不会进入阻塞,这是因为线程阻塞和唤醒也需要时间,1秒钟内时让它循环判断,可以提高响应速度,给人更准时的感觉。另外,此方法也响应中断,中断发生后会进入finally逻辑删除当前节点。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放独占锁
tryRelease(arg)需要自己实现,需要注意的是由于重入问题,即使成功执行一次,也只是减少一次重入,而继续返回false。
head为null说明队列没有完成初始化,head状态为0说明队列只完成了初始化,或者曾经被唤醒过。
如果成功释放了锁,那么会唤醒下一个节点。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//首先完成节点状态归0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//如果下一个被取消了,那么从尾到头寻找第一个待唤醒节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果下一个没有被取消,那么直接唤醒下一个就行了
if (s != null)
LockSupport.unpark(s.thread);
}
获取共享锁
尝试获取一次共享锁,如果失败,加入队列,自我阻塞。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
尝试获取共享锁
如果没人拥有写锁或者自己拥有写锁,就可以在其中持续尝试获取读锁。可以分为公平和非公平,公平情况下,可以使用hasQueuedPredecessors()阻止后继节点提前获取读锁(即使不用也不会造成共享语义的错误)。
是否是读锁可以通过state或者waitStatus来判断。
在ReentrantReadWriteLock中,就以state的前16位记录目前持有读锁的线程的数量,后16位记录目前持有写锁的线程的数量。
doAcquire执行
尝试队尾加入节点(在acquire(int arg)的追加尾节点有相应逻辑),然后自我阻塞,每次被唤醒后会尝试获取共享锁,获取成功,将自己设为头节点。
在源码中Node.SHARED对应的是共享节点,
private void doAcquireShared(int arg) {
//addWaiter方法在acquire(int arg)的追加尾节点有相应逻辑
final Node node = addWaiter(Node.SHARED);
//老套路了,成功执行改为false,不成功finally用来将节点设为取消状态
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里将当前点设置成新的头节点,但是并没有唤醒后继节点,而是放在doReleaseShared中唤醒。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如果本身就是活跃的,那么说明它是其中共享的一个读节点,目前作为队列最前的头节点,需要设置状态为共享,这样其它进入的线程才能正确判断读状态。如果都执行完,就退出,(h==head 不满足说明其他线程修改了头节点,那需要从头进行head的判断)。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
//h!=null和h!=tail保证了阻塞队列至少有一个Node
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果这个节点被设置成SIGNAL,说明需要释放后续节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果不为signal,说明后面还有节点保持读状态,将节点改为PROPAGETE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
删除节点
首先判断节点是否已经被删除了,因为删除本身并没有加锁。
取消的逻辑在自我阻塞时也有出现过,那多线程取消是否会出现并发数据安全问题,并不会。因为删除过程并没有动任何的其他点的prev指针,它们都可以共同向前遍历。
如果自己是尾节点,直接去掉就好。
否则,会判断自己是否是头节点,或者头节点的竞争者,如果是其中某种情况,会唤醒后继者,否则把前继节点指向后继节点。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}