在前两篇文章中,我们了解了ReentrantLock内部公平锁和非公平锁的实现原理,可以知道其底层基于AQS,使用双向链表实现,同时在线程间通信方式(2)中我们了解到ReentrantLock也是支持条件锁的,接下来我们来看下,其内部条件锁的实现原理。
条件锁的使用
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() {
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName()+" enter lock first");
System.out.println(Thread.currentThread().getName()+" await start");
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+" await end");
lock.unlock();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName()+" enter lock first");
System.out.println(Thread.currentThread().getName()+" start sleep");
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+" end sleep");
System.out.println(Thread.currentThread().getName()+" signalAll condition");
condition.signalAll();
System.out.println(Thread.currentThread().getName()+"signal end");
lock.unlock();
}
});
}
如上代码所示,一般情况下我们通过
Condition condition = lock.newCondition();
创建条件对象,使用condition.await();
表示当前线程需要等待条件才能继续执行,当线程执行到此处时,会进入等待队列等待,直到有另一个线程通过condition.signalAll();
或condition.signal();
唤醒,此时表明当前线程执行条件已具备,此时当前线程继续执行,上述代码中,当前线程会转入AQS的同步等待队列中,去等待抢占lock锁,其运行结果如下图所示:
条件锁一般适用于线程需要具备一定条件后才能正确执行的情况。
ReentrantLock.newCondition()
上文看到Condition的创建和基本用法,接下来我们来看下Condition的实现原理,跟踪ReentrantLock的执行代码如下所示:
// ReentrantLock.java
public Condition newCondition() {
return sync.newCondition();
}
// ReentrantLock内部类Sync中
final ConditionObject newCondition() {
return new ConditionObject();
}
可以看到newCondition最终返回了一个ConditionObject类的对象,ConditionObject类代码如下所示:
// AQS中声明的ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter;
public ConditionObject() { }
private Node addConditionWaiter() {
}
private void doSignal(Node first) {
.....
}
private void doSignalAll(Node first) {
.....
}
private void unlinkCancelledWaiters() {
.....
}
相信大家已经看出来了,很熟悉的Node链表有没有?其中firstWaiter指向链表首位,lastWaiter指向链表尾,在该链表内维护一个Node的双向链表,结合AQS中实现,我们可以猜测出,在condition.await的时候会以当前线程创建Node节点,随后以插入条件队列,随后当执行condition.signal/condition.signalAll时,唤醒在链表上的这些节点,具体实现是不是这样呢?我们继续看
Condition.await
ConditionObject实现的await方法如下所示:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 以当前线程创建Node对象,并添加值队尾
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
// 通过LockSupport阻塞线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Condition.signal
ConditionObject中的signal函数实现如下所示:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 对队首节点唤醒
doSignal(first);
}
private void doSignal(Node first) {
do {
// 重置firstWaiter并不断尝试唤醒首节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 尝试更新节点的waitStatus
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 当前线程可以正常执行了,将该节点移入同步等待队列中,尝试获取锁
Node p = enq(node);
int ws = p.waitStatus;
// 如果可以获取锁,则立即唤醒执行
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Condition.signalAll的逻辑与signal基本一致,区别在于是将在该条件上等待的所有节点均移入同步等待队列中。