前几篇文章,我们详细描述了线程池的原理以及核心代码,以及阻塞队列,ReentrantLock的核心流程,本篇主要介绍下lock锁中的condition
公平锁和非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁
//非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//CAS
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //公平锁 需要判断当前队列是否有线程在等待
if (compareAndSetState(0, acquires)) {} //非公平锁 不需要
非公平锁,会先进行CAS获取锁,抢到了就直接返回。
Condition
condition需要依赖于ReentrantLock,调用await 进入等待或者是singale唤醒,都需要获取锁才可以操作。
一个ReentrantLock可以创建多个condition条件。
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
condition核心流程其实就是两个,一个是await以及signal。
其实就是两个队列,一个是AQS本身的等待队列(双向链表),另一个就是Condition条件队列。
await() 会将已经获取lock锁的线程,释放锁,然后封装成一个Node节点,添加到条件队列尾部中。挂起。
signal() 将condition队列的头节点移动到AQS等待节点尾部,等待再次获取锁。
lock.lock();
await();
// 1.释放当前lock锁,从AQS队列中删除
// 2.将当前节点封装成Node 添加到条件队列尾部 进行阻塞。
// 3.当condition调用siganl的时候,删除在条件队列中的节点,转移到AQS中,等待获取锁。
// 4.获取到锁之后,执行剩余流程,进行unlock();
lock.unlock();
//前驱节点的引用
volatile Node prev;
//后继节点的引用
volatile Node next;
//本线程
volatile Thread thread;
// 这个是在condition中用来构建单向链表
Node nextWaiter;
await
// 持有锁的线程,执行await
//1.将当前线程封装为Node,仍到Condition的单向链表中
//2.将锁资源释放 确认Node没有在双向链表中
//3.挂起线程
public final void await() throws InterruptedException {
//判断线程是否中断 中断异常。
if (Thread.interrupted())
throw new InterruptedException();
//构造一个新的等待队列Node 加入到队尾
Node node = addConditionWaiter();
//释放当前线程的独占锁,不管重入几次,state = 0
int savedState = fullyRelease(node);
int interruptMode = 0;
//当前节点没有在同步队列上,还没有被singal 将当前线程阻塞
// 双向链表是否在同步队列,
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 被中断直接推出自旋
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出上面 自旋 当前节点已经在同步队列中了,
// 但是当前节点不一定在同步队列队首, acquireQueued将阻塞直到当前节点成为队首
// 即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
节点添加到等待队列
//尾部方式插入到队尾
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果条件队列的最后一个节点取消了 清楚
// 单向链表有值,走这个逻辑
if (t != null && t.waitStatus != Node.CONDITION) {
// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
// 将当前Node脱离单向链表
unlinkCancelledWaiters();
// 更换新的
t = lastWaiter;
}
// 将当前Node 封装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 当前Node是否为空,放到单线链表中
// 这个操作持有锁的线程才能做
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
完全释放独占锁
// 完全释放独占锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取state的值
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
等待进入阻塞队列
while (!isOnSyncQueue(node)) {
// 当前线程park 等待singal()
LockSupport.park(this);
// 被中断直接推出自旋
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 判断当前节点是否在阻塞队列中
final boolean isOnSyncQueue(Node node) {
// 如果当前节点状态是CONDITION或node.prev是null,
// 则证明当前节点在等待队列上而不是同步队列上。之所以可以用node.prev来判断,
// 是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果node.next不为null,则一定在同步队列上,因为node.next是在节点加入同步队列后设置的
if (node.next != null) // If has successor, it must be on queue
return true;
//前面的两个判断没有返回的话,就从同步队列队尾遍历一个一个看是不是当前节点。
return findNodeFromTail(node);
}
signal
上面的操作,可以看到线程lock.lock 之后被await 阻塞了,那么就需要另外一个线程进行唤醒。
// 将当前线程唤醒,并且需要仍到AQS双向链表中,同时断开现在存在的单向链表
public final void signal() {
// 当前执行singel的线程,必须是持有锁的线程
if (!isHeldExclusively())
// 当前执行singal的方法 没有持有锁
throw new IllegalMonitorStateException();
// 拿到了第一个被await的Node
Node first = firstWaiter;
// 如果没有first 为null , 没await 直接告辞结束
if (first != null)
//condition有 ,first节点进行唤醒
doSignal(first);
}
// 从前往后 遍历 找到第一个需要转移的node
private void doSignal(Node first) {
do {
// 判断当前Node是否是唯一的一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 断开存在的单向链表
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将节点从条件队列转移到阻塞队列
// true 成功转移
// false 转移失败
final boolean transferForSignal(Node node) {
// 修改当前节点的状态 -2 > 0
// node在添加的时候 是-2
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 修改失败
return false;
// sigal 将Node的state -2 -> 0
// 执行enq方法,放入双向链表的内部 尾部
Node p = enq(node);
// 拿到上一个node的状态
int ws = p.waitStatus;
// 之前节点是取消状态
// 节点正常 但是cas
// 避免刚刚加入的AQS节点 无法被唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
获取独占锁
上面说了,当signal 将节点从conditon队列加入到阻塞队列中,这里的while循环就会跳出来。
while (!isOnSyncQueue(node)) {
// 当前线程park 等待singal()
LockSupport.park(this);
// 被中断直接推出自旋
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出上面 自旋 当前节点已经在同步队列中了,
// 但是当前节点不一定在同步队列队首, acquireQueued将阻塞直到当前节点成为队首
// 即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的prev节点
final Node p = node.predecessor();
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
if (p == head && tryAcquire(arg)) {
// //到这里说明刚加入到等待队列里面的node只有一个,并且此时获取锁成功,设置head为node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
// // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
小结
总的来说,Condition的本质就是等待队列和同步队列的交互:
当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:
- 构造一个新的等待队列节点加入到等待队列队尾
- 释放锁,也就是将它的同步队列节点从同步队列队首移除
- 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal()) 或被中断
- 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。
对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:
- 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
- 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4.