文章目录
- 概述
- 1. Semaphore
- 2. Exchanger
- 3. CountDownLatch
- 4. CyclicBarrier
- 5. Phaser
- 原理 & Code
- 1. Semaphore
- 2. Exchanger
- 3. CountDownLatch
- 4. CyclicBarrier
- 5. Phaser
概述
Java 的 java.util.concurrent
包提供了许多实用的工具类,用于简化并发编程。这些工具类帮助开发者管理和协调多线程操作,从而避免手动实现复杂的同步机制。下面是一些主要的工具类及其作用的概述:
类 | 作用 |
---|---|
Semaphore | 限制同时访问某个资源的线程数量 |
Exchanger | 两个线程间交换数据 |
CountDownLatch | 使一个或多个线程等待,直到其他线程完成任务后再继续执行 |
CyclicBarrier | 使一组线程在某个屏障点同步,屏障可以被重复使用 |
Phaser | 更加灵活的同步工具,支持多阶段任务同步,类似但更强大于CyclicBarrier |
1. Semaphore
作用: Semaphore
用于限制同时访问某个资源的线程数量。它主要通过一个许可机制来实现,在需要时获取许可(acquire
),并在完成后释放许可(release
)。
应用场景: 控制对有限资源的访问,比如数据库连接池、网络资源等。
2. Exchanger
作用: Exchanger
是用于在线程之间交换数据的工具类。它使两个线程在某个交换点进行数据交换。
应用场景: 当两个线程需要在某个时刻交换数据时使用,比如在生产者-消费者模型中交换缓冲区。
3. CountDownLatch
作用: CountDownLatch
使一个或多个线程等待,直到计数器减到零时再开始工作。计数器通过调用 countDown()
方法来减少。
应用场景: 确保主线程等待多个线程完成任务后再继续执行,或实现线程之间的依赖关系。
4. CyclicBarrier
作用: CyclicBarrier
使一组线程在某个屏障点同步。所有线程都到达屏障后,屏障才会打开,并且可以重复使用。
应用场景: 适用于需要多个线程在某个阶段全部完成后再进行下一阶段的场景,比如并行计算、地图裁剪等。
5. Phaser
作用: Phaser
是一个增强版的 CyclicBarrier
,能够处理更复杂的多阶段任务。它支持动态注册和注销线程,并可以处理分层的、复杂的同步需求。
应用场景: 用于多阶段任务同步,支持动态的参与者,适合需要灵活的阶段控制的场景。
这些工具类极大地简化了并发编程中的常见任务,避免了开发者自己实现复杂的同步逻辑。
原理 & Code
1. Semaphore
Semaphore提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int
类型的数据,也可以看成是一种“资源”。
可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。
// 默认情况下使用非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。
每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。
用法: Semaphore主要用于控制对某个资源的访问许可数。它可以限制多个线程同时访问某一资源的数量。
实现原理: Semaphore内部通过一个计数器来跟踪可用许可数量,当一个线程获取许可时,计数器减少,释放许可时计数器增加。如果没有许可可用,线程将会被阻塞,直到有许可被释放。
举个例子,我想限制同时只能有3个线程在工作:
public class SemaphoreDemo {
static class MyThread implements Runnable {
private int value;
private Semaphore semaphore;
public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 获取permit
System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
value, semaphore.availablePermits(), semaphore.getQueueLength()));
// 睡眠随机时间,打乱释放顺序
Random random =new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("线程%d释放了资源", value));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
semaphore.release(); // 释放permit
}
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}
输出:
当前线程是1, 还剩2个资源,还有0个线程在等待
当前线程是0, 还剩1个资源,还有0个线程在等待
当前线程是6, 还剩0个资源,还有0个线程在等待
线程6释放了资源
当前线程是2, 还剩0个资源,还有6个线程在等待
线程2释放了资源
当前线程是4, 还剩0个资源,还有5个线程在等待
线程0释放了资源
当前线程是7, 还剩0个资源,还有4个线程在等待
线程1释放了资源
当前线程是8, 还剩0个资源,还有3个线程在等待
线程7释放了资源
当前线程是5, 还剩0个资源,还有2个线程在等待
线程4释放了资源
当前线程是3, 还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是9, 还剩0个资源,还有0个线程在等待
线程9释放了资源
线程5释放了资源
线程3释放了资源
可以看到,在这次运行中,最开始是1, 0, 6这三个线程获得了资源,而其它线程进入了等待队列。然后当某个线程释放资源后,就会有等待队列中的线程获得资源。
当然,Semaphore默认的acquire方法是会让线程进入等待队列,且会抛出中断异常。但它还有一些方法可以忽略中断或不进入阻塞队列:
// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
// 不进入等待队列,底层使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)
Semaphore原理
Semaphore内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared
方法。在这个方法里,会去尝试获取资源。
如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。
2. Exchanger
Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。先来一个案例看看如何使用,比如两个线程之间想要传送字符串:
用法: Exchanger用于在两个线程之间交换数据。两个线程通过Exchanger同步到达一个交换点,然后彼此交换数据。
public class ExchangerDemo {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("这是线程A,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程A的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
Thread.sleep(1000);
new Thread(() -> {
try {
System.out.println("这是线程B,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程B的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
输出:
这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据
可以看到,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。看源码可以发现它是使用park/unpark来实现等待状态的切换的,但是在使用park/unpark方法之前,使用了CAS检查,估计是为了提高性能。
Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。根据JDK里面的注释的说法,可以总结为一下特性:
- 此类提供对外的操作是同步的;
- 用于成对出现的线程之间交换数据;
- 可以视作双向的同步队列;
- 可应用于基因算法、流水线设计等场景。
Exchanger类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用exchange,就会抛出一个超时异常。
public V exchange(V x, long timeout, TimeUnit unit)
那么问题来了,Exchanger只能是两个线程交换数据吗?那三个调用同一个实例的exchange方法会发生什么呢?答案是只有前两个线程会交换数据,第三个线程会进入阻塞状态。
需要注意的是,exchange是可以重复使用的。也就是说。两个线程可以使用Exchanger在内存中不断地再交换数据。
3. CountDownLatch
CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。
CountDownLatch类的原理挺简单的,内部同样是一个继承了AQS的实现类Sync,且实现起来还很简单,可能是JDK里面AQS的子类中最简单的实现了,有兴趣的读者可以去看看这个内部类的源码。
需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值**。
// 构造方法:
public CountDownLatch(int count)
public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count
用法: CountDownLatch允许一个或多个线程等待其他线程完成操作。典型应用是启动一个主线程,等待一组线程完成任务。
示例:
玩游戏的时候,在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,玩家才能真正进入游戏。下面我们就来模拟一下这个demo。
public class CountDownLatchDemo {
// 定义前置任务线程
static class PreTaskThread implements Runnable {
private String task;
private CountDownLatch countDownLatch;
public PreTaskThread(String task, CountDownLatch countDownLatch) {
this.task = task;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(task + " - 任务完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 假设有三个模块需要加载
CountDownLatch countDownLatch = new CountDownLatch(3);
// 主任务
new Thread(() -> {
try {
System.out.println("等待数据加载...");
System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
countDownLatch.await();
System.out.println("数据加载完成,正式开始游戏!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 前置任务
new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
}
}
输出:
等待数据加载…
还有3个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!
4. CyclicBarrier
用法: CyclicBarrier允许一组线程互相等待,直到所有线程都到达某个屏障点后再继续执行。适用于多阶段任务的同步。
原理:CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使用的是Lock + Condition
实现的等待/通知模式
示例:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> System.out.println("All parties arrived at barrier, let's continue"));
for (int i = 1; i <= parties; i++) {
new Thread(new Task(barrier)).start();
}
}
}
class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " is waiting at barrier.");
barrier.await(); // 等待其他线程到达
System.out.println(Thread.currentThread().getName() + " is done.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierDemo {
static class PreTaskThread implements Runnable {
private String task;
private CyclicBarrier cyclicBarrier;
public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
// 假设总共三个关卡
for (int i = 1; i < 4; i++) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d的任务%s完成", i, task));
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本关卡所有前置任务完成,开始游戏...");
});
new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
}
}
输出:
关卡1的任务加载地图数据完成
关卡1的任务加载背景音乐完成
关卡1的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡2的任务加载地图数据完成
关卡2的任务加载背景音乐完成
关卡2的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡3的任务加载人物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏…
注意这里跟CountDownLatch的代码有一些不同。CyclicBarrier没有分为await()
和countDown()
,而是只有单独的一个await()
方法。
一旦调用await()方法的线程数量等于构造方法中传入的任务总量(这里是3),就代表达到屏障了。CyclicBarrier允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个Runnable类型的对象。上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏…”。
// 构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
// 具体实现
}
5. Phaser
用法: Phaser是一个更高级的同步工具,能够处理比CyclicBarrier更复杂的多阶段任务。它允许在任务的不同阶段动态地增加或减少线程数。
示例:
public class PhaserDemo {
static class PreTaskThread implements Runnable {
private String task;
private Phaser phaser;
public PreTaskThread(String task, Phaser phaser) {
this.task = task;
this.phaser = phaser;
}
@Override
public void run() {
for (int i = 1; i < 4; i++) {
try {
// 第二次关卡起不加载NPC,跳过
if (i >= 2 && "加载新手教程".equals(task)) {
continue;
}
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
i, phaser.getRegisteredParties(), task));
// 从第二个关卡起,不加载NPC
if (i == 1 && "加载新手教程".equals(task)) {
System.out.println("下次关卡移除加载【新手教程】模块");
phaser.arriveAndDeregister(); // 移除一个模块
} else {
phaser.arriveAndAwaitAdvance();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(4) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("第%d次关卡准备完成", phase + 1));
return phase == 3 || registeredParties == 0;
}
};
new Thread(new PreTaskThread("加载地图数据", phaser)).start();
new Thread(new PreTaskThread("加载人物模型", phaser)).start();
new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
new Thread(new PreTaskThread("加载新手教程", phaser)).start();
}
}
输出:
关卡1,需要加载4个模块,当前模块【加载人物模型】
关卡1,需要加载4个模块,当前模块【加载地图数据】
关卡1,需要加载4个模块,当前模块【加载背景音乐】
关卡1,需要加载4个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
第1次关卡准备完成
关卡2,需要加载3个模块,当前模块【加载人物模型】
关卡2,需要加载3个模块,当前模块【加载背景音乐】
关卡2,需要加载3个模块,当前模块【加载地图数据】
第2次关卡准备完成
关卡3,需要加载3个模块,当前模块【加载地图数据】
关卡3,需要加载3个模块,当前模块【加载背景音乐】
关卡3,需要加载3个模块,当前模块【加载人物模型】
第3次关卡准备完成