前言
相关系列
- 《Java & Lock & 目录》(持续更新)
- 《Java & Lock & CyclicBarrier & 源码》(学习过程/多有漏误/仅作参考/不再更新)
- 《Java & Lock & CyclicBarrier & 总结》(学习总结/最新最准/持续更新)
- 《Java & Lock & CyclicBarrier & 问题》(学习解答/持续更新)
涉及内容
- 《Java & Lock & 总结》
- 《Java & Lock & CountDownLatch & 总结》
- 《Java & Lock & ReentrantLock & 总结》
概述
简介
CyclicBarrier @ 循环栅栏类是俗称“三剑客”的三类常用线程控制工具之一,用于通过批量拦截/释放确保指定数量的线程同时开始/结束对资源的访问。所谓拦截,本质是令线程进入等待状态。循环栅栏类被广泛用于对多线程执行时机进行协调控制的场景,例如控制多线程任务同时执行/统一结束等。循环栅栏类采用减法计数作为拦截线程总数的统计方式,其会在拦截线程总数到达拦截上限前拦截所有经过的线程,并在达到拦截上限时统一释放。从核心功能上来说,循环栅栏类与“三剑客”中的CountDownLatch @ 倒数闭锁类是完全一致的,即都被设计用于对多线程任务进行批次控制。但两者在功能细节上却存在区别,最典型的差异是:循环栅栏类可以做到倒数闭锁类无法实现的循环拦截,即其会在旧批次的拦截线程释放后自动开启新批次的拦截,该知识点会在下文讲解循环时详述。此外,循环栅栏类支持在释放拦截线程时执行自定义操作,该知识点会在下文讲解自定义时详述。
循环栅栏可能会被损坏。所谓损坏是指循环栅栏失去拦截能力,这种情况下正被拦截的线程及后续到达的线程都将抛出损坏栅栏异常。循环栅栏通常不会损坏,但导致循环栅栏损坏的原因却不少,具体可以列为线程拦截时间超时抛出超时异常;线程在拦截期间被中断抛出中断异常;自定义操作执行异常及循环栅栏在拦截期间被重置四种。可以发现,循环栅栏损坏是一种全局性的状态,即任意拦截线程的异常都会导致循环栅栏的损坏,而损坏的循环栅栏又会进一步导致其它拦截线程抛出损坏栅栏异常。损坏后的循环栅栏可以通过重置恢复拦截能力,即修复被损坏的状态。因此虽然在拦截过程中重置循环栅栏会导致正被拦截的线程抛出损坏栅栏异常,但新达到的线程依然可以被正常拦截,该知识点会在下文讲解拦截时详述。
循环栅栏类直接基于ReentrantLock @ 可重入锁类,间接基于AQS类的独占模式实现。与直接基于AQS类共享模式实现的倒数闭锁类不同,循环栅栏类是直接可重入锁类实现的。但又因为可重入锁类直接基于AQS类的独占模式实现,因此循环栅栏类本质上也基于AQS类实现,只不过是以间接的方式而已。需要注意的是:循环栅栏类并不是可重入锁类的子类,其是通过在内部组合可重入锁类的字段来实现自身设计的。
与倒数闭锁类的对比
- 倒数闭锁类是一次性的,只能进行单次批量拦截;而循环栅栏类则支持多次批量拦截;
- 倒数闭锁类不支持在拦截线程释放时执行自定义操作;而循环栅栏类则支持,但如果没有必要也可以选择不执行;
- 倒数闭锁类基于AQS类共享模式实现;而循环栅栏类则直接基于可重入锁类,间接基于AQS类独占模式实现,这也是循环栅栏类可以实现循环拦截的核心原因;
- 倒数闭锁类的任意拦截线程异常不会导致其它拦截线程异常;而循环栅栏类的任意拦截线程异常都将导致同批次的拦截线程抛出损坏栅栏异常;
- 倒数闭锁类的线程“拦截”与“计数”是分离的,因此可以拦截与拦截上限不同数量的线程,并且线程释放也可以由拦截线程以外的线程控制;而循环栅栏类的线程“拦截”与“计数”则是绑定的,因此可拦截线程数量与拦截上限必然等同,并且线程释放也只能由拦截线程控制。
使用
创建
-
public CyclicBarrier(int parties) —— 创建指定拦截上限的循环栅栏。
-
public CyclicBarrier(int parties, Runnable barrierAction) —— 创建指定拦截上限/自定义操作的循环栅栏。
方法
-
public int getParties() —— 获取同行者 —— 获取当前循环栅栏的拦截上限,即每批次可拦截的线程总数。
-
public int getNumberWaiting() —— 获取等待数量 —— 获取当前循环栅栏当前批次中正在被拦截的线程总数。
-
public int await() throws InterruptedException, BrokenBarrierException —— 等待 —— 通过当前循环栅栏的当前批次令当前线程无限等待至拦截线程总数达到拦截上限并返回拦截索引为止。拦截线程总数达到拦截上限后方法会释放当前批次的所有拦截线程,并自动开启新批次的拦截。拦截索引用于表示当前线程在当前批次拦截线程中的顺序,取值范围为[0, getParties() - 1],数字越大意味着拦截越早。如果当前线程在拦截期间被中断则抛出中断异常并损坏当前循环栅栏;如果在拦截期间当前循环栅栏被损坏则抛出损坏栅栏异常;如果当前线程是当前批次最后拦截的线程则在释放前会执行自定义操作(如果存在的话)。
-
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException —— 等待 —— 通过当前循环栅栏的当前批次令当前线程有限等待至拦截线程总数达到拦截上限并返回拦截索引为止,超出指定等待时间则抛出超时异常并损坏当前循环栅栏。拦截线程总数达到拦截上限后方法会释放当前批次的所有拦截线程,并自动开启新批次的拦截。拦截索引用于表示当前线程在当前批次拦截线程中的顺序,取值范围为[0, getParties() - 1],数字越大意味着拦截越早。如果指定等待时间不合法(<=0)则当前线程不会被拦截;如果当前线程在拦截期间被中断则抛出中断异常并损坏当前循环栅栏;如果在拦截期间当前循环栅栏被损坏则抛出损坏栅栏异常;如果当前线程是当前批次最后拦截的线程则在释放前会执行自定义操作(如果存在的话)。
-
public boolean isBroken() —— 是否损坏 —— 判断当前循环栅栏是否损坏,是则返回true;否则返回false。
-
public void reset() —— 重置 —— 重置当前循环栅栏,本质是先损坏当前批次再开启新批次,因此该方法会导致旧批次正被拦截的线程抛出损坏栅栏异常,但后续达到的新批次线程可被正常拦截。
模板
/**
* 线程池执行器(注意!!!此处只是为了快速获取线程池执行器,开发环境不推荐使用
* newFixedThreadPool(int nThreads)方法创建线程池执行器,易造成OOM。此外,
* 当将循环栅栏类与线程池执行器类配合使用时,必须保证线程池执行器中的执行线程数量
* 大于循环栅栏的[同行者总数],否则可能导致循环栅栏因为无法拦截到[同行者总数]数
* 量的线程而永远无法释放拦截线程!!!
*/
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
// 创建无栅栏活动的循环栅栏。
// CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
// 创建有栅栏活动的循环栅栏。
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("数量已达标!!!拦截线程已被释放!!!");
});
// 使用线程池执行器执行任务。
for (int i = 1; i <= 2; i++) {
System.out.println("第【" + i + "】批次/代拦截开启....");
for (int j = 1; j <= 3; j++) {
EXECUTOR_SERVICE.submit(() -> {
try {
long threadId = Thread.currentThread().getId();
System.out.println("线程【" + threadId + "】开始执行任务...");
System.out.println("线程【" + threadId + "】被拦截...");
cyclicBarrier.await();
System.out.println("线程【" + threadId + "】继续执行任务...");
System.out.println("线程【" + threadId + "】结束执行任务!!!");
} catch (InterruptedException | BrokenBarrierException ignored) {
// 什么也不做。
}
});
}
try {
// 主线程等待5s,目的是令拦截的线程输出完整的日志。
Thread.sleep(5000);
} catch (InterruptedException ignored) {
// 什么也不做。
}
System.out.println("第【" + i + "】批次/代拦截结束!!!");
System.out.println();
}
}
实现
AQS类/可重入锁类
可重入锁类可提供循环栅栏类设计实现的完整支持。可重入锁类基于AQS类的独占模式实现,该知识点会在可重入锁类的相应文章中详述。循环栅栏类选择使用可重入锁类作为其底层实现的核心原因有二:一是可重入锁类可提供循环设计所需的同步/线程安全环境;二是其条件机制可作为拦截设计的实际功能提供方。对于前者而言,虽然尚未对循环拦截的实现机制进行详述,但不难猜测其本质必然是对内部资源的重置。而由于循环栅栏类是设计在并发环境中使用的API,因此其必须保证内部资源在重置期间不受并发线程的干扰而发生错乱,即重置必须是原子操作,否则就可能因为内部资源不可靠而导致“线程被提前释放”等异常情况的发生,而可重入锁类作为独占特性的锁类API可以轻易提供这种线程安全的环境;对于后者而言,由于线程拦截的本质是令线程进入有限/无限等待状态,因此可重入锁类具备线程管理能力的条件机制便可直接作为拦截设计的实际功能提供方,即直接通过条件机制来令线程进入有限/无限等待状态。
同行者/总数
拦截上限具体是指循环栅栏类在一批次中所允许拦截的最大线程总数。循环栅栏类对于每批次可拦截的线程总数存在拦截上限,该拦截上限会在创建循环栅栏时传入并保存在[parties @ 同行者]中。当发现拦截线程总数已达[同行者]时,循环栅栏将释放当前批次的所有拦截线程,并开启新一轮的拦截批次。
拦截线程总数的统计采用减法计数。循环栅栏类设计将每批次的拦截线程总数保存在[count @ 总数]中,当循环栅栏创建/开启新的拦截批次时,循环栅栏会将[总数]初始化/重置为[同行者],并在每次拦截线程时递减[总数],直至[总数]归零为止。[总数]归零意味着当前批次的拦截线程总数已达[同行者],这种情况下循环栅栏会在释放所有拦截线程后重置[总数],并开启新的拦截批次,因此拦截线程总数的统计是通过减法计数实现的。
拦截/条件机制
条件机制令可重入锁类具备了暂时解锁的能力。通过使用可重入锁类提供的条件机制,即通过对可重入锁类newCondition()方法返回的Condition @ 条件接口对象的使用,线程可以做到在已持有可重入锁的情况下原子性地暂时解锁并进入等待状态,直至在满足人为定义的条件后重新恢复持有。这种暂时解锁的方式相对于标准的永久解锁而言具备三点特性:一是原子性,即无论线程重入加锁了可重入锁多少次都可以一次性解锁,而无需像永久解锁一样需要执行等同于重入次数的多次解锁;二是暂时解锁后的线程会被自动纳入条件机制的条件队列中等待,而不会像永久解锁的线程一样执行自定义的后续逻辑。关于条件队列的内容在此不会赘述,只需知道这些等待线程可以通过条件机制重新被唤醒即可;三是持有必然会被恢复,无论暂时解锁的线程为了条件达成而等待了多久,其最终都必然会被唤醒并恢复(本质还是竞争)对可重入锁的持有,并且重入次数也与暂时解锁前等同,否则就说明开发者未曾合法使用条件机制。条件机制的存在为线程提供了在已持有可重入锁的情况下等待条件达成的能力,虽说令线程等待/唤醒可以通过多种方法/API实现,但如果条件必须依赖其它线程在持有可重入锁的情况下达成,那使用条件机制则将是其唯一的选择,因为独占特性的可重入锁不允许被多个线程同时持有。
条件机制由条件接口定义。虽说有些跑题,但这里依然要着重说明的是:“条件机制”一词本意是指由条件接口定义的条件机制概念,而上文中提及的条件机制则是对由ConditionObject @ 条件队列类提供的条件机制实现的代称。虽说条件对象类是Java JDK对条件接口的唯一实现类,但严格来说“条件机制”一词并不能与可重入锁类的条件机制画上等意。原因是仅提供令线程等待/唤醒基本功能定义的条件接口/条件机制是不含任何逻辑性/目的性的。线程具体会出于什么原因/条件/目的,又是以什么方式等待/唤醒这些都由条件接口的实现类具体决定。故而可重入锁类这种提供暂时解锁能力的条件机制本质上只是对条件机制概念的一种实现,并无法全权代表条件机制概念本身。因为只要存在相应需求/意愿,开发者完全可以基于自身设计构造一套截然不同的条件机制实现。此外还值得一说的是:条件对象类其实是AQS类的内部类,因此可重入锁类的条件机制实际上是由AQS类全权提供的。
循环栅栏类通过可重入锁类的暂时解锁能力实现对线程的批量拦截。循环栅栏会令线程在内部可重入锁的保护下递减[总数]并判断是否归零,未归零意味着拦截线程总数尚未达到[同行者],这种情况线程会通过条件机制暂时解锁可重入锁并加入条件队列中等待,该等待便是拦截的本质。而随着后续线程同样暂时解锁而导致条件队列中等待的线程越来越多,批量拦截的效果便得以达成。
释放/栅栏命令
批次中最后达到的线程(下文简称最后线程)会释放所有拦截线程,并开启新的拦截批次。[总数]会在最后线程执行递减后归零,[总数]归零意味着拦截线程总数已达[同行者],因此最后线程不会再暂时解锁令自身被拦截,而是会通过条件机制唤醒之前被拦截的所有线程以令其恢复执行,从而达到释放拦截线程的效果。此外,最后线程会在释放拦截线程后执行内部资源的重置操作以支持下一批次的拦截,该操作的详细内容会在下文详述,此处只需知道当最后线程通过时循环栅栏必然已开启新的拦截批次即可。
最后线程会在释放拦截线程前执行自定义操作。自定义操作的本质是保存在[barrierCommand @ 栅栏命令]中的Runnable @ 可运行,其可在创建循环栅栏时通过特定的构造方法指定,并会在最后线程释放同批次的拦截线程前被执行。由于循环栅栏类支持多批次拦截,因此[栅栏命令]也会被执行多次。
最后线程会最先通过循环栅栏。释放的本质是最后线程通过可重入锁类的条件机制唤醒之前等待的线程,因此无论同步块(即循环栅栏类中受可重入锁类保护的代码)中是否还有后续流程,释放后的拦截线程都需要重新竞争可重入锁以恢复持有,并最终通过永久解锁的方式通过循环栅栏。而由于最后线程在唤醒等待线程时无需暂时解锁,因此其会在始终持有可重入锁的状态下完成整体流程,故而在最后线程通过循环栅栏之前,释放线程永远不可能恢复持有,更不要论后续通过循环栅栏了,因此最后线程会是最先通过循环栅栏的线程。
旧批次的释放线程可能与新批次的拦截线程并发访问循环栅栏。已知最后线程会是最先通过循环栅栏的线程,并且当其通过时循环栅栏已开启新批次的拦截,因此如果在旧批次的释放线程通过循环栅栏期间有新线程到达,则就会出现旧批次的释放线程与新批次的拦截线程并发访问循环栅栏的情况。不过这并不会造成拦截批次的错乱,因为此时旧批次的拦截线程必然都已脱离条件队列,故而不会出现不同批次的拦截线程共存于条件队列的情况。此外,如果拦截线程因为循环栅栏被损坏而被释放且随后循环栅栏又被重置,则也可能导致新/旧批次线程并发访问的情况发生,该知识点会在下文讲解代/循环时详述。我们由此可知虽然循环栅栏中可能存在多个批次,即可能存在隶属于不同批次的线程并发访问循环栅栏的情况,但除了最新批次的线程可能为拦截线程外,其它批次的线程必然皆为释放线程。
代/循环
在正式讲述循环之前,我们需要先了解Generation @ 代的概念。代是循环栅栏类用于代表批次的具象化产物,其本质是私有静态内部类代类的实例。循环栅栏类每开启新批次的拦截时都会为之创建新代用于与拦截线程进行关联,而由于循环栅栏中可能存在多个批次,因此可知循环栅栏中可能存在多个代。这些代会以拦截线程局部变量的形式被保存,但最新批次的代还会被“额外”保存在[generation @ 代]中。或者说正是因为最新批次的代会被保存在[代]中,而所有的代又都曾是最新代,因此拦截线程在到达循环栅栏时都会从[代]中获取代并作为局部变量保存,从而建立起与代/批次的关联。虽然循环栅栏中可能存在多个代,但除了[代]所代表的最新批次可能处于对线程的拦截过程中外,其它代所代表的批次必然都已释放了拦截线程(虽然释放线程可能还未恢复持有/通过循环栅栏/抛出异常),因为新代的创建会在旧批次的拦截线程唤醒/释放之后进行。代是循环栅栏类非常重要的知识点,其不仅与循环特性紧密相连,还与后续损坏的内容息息相关。
循环拦截是循环栅栏类的最大特性,其本质在于内部资源的重置。[总数]达到[同行者]时,为了开启新批次的拦截,最后线程需要将循环栅栏的内部资源重置为初始状态。这其中具体包括的内容有三:一是释放当前批次的拦截线程。该重置操作已在上文中详细阐述过,其本质在于清空条件队列,以确保旧批次的拦截线程不会遗留到新批次中;二是[总数]重置。由于旧批次拦截已将[总数]归零,因此循环栅栏类需要将[总数]恢复为[同行者]以支持新批次的拦截/递减;三是创建新代。循环栅栏会为新批次创建新代以作为标记,并会将之保存在[代]中用于比对使用。
基于AQS类的独占模式实现是循环栅栏类实现循环拦截的必要条件。我们大概可以猜测到的是循环栅栏类的性能并不会十分优秀,因为多线程竞争加锁可重入锁是会大幅降低性能的行为,并且条件机制的使用还会加剧这种竞争,因此可想而知循环栅栏类在性能上大致是无法与基于AQS类共享模式实现的倒数闭锁类相比较的。但话虽如此,独占特性却是循环栅栏类实现循环拦截的必要条件,因为只有在线程安全的环境下才能保证所有内部资源的重置可以在执行过程中不受其它线程的打扰,从而避免“[总数]还未重置/代还未新建就有新线程达到循环栅栏”等一系列并发问题的产生。
损坏/重置
损坏的循环栅栏会失去拦截线程能力,此时正被拦截的线程及后续到达的线程都将抛出损坏栅栏异常。导致循环栅栏损坏的原因具体可以分为以下四种:
- 线程拦截时间超时而抛出超时异常:当线程因为超时唤醒时,其会先损坏循环栅栏,再抛出超时异常;
- 线程在拦截期间被中断而抛出中断异常:中断异常实际上是由条件机制抛出的,但循环栅栏类会捕获中断异常,并在损坏循环栅栏后再次抛出;
- [栅栏命令]执行异常:最后线程会在try块中执行[栅栏命令],并在finally块中确保出现异常时损坏循环栅栏;
- 循环栅栏在拦截线程期间被重置。
循环栅栏损坏的本质是[代]的损坏。当上述四种情况的前三种发生时,异常拦截线程会将[代]的[broken @ 损坏]设置为true表示其已被损坏,再通过条件机制唤醒被拦截的所有线程。而当释放线程恢复持有后如果发现其关联的代已被损坏则不会再正常通过循环栅栏,而是直接抛出损坏栅栏异常,并且后续到达循环栅栏的线程也会同样因为发现[代]已损坏而直接抛出损坏栅栏异常。一个值得讨论的点是:既然异常拦截线程会直接损坏[代],那为什么释放线程不直接通过[代]而是通过其关联代来判断循环栅栏是否损坏呢?这是因为在上述三种情况中异常拦截线程都处于被拦截期间,因此其必然属于最新批次,即其关联代与[代]是相同的,故而可以直接对[代]进行损坏。而释放线程的关联代虽然理论上与[代]是相同的(因为其与异常拦截线程属于相同批次),但却不排除[代]被更新的可能。因为在释放线程进行持有恢复的过程中完全可能有外部线程通过reset()方法对循环栅栏进行了重置,该过程会为[代]创建新的实例,因此释放线程的关联代与[代]可能并不相同,所以不可以直接通过[代]来判断循环栅栏是否损坏。由此可知,虽然代的损坏意味着其代表批次的异常结束,但只有的[代]的损坏才表示循环栅栏损坏。
重置可以修复损坏的循环栅栏,即令循环栅栏恢复拦截线程的能力。所谓的重置指的是reset()方法,该方法会通过将循环栅栏重置为初始状态的方式令其恢复拦截线程的能力,因此其本质与最后线程为循环栅栏开启新批次是完全一致的。由于除非损坏,否则执行重置时循环栅栏必然处于对线程的拦截过程中。此时如果直接重置将大概率导致循环栅栏的当前批次在未拦截到[同行者]数量的线程下就释放拦截线程,这是不符合循环栅栏类定义的。因此reset()方法在正式重置循环栅栏前会先将之损坏,因为在循环栅栏损坏的状态下无需等待拦截线程总数达到[同行者]即可释放,但后果就是当前批次已被拦截的线程都将抛出损坏栅栏异常,也就是上述四种原因的最后一种情况。与前三种情况不同的是:由于循环栅栏损坏后会被立刻重置,因此后续到达的线程不会抛出损坏栅栏异常,因为重置已为循环栅栏创建了新[代]/开启了新批次。同时我们也可知:即使循环栅栏在损坏后已被重置,但只要曾经批次的代被损坏,则该批次的释放线程就会抛出损坏栅栏异常。
中断/超时的拦截线程可能抛出损坏栅栏异常。已知的是:如果线程在拦截期间中断/超时,则理论上其应该在损坏循环栅栏后抛出中断/超时异常。但有一种情况是:如果拦截线程在因为中断/超时唤醒后发现关联代已被损坏,则其不会再执行上述操作而是会抛出损坏代异常,因为这说明损坏发生在中断/超时之前…或者说其认为损坏发生在中断/超时之前,因为损坏也可能是在其唤醒后与检查前发生的,不过这无法被具体探测。由此我们可知:即使同一批次的拦截线程中有多个线程并发中断/超时,但除最早损坏循环栅栏的线程会抛出中断/超时异常外,其它线程最终都只会抛出损坏栅栏异常。