#Phaser
功能介绍
CyclicBarrier 解决了 CountDownLatch的缺点,但是其本身也仍然具备一定的缺陷,比如不可以动态添加parties 调用一次await 仅占用1个parties
public class MyPhaser {
private Phaser phaser = new Phaser(3);
public void testA(){
System.out.println(Thread.currentThread().getName() + " 1Begin " + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 1End " + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + " 2Begin " + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 2End " + System.currentTimeMillis());
}
public void testB(){
try {
System.out.println(Thread.currentThread().getName() + " 1Begin " + System.currentTimeMillis());
Thread.sleep(5000);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 1End " + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + " 2Begin " + System.currentTimeMillis());
Thread.sleep(5000);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 2End " + System.currentTimeMillis());
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
MyPhaser myPhaser = new MyPhaser();
MyThread myThread = new MyThread(myPhaser);
myThread.setName("A");
myThread.start();
MyThread myThread1 = new MyThread(myPhaser);
myThread1.setName("B");
myThread1.start();
MyThreadC myThread2 = new MyThreadC(myPhaser);
myThread2.setName("C");
myThread2.start();
}
@AllArgsConstructor
@NoArgsConstructor
@Data
public static class MyThread extends Thread{
private MyPhaser myPhaser;
@Override
public void run() {
myPhaser.testA();
}
}
@AllArgsConstructor
@NoArgsConstructor
@Data
public static class MyThreadC extends Thread{
private MyPhaser myPhaser;
@Override
public void run() {
myPhaser.testB();
}
}
}
使用 arriveAndAwaitAdvance方法再遇到计数不足时会导致进程被阻塞 可以使用 arriveAndDeregister 在线程结束时使partie减一。
只需改动下 testB方法即可验证
public void testB(){
try {
System.out.println(Thread.currentThread().getName() + " 1Begin " + System.currentTimeMillis());
Thread.sleep(5000);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 1End " + System.currentTimeMillis());
}catch (InterruptedException e){
e.printStackTrace();
}
}
getPhase() 可以获取当前已经到达多少个屏障
在通过新的 屏障时被调用 onAdvance
返回true不等待了, Phaser 为无效/销毁状态
返回false Phaser 继续工作
private Phaser phaser = new Phaser(3){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(phase + " " + registeredParties);
System.out.println("onAdvance used");
//返回true不等待了, Phaser 为无效/销毁状态
//返回false Phaser 继续工作
return true;
}
};
getRegisteredParties() 获取注册的parties
register() 动态添加一个parties
bulkRegister(n) 动态批量添加 parties
getArrivedParties() 获取已经被使用的parties
getUnarrivedParties() 获取未使用的parties
arrive() 使parties值加1,但不再屏障处等待,继续向下运行。
Pharse 也具备计数重制功能
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
System.out.println(1 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
System.out.println(2 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
System.out.println(3 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
System.out.println(4 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
System.out.println(5 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
}
如上,每次调用arrive() getArrivedParties+1 切 每次达到2 都会重制计数
awaitAdvance(n) 如果 n == getPhase 则阻塞 且 不可被 interrupted
public static void main(String[] args) {
Phaser phaser = new Phaser(2);
System.out.println(1 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
phaser.awaitAdvance(0);
System.out.println("run");
System.out.println(2 + " getPhase: " + phaser.getPhase() + " getArrivedParties: " + phaser.getArrivedParties() );
phaser.arrive();
}
2 并未执行
awaitAdvanceInterruptibly() 与 awaitAdvance() 相反可以被中断。
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 当屏障达到 phase时 如果超过 timeout 时间没有变化测抛出异常
forceTermination() 将屏障取消 并且 getPhase = Integer.MIN_VALUE
isTerminated() 判断Phase 是否已经被销毁
总结:
类Phaser提供了动态增减parties计数的能力, 比CyclicBarrier 更方便。并且支持在指定屏障处等待,也支持在等待时中断或非中断等功能。相比CyclicBarrier 更建议使用Phase