前言
前段时间在看ReentrantLocak及其依赖的基础模板类AbstractQueuedSynchronizer的源码,自己看源码同时在网上找文章,然而对源码中的一些细节问题还是不太理解,比如为什么阻塞的线程因中断被唤醒后需要重新中断,为什么被Condition阻塞的线程因中断被唤醒后需要抛出InterruptedException异常。网上的文章对这些细节的问题都没有讲清楚,所以我把这些搞清楚之后就想也写个博文记录一下。
ReentrantLocak简介
ReentrantLock和synchronized一样是一种可重入锁,但是其相比synchronized而言增加了一些高级功能,分别是以下3点:
1、等待可中断
如果当前持有锁的线程长期不释放,正在等待的线程可以放弃等待,改为处理其他事情。
2、公平锁
多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁;而非公平锁则不保证这一点,在锁被释放时,任何一个等待锁的线程都有机会获得锁。synchronized的锁是非公平的,ReentrantLocak在默认情况下是非公平的,但是可以带布尔值的构造函数创造公平锁。
3、锁绑定多个条件
一个ReentrantLocak对象可以帮到多个Condition对象。
ReentrantLock使用方式如下:
public class TestCondition {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
Thread t1 = new Thread(()->{
try {
//使用锁
reentrantLock.lock();
System.out.println("t1 get lock and await ... ");
Thread.sleep(2000);
//让出锁,造成当前线程在接到信号或被中断之前一直处于等待状态
condition.await();
System.out.println("t1 continue ... ");
Thread.sleep(2000);
System.out.println("t1 return ... ");
} catch (Exception e) {
e.printStackTrace();
}finally {
reentrantLock.unlock();
}
}) ;
Thread t2 = new Thread(()->{
try {
reentrantLock.lock();
System.out.println("t2 get lock and signal ... ");
Thread.sleep(2000);
//唤醒一个等待线程,该线程从等待方法返回前必须获得与Condition相关的锁
condition.signal();
System.out.println("t2 continue ... ");
Thread.sleep(2000);
System.out.println("t2 return ... ");
} catch (Exception e) {
e.printStackTrace();
}finally {
reentrantLock.unlock();
}
}) ;
t1.start();
Thread.sleep(1000);
t2.start();
t1.join();
t2.join();
}
}
运行结果如下:
ReentrantLock源码
同步器的模板AQS
什么是AQS
AbstractQueuedSynchronizer类简称AQS,是ReentrantLock依赖的基础类,提供了很多实现线程同步的模板方法,比如如何将线程加入等待队列,但是对于如果获取锁和如何释放锁等细节可以由同步器(如ReentrantLock)自定义实现。
ReentrantLock里面定义了Sync抽象类,Sync有两个实现类分别是NonFairSync非公平锁和FairSync公平锁。ReentrantLock内部封装Sync属性,表示ReentrantLock包含的同步锁。
AQS实现线程同步
一、同步锁实现的基本原理
ReentrantLock和synchronized是Java语言最常用的两个可重入的互斥同步锁,两者的实现原理有相似的地方,在此笔者想回顾以下synchronized的底层原理:
“synchronized关键字经过编译之后,会在同步代码块的前后分别形成monitorenter和monitorexit两个字节码指令,这两个字节码指令前后都需要一个引用类型的参数来指定要锁定或解锁的对象。可以在使用synchronized传入实参来明确要锁定的对象。如果没有传入实参,那么根据synchronized修饰的方法类型,如果修饰的是静态方法,则锁定类对应的Class对象,如果修饰的是实例方法,那么锁定的是代码块所在的对象。
在执行字节码指令monitorenter时,首先会尝试获取对象的锁,如果这个对象没有被锁定,或者当前线程已经持有了对象的锁,就把锁计数器的值增加1,而在执行monitorexit时会把计数器的值减去1。如果计数器的值为0,锁就被释放了。如果获取对象的锁失败,当前线程就会阻塞等待,直到对象的锁被锁定它的线程释放为止。在线程阻塞等待期间,无法强制其中断或者超时退出”。——摘自《深入理解Java虚拟机》
下面再看AQS的实现原理
1、ReentrantLock的组成模板AQS在实现可重入锁的原理上和synchronized类似,其内部定义了一个state变量作为锁计数器,当state的值为0时,表示当前没有线程持有锁;当state的值为1时,表示有一个线程持有了锁;当state的值大于1时,表示线程重入了该锁。
2、AQS会使用一个属性来记录当前是哪个线程持有了锁。
3、AQS内部维护了一个队列,常称之为同步队列,让所有需要排队等待锁的线程都加入队列中并且进入阻塞状态。然后当持有锁的线程释放锁以后,唤醒位于队列头部的线程。
4、为了支持线程可以在某个特定的条件下等待或者唤醒,即实现Condition接口的功能,AQS内部还需要有另外一个队列,常称之为条件队列。Condition实例让一个线程进入等待状态时,该线程会被放入到条件队列。直到调用signal或signalAll方法后,再将线程转移到外部类AQS的等待队列中,线程需要获取到AQS等待队列的锁,才可以继续恢复执行后续的用户代码。
二、AQS实现同步锁的源码
1、AQS内定义了一个Node内部类,用来封装同步队列或条件队列中的线程,一个Node实例,代表队列中的一个节点。这里需要注意的是无论是同步队列或条件队列中的节点,用的都是Node实例,但是两个队列用来连接节点的指针属性不同,其中prev和next用来连接同步队列的节点,nextWaiter用来连接条件队列的节点。因此同步队列是一个双向链表,条件队列是一个单向链表。
另外,Node里面定义了6个线程的等待状态,如其中CANCELLED表示线程已经被取消,SIGNAL表示当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程,CONDITION表示当前节点在条件队列中等待。并且用waitStatus属性表示线程的等待状态。
//用于封装线程的节点
static final class Node {
//共享模式下等待的标记
static final Node SHARED = new Node();
//独占模式下等待的标记
static final Node EXCLUSIVE = null;
// 线程的等待状态 表示线程已经被取消
static final int CANCELLED = 1;
//线程的等待状态 表示后继线程需要被唤醒
static final int SIGNAL = -1;
//线程的等待状态 表示线程在Condtion上
static final int CONDITION = -2;
//表示下一个acquireShared需要无条件的传播
static final int PROPAGATE = -3;
//等待状态,初始值为0
volatile int waitStatus;
//指向同步队列前一个节点的指针
volatile Node prev;
//指向同步队列后一个节点的指针
volatile Node next;
//当前节点代表的线程
volatile Thread thread;
//指向条件队列中的下一个节点的指针
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回队列的前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//构造方法
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//传入线程等待状态的构造方法
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2、AQS定义了用来表示同步队列中的头尾节点以及当前锁状态的属性。
//同步队列中的头节点
private transient volatile Node head;
//同步队列中的尾节点
private transient volatile Node tail;
//锁状态,类似于synchronized的锁计数器
private volatile int state;
//获取锁状态
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
3、AQS继承了AbstractOwnableSynchronizer抽象类,该抽象类封装了当前持有锁的线程的信息,用来获取或设置当前持有锁的线程。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
//当前持有锁的线程
private transient Thread exclusiveOwnerThread;
//设置当前持有锁的线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
//获取当前持有锁的线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
3、AQS获取锁的机制
AQS获取锁的机制分为两步:
第一步是使用tryAcquire方法先尝试获取锁,获取锁的方式由同步器自定义实现。
第二步是如果获取失败使用acquireQueued方法,先创建Node节点,创建之后检查当前的Node节点是不是同步队列中的第二个节点,是的话再次尝试获取锁,获取失败则阻塞当前线程。如果阻塞期间线程被唤醒,则再次检查当前的Node节点是不是同步队列中的第二个节点,是的话再次尝试获取锁,获取失败则再次阻塞线程。如此一直反复循环下去,直到线程获取锁成功。
获取锁的方法:
public final void acquire(int arg) {
//先使用tryAcquire获取锁,tryAcquire方法由同步器自定义实现
if (!tryAcquire(arg) &&
/*获取失败则先创建Node节点并且将节点放入同步队列中,
* 然后进入acquireQueued方法,acquireQueued方法内部是一个循环,
* 在循环中会做下面几件事情:
* 1、检查当前节点是不是同步队列的第二个节点,如果是第二个节点则尝试获取锁,
* 如果获取锁成功则结束循环,并且将自身设为队列的头节点。
* 2、如果当前节点不是队列的第二个节点或者获取锁失败,则将前驱节点的状态设为singal,然后挂起线程。
* 3、如果线程被唤醒,则检查线程是否中断,然后回到步骤1,线程获取到锁为止。
* */
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//走到这里,说明线程已经被唤醒并且获取到了锁。但是线程被唤醒有两个可能的原因,
//一是前面的节点释放锁时唤醒后继节点的线程,二是线程被中断导致唤醒然后恰好也抢到了锁。
//如果线程是因为中断被唤醒并抢到了锁,则需要重新中断。
//因为AQS内部用于检查线程是否中断的方法interrupted()不仅会判断
//当前线程是否存在中断标志,存在的话也会清除中断标志
selfInterrupt();
}
创建Node节点并加入同步队列的方法:
//节点插入到同步队列尾部
private Node addWaiter(Node mode) {
//创建Node节点
Node node = new Node(Thread.currentThread(), mode);
// 节点插入到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果插入不成功,则使用enq(Node node)无限循环插入
enq(node);
return node;
}
//将节点插入到链表尾部,适用于在多线程竞争环境下,
//如果一次插入失败,则无线循环插入
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
创建节点后执行的方法,在此方法中需要在每次线程被唤醒的时候检查线程所在的Node节点是不是同步队列的第二个节点,是的话尝试获取锁,如果获取失败或者节点不是队列中的第二个节点,则使用LockSupport.park()方法阻塞线程。
同时,因为线程被使用LockSupport.park()方法阻塞之后,如果被其他线程调用了自己的interrupt()方法,也会被唤醒。所以线程被唤醒有两个原因,一是前一个线程释放锁然后唤醒了下一个线程,二是被调用了线程的interrupt()中断方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
/*定义一个循环,如果只有线程获取到同步锁才结束循环,否则每次在线程被唤醒时,
* 都先检查其前驱节点是不是同步队列的头节点,如果前驱节点是头节点的话尝试获取锁,
* 获取失败则将再次被阻塞。当然再阻塞之前会将前驱节点的状态设为singal,以便前驱节点
* 将来在释放锁的时候能唤醒自己。
* */
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
//判断当前节点是不是同步队列的第二个节点,是的话尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取成功的话将当前节点设为同步队列的头节点
setHead(node);
//将原来的头节点与队列断开,方便垃圾回收
p.next = null;
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire方法的作用是检查前驱节点的状态设置是否为signal,
//不是的话将其设为singal,并且清理队列中已经取消的节点。
if (shouldParkAfterFailedAcquire(p, node) &&
//如果其前驱节点的状态为singal,则进入parkAndCheckInterrupt方法。
//parkAndCheckInterrupt的作用是挂起线程,并且在线程唤醒时检查线程是否中断
//因为线程被唤醒有两种可能,一种是前驱节点释放锁时唤醒后记节点的线程,另一种
//是因为中断导致的唤醒,这时就需要在线程获取到锁以后重新中断。
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
将前一个节点的等待状态设为SIGNAL的方法:
//将前驱节点的等待状态设为SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前一个节点的等待状态是signal,返回true
if (ws == Node.SIGNAL)
return true;
//如果前一个节点的线程已经中断或者取消,将其清理出同步队列
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//通过CAS将前一个节点的等待状态设为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
挂起线程,当线程被唤醒的时候返回现在中断状态的方法:
//挂起线程,当线程被唤醒的时候返回现在的中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//检查线程是否是因为中断被唤醒,以便后续决定是否重新中断线程
return Thread.interrupted();
}
让节点取消获取锁的方法:
//让节点取消获取锁
//当一个节点需要取消获取锁的时候,如果取消的节点是头节点或者取消节点的前驱节点不是signal状态,
//则会导致队列僵死,因为其后继节点的线程已经没有其他线程能将其唤醒。因此需要在特定的情况下对
//后继节点进行一次唤醒,以防止队列僵死。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
//遍历并更新节点前驱,把node的prev指向前部第一个非取消节点。
Node pred = node.prev;
//如果前驱节点的状态大于0,即为cancel,说明是取消节点,则继续向前遍历
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//记录pred节点的后继为predNext,后续CAS会用到
Node predNext = pred.next;
//直接把当前节点的等待状态置为取消,后继节点即便也在cancel可以跨越node节点。
node.waitStatus = Node.CANCELLED;
//如果取消的节点是尾节点,则只需要重置队列的尾节点,
//不影响队列原来的节点排序,不需要做其他动作
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
//如果取消的节点不是尾节点,则可能需要唤醒一次后继节点,避免队列僵死
//只有满足以下条件才不用唤醒:
//1、取消的节点不是头节点;2、取消的节点的前置节点的线程不为null;3、取消的节点的前置节点
//的等待状态是signal或者可以被设置为signal。如果满足这3个要求,只要将取消节点的前后节点
//连接起来就可以了。因为其前驱结点释放锁的时候,会唤醒后继结点,就不会让队列僵死。
int ws;
//如果node的后继节点next非取消状态的话,将node的前一个节点与后一个节点连起来
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//将取消节点的前置节点和后继节点连接起来
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//不满足条件,则需要唤醒一次后继结点的线程,避免队列僵死
unparkSuccessor(node);
}
//将node与其原来的下一个节点断开
node.next = node;
}
}
以下图片摘自网络:
4、AQS释放锁的机制
在AQS获取锁的机制中,当节点成功获取锁之后,会将获取到锁的节点设为同步队列的头节点,所以释放锁的也一定是同步队列的头节点。头节点释放锁之后,需要唤醒队列的下一个节点,让其竞争锁,否则队列会僵死。
//用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,
//会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。
//因为持有锁的节点必定是同步队列的头节点,所以如果要释放锁需要唤醒头节点的下一个节点
public final boolean release(int arg) {
//tryRelease(arg)方法由同步器自定义实现
if (tryRelease(arg)) {
//如果同步队列的头节点不为空,并且等待状态不为0,
//则需要唤醒头节点的下一个节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//如果节点状态不小于0,则说明节点状态为1,表示节点已经取消。
//如果节点未取消,将节点状态设为0,表示节点已经释放。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
/*
* 获取node的后继节点s,根据条件s = null 或者 s.ws > 0,从同步队列的尾部开始遍历,
* 直到找到距node最近的满足ws <= 0的节点t,即非取消状态的一个节点。
* 为什么是ws <= 0而不是ws < 0,因为如果是队列中的尾节点,其等待状态是初始值0,
* 还没有节点加入到其后面将其等待状态改为signal。
*/
if (s == null || s.waitStatus > 0) {
s = null;
//从尾开始遍历整个队列
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//这里只需要唤醒后继线程,不需要再释放锁后将节点从同步队列中移除,
//因为在后继节点的线程被唤醒后会竞争锁,竞争到锁之后就会把自己
//设为队列的头节点,原释放锁的节点就自然从队列中移除了。
if (s != null)
LockSupport.unpark(s.thread);
}
5、独占可中断式获取同步状态
可中断式获取锁和不可中断式获取锁的方法大致相同,都是创建一个节点加入到同步队列,然后开启一个循环,循环内容为:
检查当前节点的前置节点是不是同步队列的头节点,是的话尝试获取锁,否的话将当前节点的前驱节点的等待状态设为SIGNAL,然后挂起线程。如果线程被唤醒,则再次检查当前节点的前置节点是不是同步队列的头节点……
所不同的是:
不可中断式获取锁的时候,如果遇到线程中断,则只会将中断状态记录下来,其他什么都不做,继续按照原来的方式去抢锁,直到线程成功获取到锁以后,再调用selfInterrupt()方法进行重新中断。因此采用的是延迟响应中断的方式。而对于可中断式获取锁的时候,如果遇到线程中断,则直接抛出InterruptedException异常。
下面是不可中断获取锁的方法代码:
下面是可中断获取锁的方法源码:
public final void acquireInterruptibly(int arg)
//如果获取锁的过程中遇到线程中断,则抛出InterruptedException异常
//所以acquireInterruptibly方法增加了异常抛出
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquire方法同样是由同步器自定义的
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//检查当前节点的前置节点是不是同步队列的头节点,
//是的话则尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
//如果parkAndCheckInterrupt()检查到线程被中断,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//则直接抛出InterruptedException异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
6、独占式超时获取同步状态
这种获取方式和独占式获取同步状态的方法也基本相同,所不同的是:
非超时获取同步状态挂起线程使用的是没有时间限制的LockSupport.park方法,只要线程没被其他线程唤醒,就一直处于挂起状态。而超时获取同步状态使用的是LockSupport.parkNanos方法,会传入时间参数,当挂起线程超过了时间线程就会自动唤醒,并且线程被唤醒后如果还没有抢到锁,会直接返回失败。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
//增加超时时间获取锁
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//计算超时到达的时间点
final long deadline = System.nanoTime() + nanosTimeout;
//创建节点,加入同步队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//循环检测当前节点的前驱节点是不是队列的头节点,
//是的话尝试获取锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//计算还有多少时间到达超时时间点
nanosTimeout = deadline - System.nanoTime();
//如果已经到达超时时间点,则获取锁失败
if (nanosTimeout <= 0L)
return false;
//判断离超时时间点还有的时间差,如果时间差超过1秒,则挂起线程,
//并且指定线程休眠时间为时间差
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//调用LockSupport.parkNanos方法对线程挂起的时间进行控制
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
7、共享锁获取同步状态
因为共享的意思是允许多个线程同时对线程进行操作,所以每一个线程去尝试获得共享锁之后,其实都是将当前剩余的资源数量和自己需要的资源数量作差,如果最终结果小于0了,那么说明当前已经不能够允许我这个线程获取资源了。
AQS的共享锁获取与独占锁获取最主要的区别就是独占锁每次获取时返回的是代表成功或者失败的布尔值,共享锁每次获取返回的是剩余的资源数量。
当 r > 0 的时候,表示获取了arg个资源之后,还有资源剩余,剩余资源大于0个,说明资源充足,获取锁成功
当 r == 0的时候,表示获取了arg个资源之后,剩余资源为0个,表示成功获取arg资源之后没有剩余的了,刚刚够你需要的arg个资源,获取锁成功
当 r < 0的时候,表示资源根本不够,如果你要获取arg个资源之后,剩下就是小于0了,不够,说明给你获取资源失败了,获取锁失败
private void doAcquireShared(int arg) {
//创建节点封装线程,模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果节点的前置节点是同步队列的头节点,尝试获取锁
if (p == head) {
int r = tryAcquireShared(arg);
//获取锁后剩余的资源数量大于或等于0,表示获取成功
if (r >= 0) {
//获取成功后,将当前节点设为同步队列的头节点
//如果剩余资源数量大于0,唤醒后继节点来竞争锁
setHeadAndPropagate(node, r);
//前置节点从队列中删除
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//走到这里,说明获取锁失败,则将节点的等待状态设为singal,并且挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate是将当前节点设定为头节点并且继续向下传播的方法。
(1)首先第一步获取head头节点,然后将自己node设置成头节点,因为自己已经获取资源成功了。而头节点表示获取了资源成功的节点
(2)然后判断propagate是否大于0,大于0说明还有资源剩余,应该继续传播
(3)判断head == null,说明head节点已经没了,说明等待队列队列头节点已经释放资源了,可能这时候资源又有空余了
(4)判断head.waitStatus < 0 说明后面还有人等待唤醒,你需要唤醒后面的节点让他们啦竞争资源了
private void setHeadAndPropagate(Node node, int propagate) {
//保存之前的头节点
Node h = head;
//将当前节点设为头节点
setHead(node);
//head = node;
//node.thread = null;
//node.prev = null;
//以下三种情况需要将将唤醒行为进行传播
//剩余资源数量大于0
//h == null,当前线程设置自己成为head之前,前任head已经为null,即说明前任节点已经释放了资源,就需要执行唤醒
//h.waitSatatus < 0 说明head节点后面有线程在等待前任节点唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
此处的方法理解起来较为困难,为什么h.waitStatus < 0就需要传播唤醒后继线程,主要是因为会存在下面的场景:
1、A线程获取的共享锁,将自己设为head,然后因为propagate > 0,需要执行doReleaseShared()进行传播
2、A线程执行doReleaseShared()方法将自己的waitStatus设为0,并且唤醒下一个节点的线程B
3、线程B成功获取到共享锁,执行setHeadAndPropagate方法,但是还未执行到setHead方法,此时同步队列的head节点仍然
是A线程,并且A线程的waitStatus是0。
4、此时有以前已经获得共享锁的线程C释放了共享锁,执行doReleaseShared()方法,会出现head节点A的waitStatus是0,
然后就会compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),将A的waitStatus再设为-3(PROPAGATE)
5、B线程执行了setHead方法将head设为了自己,但是原head节点A的waitStatus此时是-3,所以通过判断原head节点A的waitStatus<0就可以知道有节点释放了共享锁,就需要进行传播。
共享锁传播的方法
private void doReleaseShared() {
for (;;) {
Node h = head;
//如果同步队列中有两个以上节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//将节点的状态设为0,然后唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒下一个节点
unparkSuccessor(h);
}
//head.waitStatus=0的情况有两种
//1、就是head节点没有及时更新,线程被唤醒之后获取到了锁,在更新head之前,又经过一轮循环执行到这。
// 其实是当前线程通过第一次循环将状态设置为了0,第二次循环进入的时候头节点还没有被改变
//2、head节点及时更新了,但是到了最后一个节点,它的head.waitStatus=0
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果head发生了变化,说明此期间有其他线程获取到共享锁并且重新设置了head
if (h == head) // loop if head changed
break;
}
}
AQS实现Condition接口
一、Condition的作用
Condition是一个可以与Lock配合实现等待/通知模式的接口,AQS则提供了Condition接口的实现类。Condition的核心API如下:
await():当前线程等待,同时释放当前锁,可以用signal()时或者signalAll()方法或者中断跳出等待,线程会重新获得锁并继续执行。
awaitUninterruptibly():与await()方法基本相同,但这个方法不会在等待过程中响应中断
singal():用于唤醒一个在等待中的线程,和notify类型
二、AQS实现Condition接口的原理
1、AQS内部同样使用一个队列存放进入等待状态的线程。
2、当线程调用await()方法进入等待状态时,此时需要释放锁。
3、线程被唤醒时,需要加入到同步队列等待获取锁。
4、如线程因为中断被唤醒或者在调用await()方法之后但是进入阻塞之前被中断,则需要抛出InterruptedException异常。至于为什么要这样做,笔者刚开始也想不明白,但是笔者看了JDK源码中对InterruptedException类的注释,笔者认为这是Java语言的规定。
下面英文注释的翻译是“当线程正在等待、睡眠或其他被阻塞的情况,并且该线程在活动之前或期间被中断时抛出”,意即当线程正在等待、睡眠或被阻塞之前或期间被中断,则需要抛出InterruptedException异常,这应该是Java语言中的一项规定。
因此,await()的调用会让线程进入阻塞状态,在阻塞之前或期间(被唤醒之前)被中断的话,需要抛出InterruptedException异常。
三、AQS中的源码
让线程进入等待状态的await()方法
await()方法的逻辑是:
1、创建一个新的节点加入条件队列中。
2、只要线程的节点没有转移到同步队列(即还在条件队列中)就使用LockSupport.park方法挂起线程,除非线程被中断。只要线程没有中断且不在同步队列里面,不管被其他原因唤醒多少次都会循环使用LockSupport.park方法将其阻塞。
3、如果线程是因为中断被唤醒(即是调用singal()方法之前中断的)则需要抛出InterruptedException异常。
3、如果线程唤醒后被中断(即即是调用singal()方法之后中断的),则需要在退出await()时重新中断。
public final void await() throws InterruptedException {
//如果节点的线程需要阻塞的时候中断,抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
//创建一个新的节点加入条件队列中
Node node = addConditionWaiter();
//释放当前节点的锁,并唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
//标记线程的中断时机,0表示未中断,-1表示在唤醒(singal)之前中断,1表示在唤醒(singal)之后中断
//以便后续依据线程中断的时机分别作出处理
int interruptMode = 0;
//如果线程没有转移到同步队列,就进行阻塞,除非线程被中断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//如果线程被唤醒,检查其是不是被中断;如果被中断则退出阻塞并且将线程转移到同步队列,
//如果未被中断则将循环检查其是否转移到同步队列,未转移则阻塞
//checkInterruptWhileWaiting()方法用于返回线程的中断状态,如果线程未被中断则返回0,
//如果线程中断了则先将线程转移到同步队列,然后检查线程中断的时机,唤醒(singal)之前中断返回-1,
//在唤醒(singal)之后中断返回1
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//走到这一步,说明线程已经被唤醒,可能是被singal唤醒,也可能是被中断唤醒,
//唤醒之后线程使用acquireQueued()方法在同步队列获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//如果线程是在在唤醒(singal)之后中断,则需要重新中断
interruptMode = REINTERRUPT;
//将取节点从条件队列中移除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
//reportInterruptAfterWait()方法用于在退出await()之前,对线程中断作出处理
//如果线程在唤醒之前中断,则抛出异常;如果线程在唤醒之后中断,则需要重新中断
reportInterruptAfterWait(interruptMode);
}
将节点加入条件队列的addConditionWaiter() 方法
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果尾节点的状态不是conditon,则清理掉队列中所有非conditon状态的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//经过清理,队列的尾节点已经变动,需要重新获取队列的尾节点
t = lastWaiter;
}
//创建新节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果尾节点为空,则原队列是空队列,此时将队列的头尾指针都指向新
//建的节点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
检查线程中断状态的checkInterruptWhileWaiting方法
//如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续的处理应该是抛出
//InterruptedException还是重新中断
//如果线程未被中断,返回0
//如果线程已经中断,则调用transferAfterCancelledWait方法
//如果在被signalled之前中断的话返回-1,在被signalled之后中断的话 返回1
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//此方法用于,线程中断之后将节点加入到同步队列,并且返回中断的时机,是在signal之前还是之后
final boolean transferAfterCancelledWait(Node node) {
//因为节点被唤醒时等待状态会被设为0,所以此步CAS如果成功,说明节点是在唤醒之前被中断,
//此时将节点状态设为0(无状态),再将节点加入同步队列,并返回true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//节点加入同步队列
enq(node);
return true;
}
//说明节点在被signal之后中断的,因为signal动作已经有把节点迁移到
//同步队列的操作,所以不需要重复迁移。但是这时候并不确定节点是否已经迁移完成
//也可能在节点迁移到同步队列之后,在迁到同步队列之前,主线程让步,
//所以主线程需要等待,等待节点加入到同步队列,然后返回false
while (!isOnSyncQueue(node))
//当前线程让步
Thread.yield();
return false;
}
处理中断的reportInterruptAfterWait方法
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
//如果线程在被唤醒之前中断,则抛出InterruptedException异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
//如果线程在被唤醒之后中断,则需要重新中断
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
线程唤醒的方法
当使用signal()唤醒线程的时候,会从条件队列的头节点开始唤醒
唤醒在条件队列中的头节点线程分为三步:
(1)拿到条件队列中的头节点
(2)将头节点从条件队列中删除;
(3)将原头节点的等待状态设为0
(4)节点加入同步队列
(5)将同步队列中前一个节点的等待状态设为signal
在这里面有一个并发问题,两个线程如果同时调用了signal()方法,
第一个线程拿到了条件队列中的头节点但是还没有来得及将执行firstWaiter = first.nextWaiter,然后线程挂起,
此时第二个线程开始执行,也拿到了还没更新的firstWaiter并且将其唤醒,因此可能会因为线程安全问题导致两次signal()只唤醒了一个线程。
针对这个问题,AQS的解决方式:
(1)在唤醒线程前先通过CAS机制判断节点的等待状态是不是预期值-2,如果是的话将其修改为0;如果等待状态的实际值不是-2即说明节点已经被其他线程唤醒,就返回失败。
(2)如果唤醒失败,则继续取条件队列的新头节点进行唤醒,直到唤醒成功或者条件队列中已经没有节点了为止。
//唤醒在条件队列中的节点
public final void signal() {
//如果线程已经获取了同步状态,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//拿到头节点进行唤醒
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/*
* 唤醒在条件队列中的头节点线程分为5步:
* 1、拿到条件队列中的头节点
* 2、将头节点从条件队列中删除;
* 3、将原头节点的等待状态设为0
* 4、节点加入同步队列
* 5、将同步队列中前一个节点的等待状态设为signal
*
* 在这里面有一个并发问题,两个线程如果同时调用了signal()方法,
* 第一个线程拿到了条件队列中的头节点但是还没有来得及将执行firstWaiter = first.nextWaiter,然后线程挂起,
* 此时第二个线程开始执行,也拿到了还没更新的firstWaiter并且将其唤醒,这个时候就会出现第一个再次唤醒原头节点
* 时失败的现象。
* 针对这个问题,AQS的解决方式是如果出现唤醒失败,则继续唤醒条件队列中新的头节点线程,直到成功或者条件队列中的已经没有
* 节点为止。
* 因此,signal()方法的是使用CAS机制来解决并非问题的。
* */
private void doSignal(Node first) {
do {
//将条件队列的头节点指针指向队列的第二个节点
if ( (firstWaiter = first.nextWaiter) == null)
//如果队列中没有第二个节点,说明队列原先只有一个节点,
//则此时队列已经空了,将尾节点指针置为null
lastWaiter = null;
//将原头节点从队列中分离出来
first.nextWaiter = null;
//调用transferForSignal方法,将节点转移到同步队列;
} while (!transferForSignal(first) &&
//如果转移失败,说明有其他线程已经唤醒了此节点
//此时继续唤醒条件队列中新的头节点
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//将节点的等待状态改为0,如果设置失败,说明此节点已经被其他线程唤醒了,则返回失败
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//节点的等待状态改成功后,将节点加入到同步队列的尾部
//插入后返回插入节点的前一个节点
Node p = enq(node);
//获取前一个节点的等待状态
int ws = p.waitStatus;
//如果插入后p节点的前一个节点被取消了()
//或者无法将前一个节点的等待状态设为signal
//则代表p节点无法来唤醒node节点,因此直接调用LockSupport.unpark方法唤醒node节点。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}