CyclicBarrier
作用
同步辅助类,多个线程互相等待,直到到达同一个同步点,再继续一起执行,可有重复利用,通过构造函数可以设置一个等待的线程数,
线程调用await
方法表示自己已经到达屏障点,然后会等待其他线程也到达。如果当前线程不是最后一个到达的,它将被阻塞在这里,直到所有线程都调用了await
方法。
示例代码
public class Test {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 开始执行任务");
Thread.sleep((long) (Math.random() * 3000));
System.out.println("线程 " + threadId + " 到达屏障点");
barrier.await();// 在此处等待,直到3个线程都到达此处开始执行下面代码
System.out.println("线程 " + threadId + " 继续执行后续任务");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
使用场景
- 模拟并发测试:在进行并发测试时,需要模拟多个并发请求同时到达某个点的情况,
CyclicBarrier
可以方便地实现这种场景,让所有模拟的并发线程在指定点等待,然后一起继续执行后续的测试操作 - 多线程任务协同:例如在一个并行计算任务中,需要多个线程分别处理不同的数据块,当所有数据块都处理完毕后,再进行汇总或下一步的计算,这时可以使用
CyclicBarrier
来确保所有线程都完成各自的任务后再继续。
实现原理
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/** The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
内部维护一个 ReentrantLock 和一个 Condition trip = lock.newCondition();
Generation 表示第几轮,CyclicBarrier 是可以循环利用的
count 表示等待的线程数
- 调用
await
方法时,调用线程会去获取lock
锁,然后–count,count减 1,表示有一个线程已经到达屏障点。 - 接着判断
count
是否为 0,如果count
不为 0,说明还有线程未到达屏障点,此时当前线程会调用trip.await()
方法进入等待状态,并释放lock
锁,让其他线程有机会获取锁并执行await
操作。 - 当count为0 时,就唤醒所有在trip 等待的线程执行,同时Generation 次数加1