CyclicBarrier翻译过来是回环栅栏,它可以实现让一组线程等待至某个状态之后再全部同步执行,这个状态叫屏障点。当所有等待线程都被释放后,CyclicBarrier可以重置再次使用。
CyclicBarrier的功能是由ReentrantLock和Condition共同实现的,因此在其内部拥有ReentrantLock类型的lock属性和Condition类型的trip属性。此外,还有用于保存该屏障拦截的线程数parties属性和当前剩余等待的线程数count属性。这些属性的作用在后面我们详细介绍源码时再详细介绍。
CyclicBarrier的构造函数如下:
//parties表示当前屏障拦截的线程数
public CyclicBarrier(int parties) {
this(parties, null);
}
//barriesAction表示当所有线程都到达屏障时首先执行的行为
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
常用方法如下:
- public int await() throws InterruptedException, BrokenBarrierException:当前线程阻塞,直到parties个线程全部调用await()方法时再唤醒。BrokenBarrierException异常表示栅栏已被破坏,可能是由于其中的一个线程await()时被中断或超时;
- public void reset():重置屏障计数器的值,并将条件队列的所有线程唤醒。
由于CyclicBarrier的计数器可以重置,屏障可以重复使用,因此当paries的整数倍数量的线程调用await()方法时程序都是可以正常结束的,否则由于还有线程在阻塞,程序会一直阻塞不会结束。例如下面的程序中定义了屏障的拦截线程数paties=3,随后分别在7个线程中调用CyclicBarrier的await()方法。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()
+ "开始等待其他线程");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "开始执行");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "执行完毕");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
控制人打印结果如下图,可以发现程序没有结束,因此线程Thread-6还在阻塞。
CyclicBarrier原理
由于CyclicBarrier是借助ReentrantLock和Condition实现的,因此我们在深入解析CyclicBarrier源码之前先来简单介绍下ReentrantLock和Condition是怎么工作的。
ReentrantLock和Condition
我们都知道AQS是基于MESA管程模型实现的,ReentrantLock是AQS的独占锁实现,而Condition可以理解为是MESA模型中的条件变量,那么当执行下面的程序时锁(ReentrantLock)是怎么变化的呢?需要注意的是,下面的程序中只有一个ReentrantLock对象,因此只有一把锁。
public class ReentrantLockDemo6 {
private static ReentrantLock lock = new ReentrantLock();
private static Condition cigCon = lock.newCondition();
private static Condition takeCon = lock.newCondition();
private static boolean hashcig = false;
private static boolean hastakeout = false;
//送烟
public void cigratee(){
lock.lock();
try {
while(!hashcig){
try {
log.debug("没有烟,歇一会");
cigCon.await();
}catch (Exception e){
e.printStackTrace();
}
}
log.debug("有烟了,干活");
}finally {
lock.unlock();
}
}
//送外卖
public void takeout(){
lock.lock();
try {
while(!hastakeout){
try {
log.debug("没有饭,歇一会");
takeCon.await();
}catch (Exception e){
e.printStackTrace();
}
}
log.debug("有饭了,干活");
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockDemo6 test = new ReentrantLockDemo6();
new Thread(() ->{
test.cigratee();
},"t1").start();
new Thread(() -> {
test.takeout();
},"t2").start();
new Thread(() ->{
lock.lock();
try {
hashcig = true;
log.debug("唤醒送烟的等待线程");
cigCon.signal();
}finally {
lock.unlock();
}
},"t3").start();
new Thread(() ->{
lock.lock();
try {
hastakeout = true;
log.debug("唤醒送饭的等待线程");
takeCon.signal();
}finally {
lock.unlock();
}
},"t4").start();
}
}
控制台打印结果如下:
- 线程t1执行cigratee()方法,cigratee()内部执行lock()方法加锁(即将当前lock对象的state设为1),随后cigCon调用await()方法释放锁(即将当前lock对象的state设为0),并将线程t1阻塞;
- 线程t2执行takeout()方法,takeout()内部执行lock()方法加锁(即将当前lock对象的state设为1),随后takeCon调用await()方法释放锁(即将当前lock对象的state设为0),并将线程t2阻塞;
- 线程t3调用cigCon.signal()方法将阻塞在cigCon这个“条件变量”的条件队列线程t1转移到lock的同步等待队列中,当线程t1获取锁后继续执行后面的逻辑,执行完后再调用unlock()方法释放锁;
- 线程t4调用takeCon.signal()方法将阻塞在takeCon这个“条件变量”的条件队列线程t2转移到lock的同步等待队列中,当线程t2获取锁后继续执行后面的逻辑,执行完后再调用unlock()方法释放锁。
关于以上,需要做更多的说明:
- ReentrantLock加锁的方式是将state设为1,并且保存持有锁的线程引用;解锁的方式是将state设为0,并释放线程引用;
- Condition的await()方法会释放锁,且阻塞当前线程,这一点可以从程序输出结果得到;
- Condition的signal()方法在唤醒阻塞在当前条件变量上的线程时,需要首先获取锁,否则会抛出IllegalMonitorStateException异常;
- 按照MESA模型,调用Condition的signal()方法后,会将当前条件变量的条件队列的线程转移到锁的同步等待队列中,当获取锁时才会继续执行后续逻辑。
我们可以通过跟踪CyclicBarrier的源码来达到学习CyclicBarrier以及ReentrantLock与Condition协作的目的。
CyclicBarrier源码
这里是从CyclicBarrier的await()方法一步一步地介绍的,较复杂,如果不想看可以直接跳到这一部分后面总结的await()方法执行的流程图部分。
我们使用下面的程序来切入CyclicBarrier源码的介绍。在下面的程序中,使用三个线程分别计算三位同学的平均分,三个线程都计算完成后再计算三位同学的平均分。
public class CyclicBarrierTest2 {
//保存每个学生的平均成绩
private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
private ExecutorService threadPool= Executors.newFixedThreadPool(3);
private CyclicBarrier cb=new CyclicBarrier(3,()->{
int result=0;
Set<String> set = map.keySet();
for(String s:set){
result+=map.get(s);
}
System.out.println("三人平均成绩为:"+(result/3)+"分");
});
public void count(){
for(int i=0;i<3;i++){
threadPool.execute(new Runnable(){
@Override
public void run() {
//获取学生平均成绩
int score=(int)(Math.random()*40+60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+"同学的平均成绩为:"+score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 cb=new CyclicBarrierTest2();
cb.count();
}
}
当线程t1首先计算完成执行cb.await()方法,await()方法实现如下:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait()方法实现如下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//1.使用CyclicBarrier的lock对象加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//2.当前线程到达屏障,因此count数量-1,判断计算后的值是否为0
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//3.当前线程阻塞,直到被唤醒、中断或超时
for (;;) {
try {
if (!timed)
//4.调用CyclicBarrier的Condition的await()方法
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
- 前面提到过CyclicBarrier中有一个ReentrantLock类型的属性lock,此处使用lock对象加锁,目的其实是为了后面Condition的await()方法的调用。需要注意的是,按照MESA模型,线程进入条件队列必须是在其获得锁之后,因此此处必须先加锁;
- 此时count = parties = 3,线程t1已到达屏障处,因此count需要减1,此时count=2,parties仍为3,parties的值是不会变的,原因后面会介绍;
- 由于此时count!=0,因此我们直接跳到4处,调用CyclicBarrier的Condition类型的属性trip的await()方法。
这里需要特别说明的是,Condition是一个接口,其实现类是AQS中的一个内部类ConditionObject,在其内部维护了条件等待队列的首节点Node对象firstWaiter和尾结点lastWaiter,另外还包括一些维护条件队列出队入队操作的方法。此处调用的trip的await()方法当然是Condition接口的await()方法,实现如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//1
Node node = addConditionWaiter();
//2
int savedState = fullyRelease(node);
int interruptMode = 0;
//3
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//4
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
首先进入addConditionWaiter()方法,实现如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
- 由于此时条件等待队列还是空队列,因此尾节点lastWaiter为null;
- 将线程t1封装到Node对象中,并且将当前Node节点的状态设为CONDITION(值为-2)表示该节点是条件等待队列的节点;
- 将firstWaiter首节点和尾节点lastWaiter都指向线程t1的Node节点,表示t1节点入队。
然后回到await()方法,进入2处的fullRelease()方法,其实现逻辑如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
需要注意的是,此时还在CyclicBarrier的lock锁内,此处调用release()方法释放锁(主要就是将state设为0,ReentrantLock的表示独占锁拥有者的exclusiveOwnerThread属性置为null),关于release()方法我们在介绍ReentrantLock的文章中介绍过,此处不再赘述。
再返回await()方法的代码3处,isOnSyncQueue()方法的实现如下:
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}
由于线程t1节点是在条件等待队列中,其状态是CONDITION,因此此处直接返回false。
再返回await()方法,可以看到isOnSyncQueue()方法返回false后就执行了LockSupport的park()方法将当前线程t1阻塞。
此时线程t1内部执行CyclicBarrier的await()方法的逻辑就暂时完成了,总结如下:
- 使用CyclicBarrier的lock对象加锁;
- 将CyclicBarrier的计数器count的值减1,此时count=2;
- 判断count不为0,调用Condition类型的trip属性的await()方法;
- 将当前线程t1封装到Node节点对象中,状态为CONDITION,表示该节点在条件等待队列中,将线程t1节点入队条件等待队列,此时firstWaiter和lastWaiter指向的都是线程t1节点;
- 释放lock锁;
- 将线程t1阻塞。
随后线程t2也执行到了await()方法,此处我们不再一步步地跟踪了,其执行逻辑如下:
- 使用CyclicBarrier的lock对象加锁;
- 将CyclicBarrier的计数器count的值减1,此时count=1;
- 判断count不为0,调用Condition类型的trip属性的await()方法;
- 将当前线程t2封装到Node节点对象中,状态为CONDITION,表示该节点在条件等待队列中,将线程t2节点入队条件等待队列,此时firstWaiter指向线程t1节点,lastWaiter指向线程t2节点;
- 释放lock锁;
- 将线程t2阻塞。
最后线程t3也执行到了await()方法,前两个步骤与线程t1和t2一样:
- 使用CyclicBarrier的lock对象加锁;
- 将CyclicBarrier的计数器count的值减1,此时count=0;
在第三个步骤dowait()方法,判断count的值为0,此处与线程t3与前面的线程逻辑不再一样了,相关部分代码如下:
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
此处由于计数器count=0,已经到了屏障处,因此首先执行我们在构造函数中传入的barrierCommand任务。随后调用nextGeneration()方法,实现如下:
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
可以看到又调用了Condition的signalAll()方法,实现如下:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
首先判断当前线程t2是否是获取独占锁的线程,是的。随后判断条件队列的首节点即线程t1节点不为空,调用doSignalAll()方法,实现如下:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
- 将条件队列的首节点和尾节点置为null;
- 将首节点(线程t1节点)出队,执行transferForsignal()方法;
- 循环条件队列,不断地将节点出队,调用transferForsignal()方法。
在我们这个例子中循环了两次,分别为线程t1和线程t2的节点执行transferForsignal()方法,transferForsignal()方法实现如下:
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 使用CAS将节点的waitStatus置为0,在此之前条件队列节点的状态为CONDITION(-2);
- 调用enq()方法将节点入队到AQS的同步等待队列中,此方法在介绍ReentrantLock时也介绍过,不再赘述;
- 随后使用CAS将节点在同步等待队列中的前一个节点的waitStatus置为-1,表示当前节点可以被唤醒。
线程t2的逻辑到此处就执行完成了,一层层的方法返回,最终CyclicBarrier的doWait()方法的finally块中执行lock.unpark()方法。unpark()方法唤醒的是当前lock对象的同步等待队列的头节点,在当前lock锁竞争激烈的场景下,头节点有可能不是线程t1或者线程t2。此处需要注意的是,当线程t1或线程t2获得CPU继续执行时,应该在它们被阻塞的地方继续执行,即在AQS的ConditionObject的await()方法的代码4处(翻到前面的代码块)继续执行。继续执行的acquireQueued()方法是循环调用CAS获取lock锁的逻辑,因此线程t1和t2在此处会再次获取到锁,执行我们自己的代码中CyclicBarrier的await()方法之后的逻辑。