CyclicBarrier 源码
1.构造方法
参数 n 为等待的线程数
CyclicBarrier cyclicBarrier = new CyclicBarrier(n);
public CyclicBarrier(int parties) {
this(parties, null);
}
参数 barrierAction 为当等待的线程达到 参数 parties 时执行的线程任务
blic CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
可以看到此处对 parties 的值进行了限制 如果少于0将抛出运行时异常 IllegalArgumentException
2.await() 方法
如果等待的线程没有达到 parties,那么线程进入等待状态 , 除非 :
- 线程等待超时
- 被其他线程唤醒
- 被中断
- 调用 reset() 方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
重载方法
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
dowait 方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//加锁 保证操作共享变量时的线程安全
lock.lock();
try {
final Generation g = generation;
//判断当前线程是否已经被打断,如果被打断,抛出异常
if (g.broken)
throw new BrokenBarrierException();
//当前线程是否中断
if (Thread.interrupted()) {
//打破当前屏障
breakBarrier();
//抛出响应中断异常
throw new InterruptedException();
}
//计算器减一
int index = --count;
//如果当前计算器等于0 (所有线程都已进入屏障点)
if (index == 0) { // tripped
boolean ranAction = false;
try {
//执行打破屏障时的任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
//记录动作
ranAction = true;
//唤醒所有等待线程
nextGeneration();
//返回
return 0;
} finally {
//如过已经执行过动作 就不用在执行 breakBarrier 方法
if (!ranAction)
//标记当前屏障已经被打破,并唤醒所有线程
breakBarrier();
}
}
// 循环等待 直到所有线程达到屏障点,超时,屏障被破坏,被中断
for (;;) {
try {
//如果没有设置超时时间
if (!timed)
//等待,直到所有线程都到达屏障点
trip.await();
else if (nanos > 0L)
// 设置了超时时间则在规定时间内等待所有线程进入屏障
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 判断 是否已经执行过 nextGeneration 创建了一个新的 generation 对象,
// 并且 generation是否已经被标记为被破坏
if (g == generation && ! g.broken) {
// 如果没有创建新的generation 对象,并且未被破坏,则打破当前屏障
breakBarrier();
//唤醒所有等待线程后抛出异常
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//如果当前线程屏障被打破,抛出异常
if (g.broken)
throw new BrokenBarrierException();
//如果不相等,表示已经进入下一个屏障 并返回当前线程的index
if (g != generation)
return index;
//如果超时
if (timed && nanos <= 0L) {
//打破屏障,唤醒当前屏障等待的所有线程
breakBarrier();
//抛出超时异常
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}
trip对象为当前锁Lock的条件对象
trip.await()
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
如果进入屏障点的线程数已达到要求
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
ranAction 用于记录动作. 如果屏障被打破时执行的线程任务抛出异常,也要保证唤醒所有等待线程,并标记当前屏障被打破
boolean ranAction = false;
# 只有 command.run() 执行成功 ranAction 的值 才会时 true
if (command != null)
command.run();
ranAction = true;
finally 代码块用来保证等待线程的释放 及 保证当前屏障被标记已经打破
finally {
if (!ranAction)
breakBarrier();
}
nextGeneration 唤醒等待线程,开启下一个屏障,还原count的值
//唤醒全部等待线程
trip.signalAll();
//开启下一个屏障 还原count的值
count = parties;
generation = new Generation();