一、CyclicBarrier介绍
-
字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
-
和CountDownLatch很像,CountDownLatch在操作时,只能使用一次,也就是state变为0之后,就无法继续玩了。
-
CyclicBarrier是可以复用的,他的计数器可以归位,然后再处理。而且可以在计数过程中出现问题后,重置当前CyclicBarrier,再次重新操作!
二、CyclicBarrier的使用
场景1:多线程计算数据,最后合并计算结果的场景。
public class CyclicBarrierTest2 {
//保存每个学生的平均成绩
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
int result = 0;
for (String key : map.keySet()) {
result += map.get(key);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
});
public void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(() -> {
//获取学生平均成绩
int score = (int) (Math.random() * 40 + 60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+ "同学的平均成绩为:" + score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 cb=new CyclicBarrierTest2();
cb.count();
}
}
场景2:利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
5,
1000, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
(r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
new ThreadPoolExecutor.AbortPolicy());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~"));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(() -> {
try {
int sleepMills = ThreadLocalRandom.current().nextInt(1000);
Thread.sleep(sleepMills);
System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms " + cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
三、CyclicBarrier核心源码分析
3.1 有参构造
CyclicBarrier没有直接使用AQS,而是使用ReentrantLock,简介的使用的AQS
// CyclicBarrier的有参
public CyclicBarrier(int parties, Runnable barrierAction) {、
// 健壮性判断!
if (parties <= 0) throw new IllegalArgumentException();
// parties是final修饰的,需要在重置时,使用!
this.parties = parties;
// count是在执行await用来计数的。
this.count = parties;
// 当计数count为0时 ,先执行这个Runnnable!在唤醒被阻塞的线程
this.barrierCommand = barrierAction;
}
3.2 await
-
线程执行await方法,会对count-1,再判断count是否为0
-
如果不为0,需要添加到AQS中的ConditionObject的Waiter队列中排队,并park当前线程
-
如果为0,证明线程到齐,需要执行nextGeneration,会先将
Waiter队列
中的Node全部转移到AQS的队列中,并且有后继节点的,ws设置为-1。没有后继节点设置为0。 -
然后
重置
count和broker标记。等到unlock执行后,每个线程都会被唤醒。
// 选手到位!!!
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
// 加锁?? 因为CyclicBarrier是基于ReentrantLock-Condition的await和singalAll方法实现的。
// 相当于synchronized中使用wait和notify
// 别忘了,只要挂起,会释放锁资源。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 里面就是boolean,默认false
final Generation g = generation;
// 判断之前栅栏加入线程时,是否有超时、中断等问题,如果有,设置boolean为true,其他线程再进来,直接凉凉
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 对计数器count--
int index = --count;
// 如果--完,是0,代表突破栅栏,干活!
if (index == 0) {
// 默认false
boolean ranAction = false;
try {
// 如果你用的是2个参数的有参构造,说明你传入了任务,index == 0,先执行CyclicBarrier有参的任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 设置为true
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// --完之后,index不是0,代表还需要等待其他线程
for (;;) {
try {
// 如果没设置超时时间。 await()
if (!timed)
trip.await();
// 设置了超时时间。 await(1,SECOND)
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 挂起线程
public final void await() throws InterruptedException {
// 允许中断
if (Thread.interrupted())
throw new InterruptedException();
// 添加到队列(不是AQS队列,是AQS里的ConditionObject中的队列)
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
}
// count到0,唤醒所有队列里的线程线程
private void nextGeneration() {
// 这个方法就是将Waiter队列中的节点遍历都扔到AQS的队列中,真正唤醒的时机,是unlock方法
trip.signalAll();
// 重置计数器
count = parties;
// 重置异常判断
generation = new Generation();
}