上一篇文章讲的是关于是使用CountDownLatch实现生成年底报告遇到的问题,这个计数器和CyclicBarrier也有类似功能,但是应用场景不同。
一、应用场景
CountDownLatch:
有ABCD四个任务,ABC是并行执行,等ABC三个任务都执行完毕,D任务再开始执行。
比如:年度报告需要从3个维度统计,3个维度都统计好后,再查询上面3个维度数据生成最终报告。那么3个维度就是ABC,最后查询3个维度数据的任务就是D任务。
CyclicBarrier:
跑步比赛:5个运动员参加跑步比赛,等5个人都热身完毕了,5个人才开始同时起跑。
王者荣耀比赛:5个玩家参加比赛,等5个玩家都加载完毕了,5个人才能开赛进行对局。
二、CyclicBarrier是什么
CyclicBarrier是一个同步屏障,它允许多个线程相互等待,直到到达某个公共屏障点,才能继续执行。通常用来实现多个线程在同一个屏障处等待,然后再一起继续执行的操作。
CyclicBarrier也维护了一个类似计数器的变量,通过CyclicBarrier的构造函数指定,需要大于0,否则抛IllegalArgumenException异常。当线程到达屏障位置时,调用await()方法进行阻塞,直到所有线程到达屏障位置时,所有线程才会被释放,而屏障将会被重置为初始值以便下次使用。
三、常用方法
CyclicBarrier(int parties):CyclicBarrier的构造方法,可通过parties参数指定需要到达屏障的线程个数,但是要大于0,否则会抛IllegalArgumentException异常。
CyclicBarrier(int parties,Runnable barrierAction):另一个构造方法,parties作用同上,barrierAction表示最后一个到达屏障点的线程要执行的逻辑。
int await():表示线程到达屏障点,并等待其它线程到达,返回值表示当前线程在屏障中的位置(第几个到达的)。
int await(long timeout,TimeUnit unit):与await()类似,但是设置了超时时间,如果超过指定的时间后,仍然还有线程没有到达屏障点,则等待的线程会被唤醒并执行后续操作。
void reset():重置屏障状态,即将屏障计数器重置为初始值。
int getParties():获取需要同步的线程数量。
int getNumberWaiting():获取当前正在等待的线程数量。
-
CyclicBarrier的计数器可以重置而CountDownLatch不行,这意味着CyclicBarrier实例可以被重复使用而CountDownLatch只能被使用一次。而这也是循环屏障循环二字的语义所在。CyclicBarrier每调用一次await()方法都将使阻塞的线程数+1,只有阻塞的线程数达到设定值时屏障才会打开,允许阻塞的所有线程继续执行。
-
CyclicBarrier允许用户自定义barrierAction操作,这是个可选操作,可以在创建CyclicBarrier对象时指定
四、写个简单例子
package com.lsl.utills;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 循环栅栏
* 模拟赛跑比赛:有5个人参加赛跑比赛,要等5个人都准备就绪后,5个人才能一起跑。
*/
public class CyclicBarrierDemo02 {
//参赛人数
private static int count = 5;
private static CyclicBarrier cycl;
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
CyclicBarrierDemo02 demo02 = new CyclicBarrierDemo02();
String name = "参赛者";
demo02.runingTest(name);
}
public void runingTest(String name ){
cycl = new CyclicBarrier(5);
for (int i =1;i<=count;i++){
String nameNew = name + i;
executor.execute(new RuningTask(nameNew,i));
}
}
/**
* 赛跑任务
*/
class RuningTask implements Runnable{
private String name;//参赛者姓名
private int waitTimes;
RuningTask(String name,int waitTimes){
this.name = name;
this.waitTimes = waitTimes;
}
@Override
public void run() {
try {
//准备阶段,模拟参赛人员热身准备
long sleeps = waitTimes*200;
Thread.sleep(sleeps);
long st = new Date().getTime();
System.err.println("name:" + name + "准备好了,准备了" + sleeps + "毫秒");
//栅栏位置
cycl.await();
// cycl.await(2, TimeUnit.SECONDS);
//开跑
long ed = new Date().getTime();
long times = ed-st;
System.out.println("name:" + name + ",等待了:" + times + "毫秒,开始起跑了");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行截图:
五、错误代码示例(失效情况)
package com.lsl.utills;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 循环栅栏
* 模拟赛跑比赛:有5个人参加赛跑比赛,要等5个人都准备就绪后,5个人才能一起跑。
*/
public class CyclicBarrierDemo01 {
//参赛人数
private static int count = 5;
private CyclicBarrier cycl;
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for (int i =1;i<=count;i++){
String name = "参赛者" + i;
CyclicBarrierDemo01 demo01 = new CyclicBarrierDemo01();
demo01.runingTest(name,i);
}
}
public void runingTest(String name,int num ){
cycl = new CyclicBarrier(5);
executor.execute(new RuningTask(name,num));
}
/**
* 赛跑任务
*/
class RuningTask implements Runnable{
private String name;//参赛者姓名
private int waitTimes;
RuningTask(String name,int waitTimes){
this.name = name;
this.waitTimes = waitTimes;
}
@Override
public void run() {
try {
//准备阶段,模拟参赛人员热身准备
long sleeps = waitTimes*200;
Thread.sleep(sleeps);
long st = new Date().getTime();
System.err.println("name:" + name + "准备好了,准备了" + sleeps + "毫秒");
//栅栏位置
int numberWaiting = cycl.getNumberWaiting();
System.out.println("numberWaiting=" + numberWaiting);
cycl.await();
// cycl.await(2, TimeUnit.SECONDS);
//开跑
long ed = new Date().getTime();
long times = ed-st;
System.out.println("name:" + name + ",等待了:" + times + "毫秒,开始起跑了");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
错误运行截图:
从错误运行截图可以看出,一直卡在所有参赛者都准备好阶段,没有一起开跑。
原因是上述代码写的有问题,从打印的numberWaiting=0可以分析出,每个线程使用的CyclicBarrier都不一样,因为private CyclicBarrier cycl;定义成了成员变量。
另外,上述代码中把线程内的cycl.await();改成cycl.await(2, TimeUnit.SECONDS);
运行截图如下:
从上述截图可以看出,如果是cycl.await(2, TimeUnit.SECONDS); 在规定时间内没有到设置的计数值,就会抛出异常。
上面的代码改进下,就是定义成静态变量。保证所有线程都是用的一个CyclicBarrier
代码如下:
package com.lsl.utills;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 循环栅栏
* 模拟赛跑比赛:有5个人参加赛跑比赛,要等5个人都准备就绪后,5个人才能一起跑。
*/
public class CyclicBarrierDemo01 {
//参赛人数
private static int count = 5;
private static CyclicBarrier cycl;
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for (int i =1;i<=count;i++){
String name = "参赛者" + i;
CyclicBarrierDemo01 demo01 = new CyclicBarrierDemo01();
demo01.runingTest(name,i);
}
}
public void runingTest(String name,int num ){
cycl = new CyclicBarrier(5);
executor.execute(new RuningTask(name,num));
}
/**
* 赛跑任务
*/
class RuningTask implements Runnable{
private String name;//参赛者姓名
private int waitTimes;
RuningTask(String name,int waitTimes){
this.name = name;
this.waitTimes = waitTimes;
}
@Override
public void run() {
try {
//准备阶段,模拟参赛人员热身准备
long sleeps = waitTimes*200;
Thread.sleep(sleeps);
long st = new Date().getTime();
System.err.println("name:" + name + "准备好了,准备了" + sleeps + "毫秒");
//栅栏位置
int numberWaiting = cycl.getNumberWaiting();
System.out.println("numberWaiting=" + numberWaiting);
cycl.await();
// cycl.await(2, TimeUnit.SECONDS);
//开跑
long ed = new Date().getTime();
long times = ed-st;
System.out.println("name:" + name + ",等待了:" + times + "毫秒,开始起跑了");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行截图如下: