同步工具类:Phaser
- 介绍
- 特性
- 动态调整线程个数
- 层次Phaser
- 源码分析
- state 变量解析
- 构造函数对state变量赋值
- 阻塞方法
- arrive()
- awaitAdvance()
- 业务场景
- 实现CountDownLatch功能
- 代码
- 测试结果
- 实现 CyclicBarrier功能
- 代码展示
- 测试结果
- 总结
介绍
一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的使用。该工具类是 JDK 1.7才引入的。功能比 CyclicBarrier 和 CountDownLatch 都要强大。
CyclicBarrier 和 CountDownLatch 的学习地址
特性
动态调整线程个数
CyclicBarrier 所要同步的线程个数是在构造函数中指定的,之后不能更改,而Phaser可以在运行期间动态地调整要同步的线程个数。用来修改同步线程个数的函数有:
public int register() {
//注册一个线程
return doRegister(1);
}
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
//注册多个线程
return doRegister(parties);
}
public int arriveAndDeregister() {
//解注册,减少线程个数
return doArrive(ONE_DEREGISTER);
}
层次Phaser
即 多个Phaser 可以组成 如下的树状结构,可以通过在构造函数中传入父Phaser 来实现
public Phaser(Phaser parent) {
//传入Phaser 作为 该对象的父 节点
this(parent, 0);
}
如上图,如果传Phaser 参数过来,对应的线程数量会相加。对于树状Phaser 上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser 是一样的,具体来讲,Phaser 并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册,当子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册。父Phaser把子Phaser当作一个正常参与的线程即可。
源码分析
state 变量解析
Phaser 没有基于AQS来实现,但具备AQS的核心特性:state 变量,CAS 操作,阻塞队列等。
首先看下state 变量。
/**
* Primary state representation, holding four bit-fields:
* state 分为了四个部分。
* unarrived -- the number of parties yet to hit barrier (bits 0-15)
* parties -- the number of parties to wait (bits 16-31)
* phase -- the generation of the barrier (bits 32-62)
* terminated -- set if barrier is terminated (bit 63 / sign)
*
* Except that a phaser with no registered parties is
* distinguished by the otherwise illegal state of having zero
* parties and one unarrived parties (encoded as EMPTY below).
*
* To efficiently maintain atomicity, these values are packed into
* a single (atomic) long. Good performance relies on keeping
* state decoding and encoding simple, and keeping race windows
* short.
*
* All state updates are performed via CAS except initial
* registration of a sub-phaser (i.e., one with a non-null
* parent). In this (relatively rare) case, we use built-in
* synchronization to lock while first registering with its
* parent.
*
* The phase of a subphaser is allowed to lag that of its
* ancestors until it is actually accessed -- see method
* reconcileState.
*/
private volatile long state;
从英文注释中我们可以得知:这个64位的state变量被拆分成4部分。如图所示
最高位0表示未同步完成,1表示同步完成。初始最高位为0.可以看下如下几个函数对state 的各个部分组成的获取。获取当前轮数(32-62位)
/**
* Returns the current phase number. The maximum phase number is
* {@code Integer.MAX_VALUE}, after which it restarts at
* zero. Upon termination, the phase number is negative,
* in which case the prevailing phase prior to termination
* may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
*
* @return the phase number, or a negative value if terminated
*/
public final int getPhase() {
//无符号右移(高位补0) 32位。
//然后强制int .则最高位还是 是否为同步信息。
//如果是负数,说明最高位是1,已经完成了同步
return (int)(root.state >>> PHASE_SHIFT);
}
private static final int PHASE_SHIFT = 32;
判断当前轮数同步是否完成。
/**
* Returns {@code true} if this phaser has been terminated.
*
* @return {@code true} if this phaser has been terminated
*/
public boolean isTerminated() {
//判断整个值是否是负数即可。最高位也是符号位。
return root.state < 0L;
}
获取总注册线程数
/**
* Returns the number of parties registered at this phaser.
*
* @return the number of parties
*/
public int getRegisteredParties() {
return partiesOf(state);
}
private static int partiesOf(long s) {
//先将s 转成int.相当于把高位 是否完成同步和轮数舍弃了。
//只剩下低位的总线程数和未达到线程数。然后向右无符号右移16位(高位补0)。
//就只剩下了总线程数
return (int)s >>> PARTIES_SHIFT;
}
private static final int PARTIES_SHIFT = 16;
获取未达到的线程数
/**
* Returns the number of registered parties that have not yet
* arrived at the current phase of this phaser. If this phaser has
* terminated, the returned value is meaningless and arbitrary.
*
* @return the number of unarrived parties
*/
public int getUnarrivedParties() {
return unarrivedOf(reconcileState());
}
private static int unarrivedOf(long s) {
// 首先将state强转为int,舍弃高位的 是否完成同步和轮数信息
int counts = (int)s;
//不为空的话,则和 0xffff 进行与操作,截取 低16位。即未到达的线程数
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
private static final int EMPTY = 1;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
构造函数对state变量赋值
如果能理解state 变量的分布图。64位的划分,划分为4个部分。 那么就很容易理解构造函数的初始了。
/**
* Creates a new phaser with the given parent and number of
* registered unarrived parties. When the given parent is non-null
* and the given number of parties is greater than zero, this
* child phaser is registered with its parent.
*
* @param parent the parent phaser
* @param parties the number of parties required to advance to the
* next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/
public Phaser(Phaser parent, int parties) {
//初始化未到达的线程数,不能大于2 的16次方。上面有个state 变量划分图
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
//初始轮数为0
int phase = 0;
this.parent = parent;
//判断parent 是否为null。不为null.则把自己注册到父对象中
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
//如果parties为0,则赋值EMPTY.
//不为0,则将 phare 左移 32位 轮数赋值
// 将parties 左移16位 总线程数数,初始值和未达到数一样
// parties 未达到线程数
// 最后将三个数 进行或运算 (只有有一个是1,则结果为1)
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int EMPTY = 1;
阻塞方法
如下图所示,右边的主线程会调用awaitAdvance()进行阻塞。左边的arrive()会对state进行cas的累减操作。当未到达的线程数减到0时,唤醒右边阻塞的主线程。
在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个单向链表,出栈和入栈都在链表头部,所以只需要一个head指针,而不需要tail指针。
Treiber Stack 代码
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait 每个Node 对应一个Thread 对象
QNode next; //单项链表,
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
}
/**
* Heads of Treiber stacks for waiting threads. To eliminate
* contention when releasing some threads while adding others, we
* use two of them, alternating across even and odd phases.
* Subphasers share queues with root to speed up releases.
*/
//为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
private AtomicReference<QNode> queueFor(int phase) {
//当phase 为奇数的时候,阻塞线程放在addQ里面
// 为偶数的时候,阻塞线程放在eventQ里面
return ((phase & 1) == 0) ? evenQ : oddQ;
}
arrive()
对state 变量进行操作,然后唤醒线程。
arrive()和arriveAndDeregister()内部调用的都是doArrive() 函数。区别在于前者只是把未达到线程数减一。后者则把未到达线程数和下一轮的总线程数都减一。
public int arrive() {
return doArrive(ONE_ARRIVAL);
}
private static final int ONE_ARRIVAL = 1;
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}
//65536
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
//65537
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
doArrive()
把未到达线程数减一,减一之和,还未到1(空置设置的是1,这里和jdk1.7有点区别)。什么都不做。直接返回。如果到1后。说明所有线程到达。 然后会处理两件事:
1)重置state,把state 的未到达线程个数重置到总的注册的线程数中。同时phaser加1
2).唤醒队列中的线程
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
//小于0,直接返回
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {//CAS 减一
if (unarrived == 1) {//所有线程到达
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {//父 Phaser 为空,调用自己的 onAdvance()
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;//最高位置为1
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;//下一轮的未到达数等于总的线程数
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
//重置state
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
//释放所有线程
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
//父 Phaser 不为空,调用父 的doArrive()并且下个未到达==0.
//则把未到的线程和下一轮未到的线程都减一
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
// 父 Phaser 不为空,调用父 的doArrive() 减一个
phase = parent.doArrive(ONE_ARRIVAL);
}
//没有全部到达,直接返回
return phase;
}
}
}
releaseWaiters()
private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
//根据phase是奇数还是偶数来找对应的栈
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
//遍历整个栈
while ((q = head.get()) != null &&
//如果当前节点的phase 不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的。
//需要被释放并唤醒
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
awaitAdvance()
internalAwaitAdvance 方法主要是调用了ForkJoinPool.manageBlock函数。目的是把Node对应的线程阻塞。
public int awaitAdvance(int phase) {
final Phaser root = this.root;
//root==this 表示没有树状结构。只有一个Phaser
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
//phase 已经结束,不用阻塞了,直接返回
return phase;
if (p == phase)
//阻塞在这一层上面
return root.internalAwaitAdvance(phase, null);
return p;
}
业务场景
实现CountDownLatch功能
前面我们用 CountDownLatch实现了 主线程等待10 个线程预加载数据完成后再执行 加载其他组件功能。
代码可以参考之前的CountDownLatch文章.
代码
public class DoThing extends Thread{
private final Phaser startPhaser;
private final Phaser donePhaser;
public DoThing(Phaser startSignal, Phaser doneSignal) {
this.startPhaser = startSignal;
this.donePhaser = doneSignal;
}
@Override
public void run() {
try {
//开始阻塞了,等待主线程开启
System.out.println(Thread.currentThread().getName()+" 开始阻塞等待了...");
startPhaser.awaitAdvance(startPhaser.getPhase());
doWork();
donePhaser.arrive();
} catch (InterruptedException ex) {
}
}
public void doWork() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " 开始干活了,DB 中的数据加载到本地缓存中");
Thread.sleep(1000);
}
}
/**
* @author :echo_黄诗
* @description:利用Phaser 来实现 CountDownLatch 功能
* @date :2023/3/3 18:50
*/
public class Run {
public static void main(String[] args) {
Phaser startPhaser = new Phaser(1);
Phaser donePhaser = new Phaser(10);
for (int i = 0; i < 10; ++i) // create and start threads
new Thread(new DoThing(startPhaser, donePhaser)).start();
doSomethingElse();
//开始加载数据,
startPhaser.arrive();
//主线程阻塞,等待数据加载完成
donePhaser.awaitAdvance(donePhaser.getPhase());
doSomethingElse();
System.out.println("数据加载完成,开始启动其他组件,包括dubbo组件");
}
public static void doSomethingElse(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果
实现 CyclicBarrier功能
前面CyclicBarrier章节 我们利用 CyclicBarrier 实现了10个求职者 一起来了后开始笔试,然后一起面试的功能。之前的CyclicBarrier的代码可以参考.
代码展示
工具类
public class Utils {
/**
* 模拟在路上方法
*/
public static void doOnTheWay() {
doCostTime(2000);
}
/**
* 模拟笔试过程
*/
public static void doWriteExam() {
doCostTime(3000);
}
/**
* 模拟面试过程
*/
public static void doInterview() {
doCostTime(5000);
}
private static void doCostTime(int time) {
Random random = new Random();
try {
//随机休眠时间
int count = random.nextInt(time);
// System.out.println(count);
Thread.sleep(count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
面试类
public class Interview extends Thread{
private Phaser phaser;
public Interview(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
phaser.arrive();
Utils.doInterview();
System.out.println(Thread.currentThread().getName() + " 面试完啦.....");
}
}
笔试类
public class WriteExam extends Thread{
private Phaser phaser;
public WriteExam(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
Utils.doWriteExam();
phaser.arrive();
System.out.println(Thread.currentThread().getName() + " 笔试做完了....");
}
}
来公司路上
public class OnTheWay extends Thread{
private Phaser phaser;
public OnTheWay(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
Utils.doOnTheWay();
System.out.println(Thread.currentThread().getName() + " 已经来公司了...");
phaser.arrive();
}
}
运行类
此处需要注意,因为是主线程执行 awaitAdvance。所以需要先执行子线程。不然主线程执行就直接阻塞了。
子线程都没有机会执行。因为子线程是靠主线程启动的。
/**
* @author :echo_黄诗
* @description:用Phaser 实现CyclicBarrier功能
* @date :2023/3/3 19:08
*/
public class Run {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(10,
20,100, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100));
Phaser phaser=new Phaser(10);
for (int i=0;i<10;i++){
threadPoolExecutor.execute(new OnTheWay(phaser));
}
phaser.awaitAdvance(phaser.getPhase());
for (int i=0;i<10;i++){
threadPoolExecutor.execute(new WriteExam(phaser));
}
phaser.awaitAdvance(phaser.getPhase());
for (int i=0;i<10;i++){
threadPoolExecutor.execute(new Interview(phaser));
}
phaser.awaitAdvance(phaser.getPhase());
}
}
测试结果
从打印截图可以看出,使用Phaser 能很好的实现了CyclicBarrier的同步阻塞功能。
但是目前还不能实现其回调函数的功能。
总结
Phaser 相比CountDownLatch ,Semaphore,CyclicBarrier 的源码,实现上还是复杂的多。但使用上面比较简单。这里只是给大家一个例子。知道该如何用。目前分析源码也不是很透彻。如果要彻底弄清楚,可以参考 Java并发实现原理这本教程。