1、CyclicBarrier 介绍
从字面上看 CyclicBarrier 就是 一个循环屏障,它也是一个同步助手工具,它允许多个线程
在执行完相应的操作后彼此等待共同到达一个屏障点。
CyclicBarrier可以被循环使用,当屏障点值变为0之后,可以在接下来的的使用中重置屏障点
值,而无需重新定义一个CyclicBarrier。
Cyclic循环:所有线程释放后,屏障点的数值可以被重置
Barrier屏障:让一个或多个线程到达一个屏障点,会被阻塞;屏障点会有一个数值,当每有一
个线程到达屏障点时,屏障点数值就会减1操作,并且线程阻塞在屏障点,当屏
障点数值变为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。
在释放屏障点之后,可以先执行一个任务,然后让唤醒阻塞的线程继续执行后
续任务。
CyclicBarrier是一种同步机制,允许一组线程之间互相等待,现成达到屏障点其实是基于
await方法在屏障点阻塞;等待所有线程到达屏障点后再统一唤醒
CyclicBarrier 并不是基于AQS来实现的,其是基于ReentrantLock锁的机制来实现对屏障点的
“减减” 操作以及线程的挂起。
2、CyclicBarrier核心属性&构造方法
public class CyclicBarrier {
/**
* 内部类
*/
private static class Generation {
//该类用来标记是否被中断过
//用来表示阻塞时当前party有没有被强制中断
boolean broken = false;//某个线程由于执行了await()方法进入了阻塞状态,若该线程被执行了中断操作,那么 broken 得值就会变为true
}
/** 保证操作屏障值原子性的锁 */
private final ReentrantLock lock = new ReentrantLock();
/**
* 用于阻塞线程的条件变量:若有未到party的线程,那么等待该条件变量上
* 基于当前的Condition 实现线程的挂起和唤醒
* */
private final Condition trip = lock.newCondition();
/**
* 屏障数值,与count初始值一致
* todo 注意:不会对 parties 进行操作,因为 parties 是final修饰,初始化后不能修改
* */
private final int parties;//计数器得值
/*
* 当屏障数值count到达0时,优先执行当前任务,然后再会唤醒所有等待的线程执行后续任务
* */
private final Runnable barrierCommand;
/**
* 表示当前party是否被中断过
* */
private Generation generation = new Generation();
/**
*
* 屏障数值,初始值与 parties 相等,当每有一个线程到达屏障点时,就会执行count--操作
*/
private int count;
//构造方法
/**
*
* @param parties 屏障点数值
*
* @param barrierAction 当屏障点数值达到0时,优先执行该 barrierAction 任务
* 若barrierAction 为null,则直接执行唤醒的线程
*
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
//到达屏障点后优先执行的任务
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
}
3、CyclicBarrier应用场景及示例代码
3.1、将一个任务分成若干个并行的子任务,当所有的子任务全部执行结束后,再继续执行后边的
工作。
从这一点上看,CyclicBarrier 功能与CountDownLatch 的功能差不多,但他们运行方式上却
有很大区别;在 CyclicBarrier 中,每个子任务完成后,子线程调用 CyclicBarrier的await方法
使当前子线程进入阻塞状态,直到其他所有子线都完成了任务后,他们才能退出阻塞;
注意:这里CyclicBarrier并没有干预主线程的运行,所以主线程的 “运行/阻塞” 需要我们来
手动干预。所以 CyclicBarrier 更像是把“任务分片”而不是计数器,当每个分片任务
完成后都会阻塞在“屏障点”,
把前边CountDwonLatch 的示例使用 CyclicBarrier 来实现,比较 CountDwonLatch 与
CyclicBarrier 在相同场景下使用的不同,示例代码如下:
public class CylicBarrierExample1 {
public static void main(String[] args) {
//先获取商品编号列表
int[] products = getProductsByCategoryID();
//使用Stream 流,将商品编号列表中的每个商品转换为 ProductPrice
List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());
//定义 CyclicBarrier ,并设置子任务数
CyclicBarrier barrier = new CyclicBarrier(list.size());
//存放线程任务的集合
final List<Thread> threads = new ArrayList<>();
list.forEach(pp -> {
//对每个商品都创建一个子任务来计算
Thread thread = new Thread(() -> {
System.out.println(pp.getProdID()+" -> start calculate price.");
try {
//模拟业务逻辑耗时
TimeUnit.SECONDS.sleep(current().nextInt(10));
if(pp.prodID %2 == 0){
pp.setPrice(pp.prodID*0.9D);
}else {
pp.setPrice(pp.prodID*0.71D);
}
System.out.println(pp.getProdID()+" -> price calculate");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
try {
//当前子任务线程进入阻塞状态,在这里等待所有的子任务线程都执行到共同的屏障点 barrier point
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
threads.add(thread);
thread.start();
});
//遍历所有的子任务线程,让主线程等待所有的子任务线程结束
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("**************************************");
System.out.println("All of price calculate finished!");
list.forEach(System.out::println);
}
//获取商品编号列表
private static int[] getProductsByCategoryID(){
//商品列表编号为从1,10的数字
return IntStream.rangeClosed(1,10).toArray();
}
//定义商品类,有2个成员变量:商品编号和商品价格
private static class ProductPrice{
private final int prodID;//商品编号
private double price;//商品价格
public ProductPrice(int prodID){
this(prodID,-1);
}
public ProductPrice(int prodID,double price){
this.prodID = prodID;
this.price = price;
}
public int getProdID(){
return this.prodID;
}
public void setPrice(double price){
this.price = price;
}
@Override
public String toString() {
return "ProductPrice{" +
"prodID=" + prodID +
", price=" + price +
'}';
}
}
}
上边这段代码,有个需要优化的地方,即:既然 CyclicBarrier 中所有线程都会阻塞在屏
障点,所有任务都达到屏障点时才会往下执行,那么我们可以把主线程也作为一个任务线程
,即在定义 CyclicBarrier 屏障点数值时,在原有的数值上加1,然后在主线程中执行
CyclicBarrier的await方法,这样就不用让主线程等待每个子线程执行完成了
优化代码如下:
public static void main(String[] args) {
//先获取商品编号列表
int[] products = getProductsByCategoryID();
//使用Stream 流,将商品编号列表中的每个商品转换为 ProductPrice
List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());
//定义 CyclicBarrier ,并设置子任务数
CyclicBarrier barrier = new CyclicBarrier(list.size()+1);
//存放线程任务的集合
final List<Thread> threads = new ArrayList<>();
list.forEach(pp -> {
//对每个商品都创建一个子任务来计算
Thread thread = new Thread(() -> {
System.out.println(pp.getProdID()+" -> start calculate price.");
try {
//模拟业务逻辑耗时
TimeUnit.SECONDS.sleep(current().nextInt(10));
if(pp.prodID %2 == 0){
pp.setPrice(pp.prodID*0.9D);
}else {
pp.setPrice(pp.prodID*0.71D);
}
System.out.println(pp.getProdID()+" -> price calculate");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
try {
//当前子任务线程进入阻塞状态,在这里等待所有的子任务线程都执行到共同的屏障点 barrier point
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
threads.add(thread);
thread.start();
});
//主线程也阻塞在屏障点
barrier.await();
System.out.println("**************************************");
System.out.println("All of price calculate finished!");
list.forEach(System.out::println);
}
3.2、CyclicBarrier 循环使用
使用 CyclicBarrier 模拟旅游时导游清点人数的场景
大家报团旅游时,为了安全和避免掉队的,每次登上大巴,大巴启动前,导游都会清点人数
;在到达一个景点后,游客下车后,导游也会重复清点人数,保证所有的人都下来了后,才
会通知大巴师傅去停车场停车,下边写个demo简单模拟下这个场景,
/**
* CylicBarrier 的循环使用
*/
public class CylicBarrierExample2 {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
//定义 CyclicBarrier
final CyclicBarrier barrier = new CyclicBarrier(11);
//创建10个线程
for(int i=0;i<10;i++){
//定义游客子线程,传入游客编号和 barrier
new Thread(new Tourist(i,barrier)).start();
}
//主线程也进入阻塞,等待所有游客都上车
barrier.await();
System.out.println("Tour Guilder: all of Tourist get on the bus");
//主线程进入阻塞,所有游客都下车
barrier.await();
System.out.println("Tour Guilder: all of Tourist get OFF the bus");
}
//定义游客线程
private static class Tourist implements Runnable{
private final int touristID;
private final CyclicBarrier barrier;
private Tourist(int touristID,CyclicBarrier barrier){
this.touristID = touristID;
this.barrier = barrier;
}
@Override
public void run() {
System.out.printf("Tourist: %d by bus\n",touristID);
//上车耗时
this.spendSeveralSeconds();
//上车后等待其他同伴
this.waitAndPrint("Tourist: %d Get on the bus, and wait other people");
//todo 注意:所有线程到达屏障点后,最后一个到达屏障点的线程会重置CyclicBarrier
// 所以这里不需要手动调用reset()重置方法
//下车耗时
this.spendSeveralSeconds();
//下车后等待其他同步全部下车
this.waitAndPrint("Tourist: %d Get OFF the bus, and wait other people OFF");
}
//模拟乘客上车耗时
private void spendSeveralSeconds(){
try {
TimeUnit.SECONDS.sleep(current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//模拟上车后等待其他同伴
private void waitAndPrint(String msg){
System.out.printf(msg,touristID);
System.out.println();
try {
//所有线程到达屏障点后,最后一个到达屏障点的线程会重置CyclicBarrier
barrier.await();
} catch (InterruptedException |BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
4、CyclicBarrier 常用方法解析
在 CyclicBarrier 中常用方法就2个,即:await 和带超时时间的await ,但真正执行业务
的方法其实只有 doawait 一个方法,如下图所示:
4.1、dowait(boolean timed, long nanos) 方法
dowait 方法是CyclicBarrier 的核心方法,该方法功能是先将 CyclicBarrier 计数器count减1,
然后判断减1后的count是否等于0,若等于0,则唤醒所有阻塞在屏障点的线程,并重置
CyclicBarrier;若减1后的count不等于0,则当前线程被阻塞,直到被其他线程唤醒或过了
超时时间(有超时时间的情况)
dowait 代码如下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取 Generation 对象的引用
final Generation g = generation;
//判断是否有现成中断
if (g.broken) //表示当前party已经被中断
throw new BrokenBarrierException();
//有中断的线程混入其中,则干掉其他所有的线程重新开始
if (Thread.interrupted()) {//判断当前执行线程是否被中断,若被中断则先调用 breakBarrier()方法,再抛出异常
breakBarrier();
throw new InterruptedException();
}
int index = --count;//计数器减1,对屏障点数据做--操作
if (index == 0) { // tripped 当 count 为0 得时候,表示是最后一个线程,负责唤醒所有阻塞在条件变量上的线程,然后回调barrierCommand
boolean ranAction = false;
try {
final Runnable command = barrierCommand;//当前任务线程
//优先执行 barrierCommand 任务
if (command != null)
command.run();
ranAction = true;
//生成新的 Generation,并且直接返回
nextGeneration();//进入下一个party,这时屏障值被重置了,等价与调用了reset()方法
return 0;
} finally {
//如果 barrierCommand 方法发生了异常,则设置 broKen标志位
if (!ranAction)
//中断当前任务线程
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//循环等待最后一个参与party的线程,或者被中断、等待超时
try {
if (!timed) //表示调用的是非超时时间的await方法,则这里也是调用Condition的不带超时时间的await
trip.await();
else if (nanos > 0L)//表示调用的是带超时时间的await方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//执行到这,说明线程被中断了
//g == generation:查看 generation 是否被重置
//若 generation 没有被重置,且没有现成被中断,则调用 breakBarrier 方法执行线程中断后的操作
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {//表示 generation 已经被重置或者 有线程已经被中断,则表示本次CyclicBarrier已经作废,则中断当前线程
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//执行到这里说明线程被唤醒了
//查看是否因为中断唤醒,若是则抛出异常
if (g.broken)
throw new BrokenBarrierException();
//查看当前线程是否是因为 generation 重置而被唤醒(即被reset),若是则直接返回index 数值
//或者任务正常完成也会被重置
if (g != generation)
return index;
//判断是否是因为到达超时时间被唤醒,若是则中断当前任务
if (timed && nanos <= 0L) {
//中断当前任务
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation 唤醒屏障点阻塞中的所有线程
trip.signalAll();
// set up next generation 修改 count 的值使其等于构造 CyclicBarrier 时传入的parties 值
count = parties;
generation = new Generation();//创建新的Generation,即生成下一代party
}
private void breakBarrier() {
generation.broken = true; //设置为中断状态
count = parties;//将计数器设置为构建 CyclicBarrer 时传入得值,即重置屏障点数值count
trip.signalAll();//唤醒其他所有等待的线程
}
4.2、reset() 方法
reset() 方法功能是重置CyclicBarrier
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//干掉当前所有的线程
breakBarrier(); // break the current generation
//生成下一代party
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
4.3、getNumberWaiting() 方法
该方法功能是返回正在阻塞在屏障点的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}