目录
- 前言
- 概述
- 1. CountDownLatch
- 2. CyclicBarrier
- 3. Semaphore(信号量)
- 4. Condition
- 案例
- CountDownLatch-马拉松场景
- CyclicBarrier-马拉松场景
- Semaphore-公交车占座场景
- Condition-线程等待唤醒场景
前言
在 Java 的 java.util.concurrent
(JUC) 包中,提供了四种核心并发工具类:CountDownLatch
、CyclicBarrier
、Semaphore
和 Condition
。它们在多线程编程中用于协调线程的执行顺序和资源访问,确保在复杂的并发场景下各任务按照预期顺序和条件完成。通过合理使用这些工具,可以大幅提升程序的可靠性和执行效率。本文将逐一介绍这四种工具的特点和使用场景,帮助大家更好地掌握多线程编程中的关键协作机制。
概述
1. CountDownLatch
等待一个线程或者一组线程全部执行完成后再执行后续的逻辑, 常用于一组任务的并发控制,无法重用。
- 内部维护一个计数器字段
count
,使用时需要设置初始数量,表示多少个线程同时执行完成CountDownLatch countDownLatch =new CountDownLatch(3)
- 常用API
countDownLatch.countDown()
:每次调用,计数器count减1,直到count为0.countDownLatch.await()
:等待阻塞,当count为0时放行。countDownLatch.await(long timeout, TimeUnit unit)
:和await()一样,只是可以设置一个超时时间,当等待时间大于超时时间,不管count是否为0,都会跳出阻塞执行后面逻辑countDownLatch.getCount()
:获取count计数器数值
2. CyclicBarrier
允许一组线程相互等待,直到它们到达共同的屏障点
,再同时继续执行,适用于多线程周期性汇合的场景。不同于 CountDownLatch
,CyclicBarrier
可重用。
注:对于
可重用
的解释:当count计数器减为0时,CyclicBarrier
的计数器中的count会自动复原为初始值,继而后面的业务中可再次使用CyclicBarrier
对象进行操作,而CountDownLatch
不可以。
- 同样内部也维护了一个
count
计数器,使用时需要设置初始数量,表示多少个线程同时执行操作CyclicBarrier barrier=new CyclicBarrier(3)
。 - 携带回调方法:就是到达
屏障点
(count =0)后需要额外进行的任务
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("合照打卡");
}
});
-
常用API
barrier.await()
:CyclicBarrier 中的 await 与 CountDownLatch 的并不一致,CyclicBarrier 没有类似于 countDown 的方法,CyclicBarrier 的 await 方法你可以近似认为它是 CountDownLatch 中 await 和 countDown 的组合。当调用 CyclicBarrier 的 await 方法后,它会阻塞,且将计数器 -1,如果计数器变为 0 后,则跳出等待。不需要显式地减少计数器。barrier. await(long timeout, TimeUnit unit)
:该方法在阻塞一段指定的时间后,如果等待的线程未能在超时时间内到达同步点,将抛出TimeoutException
异常。barrier.getParties()
:获取当前count
的数值。barrier.reset()
:复原count
为初始值。barrier.isBroken()
:当CyclicBarrier
为不可用状态时,返回true。
注:若其中一个线程在等待过程中抛出了
TimeoutException
异常,这将引起其他所有线程在调用 await 时抛出BrokenBarrierException
异常。此时,CyclicBarrier 进入不可用状态
,必须调用reset
方法对其进行重置,方可继续使用。这种机制确保在超时或异常情况下,程序能够及时恢复到正常的同步状态。
3. Semaphore(信号量)
是一个用于控制并发访问资源的同步工具,用于控制多个线程对共享资源的并发访问量,通过许可机制允许一定数量的线程
同时访问资源,适用于限流、资源池等场景。
-
内部同样维护了一个计数器字段
permits
,在semaphore
中的定义为令牌
,允许指定的线程数量获取到令牌并往下继续执行。成功调用acquire()
方法后,permits
减1,直到减为0。Semaphore semaphore =new Semaphore(3)
。 -
主要API
semaphore.acquire()
:获取一个令牌,如果获取到令牌则继续向下执行,没获取到则线程阻塞直到获得令牌为止。semaphore.tryAcquire(long timeout, TimeUnit unit)
:获取一个令牌并设置超时时间,在超时时间内获取到令牌返回true,等待时间大于超时时间为获得令牌返回false。semaphore.release()
:释放一个令牌,供其他线程使用。semaphore. availablePermits()
:返回当前可用的令牌数量。
4. Condition
与 ReentrantLock
搭配使用,提供比 wait/notify
更灵活的线程等待和通知机制,可实现精准的条件等待和唤醒,常用于复杂线程协作的场景。
private final static ReentrantLock LOCK = new ReentrantLock();
private final static Condition condition = LOCK.newCondition();
-
主要API
condition.await()
:线程将锁(就是当前线程通过LOCK.lock()
获取的全局锁)交出去(释放全局锁供其他线程获取)并进入阻塞,等待其他线程唤醒。并且当前阻塞可以被中断,并抛出异常InterruptedException
。condition.await( int timeout, TimeUnit unit )
:基本与await()方法一样,只是加了一个超时时间,如果等待时间大于超时时间还未被唤醒,跳出阻塞执行后面的逻辑。condition.signal()
:随机唤醒一个正在等待的线程。condition.signalAll()
:唤醒所有等待中的线程。
注:上面api的调用必须在
LOCK.lock()
加锁块中使用,否则会抛出IllegalMonitorStateException
异常。
案例
源码获取:
GitHub - RemainderTime/xf-solution: 提供一些疑难问题的解决方案
子项目:juc-two
CountDownLatch-马拉松场景
对于CountDownLatch
, 模拟一个参加马拉松比赛的场景。
在这个马拉松比赛的场景中,假设小明、小刚和小红参加成都马拉松。由于他们的配速不同,所以每个人跑完全程的完成时间也不同,但他们约好必须等到所有人到达终点后一起坐车回家 。
他们跑完全程分别用时:
- 小明:5s
- 小刚:6s
- 小红:7s
创建参数者跑步线程类Running.class
@Slf4j
public class Running implements Runnable {
//名称
private String name;
//用时
private int time;
public Running(String name, int time) {
this.name = name;
this.time = time;
}
@Override
public void run() {
log.info(name + "开始跑步了----");
try {
Thread.sleep(time);
log.info(name + "达到终点了----用时:{}秒", time/1000);
//到达终点后计数器减1
CountDownLatchMain.countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
创建执行类CountDownLatchMain.class
@Slf4j
public class CountDownLatchMain {
//自定义线程池
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
5000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque());
//初始化线程编排对象
public static CountDownLatch countDownLatch =new CountDownLatch(3);
public static void main(String[] args) {
log.info("马拉松正式开始---");
try {
executor.execute(new Running("小明", 5000));
executor.execute(new Running("小刚", 6000));
executor.execute(new Running("小红", 7000));
log.info("等待所有人到达终点---");
//阻塞等待全部线程完成
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("全部到达终点---");
log.info("一起坐车回家了");
}
}
执行main()
方法
CyclicBarrier-马拉松场景
CyclicBarrier同样使用马拉松场景,可以与上面 CountDownLatch进行对照学习他们的相似与不同之处。
为了体现CyclicBarrier的特性,对内容进行了调整
小明、小刚和小红一起参加成都马拉松(设定全程为30公里)。他们配速不同,因此每人跑完10公里的时间各异。他们约定
每跑完10公里都要等彼此
(可重用特性)到齐后再一起合照打卡
(回调函数使用),随后继续比赛,直到抵达终点拍照打卡后各自开车回家。
他们每跑完10公里分别用时:
- 小明:1s
- 小刚:2s
- 小红:3s
创建参数者跑步线程类Running.class
@Slf4j
public class Running implements Runnable {
//名称
private String name;
//用时
private int time;
public Running(String name, int time) {
this.name = name;
this.time = time;
}
@Override
public void run() {
log.info(name + "开始跑步了----");
try {
Thread.sleep(time);
log.info(name + "达到10公里了----用时:{}秒", time / 1000);
//到达终点后计数器减1
CyclicBarrierMain.barrier.await();
Thread.sleep(time);
log.info(name + "达到20公里了----用时:{}秒", time*2 / 1000);
CyclicBarrierMain.barrier.await();
Thread.sleep(time);
log.info(name + "达到终点30公里----用时:{}秒", time*3 / 1000);
CyclicBarrierMain.barrier.await();
log.info(name + "独自开车回家了---");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
创建执行类CyclicBarrierMain.class
@Slf4j
public class CyclicBarrierMain {
//自定义线程池
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
5000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque());
//初始化线程编排对象
public static CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("合照打卡~~~");
}});
public static void main(String[] args) {
log.info("马拉松正式开始---");
try {
executor.execute(new Running("小明", 1000));
executor.execute(new Running("小刚", 2000));
executor.execute(new Running("小红", 3000));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
执行main()
方法
Semaphore-公交车占座场景
模拟一个坐公交车的场景,当车上没有空位时,站立的人需要等待有人下车了才能获得座位。
小明、小刚、小红、小绿在同一个公交站等同一路公交车。假设公交车只有3个座位,并且前一个站到后面一个站行驶所需要的平均时间为1s,写出他们4人在上车后占座的情况。
4人上车后需要经历的站点数量分别为:
- 小明:2
- 小刚:4
- 小红:4
- 小绿:4
创建乘客上车行为线程类 Seat.class
@Slf4j
public class Seat implements Runnable {
//用户名
private String name;
//需要经历的站点数量
private Integer num;
public Seat(String name, Integer num) {
this.name = name;
this.num = num;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
log.info(name + "上车了----");
try {
//获取令牌数量
int tokenCount = SemaphoreMain.semaphore.availablePermits();
if (tokenCount == 0) {
log.info(name + "--等待--空缺座位----");
}
//获取令牌并设置超时时间
boolean b = SemaphoreMain.semaphore.tryAcquire(SemaphoreMain.TIME * num, TimeUnit.MILLISECONDS);
long awaitTime = System.currentTimeMillis();
if (b) {
log.info(name + "抢占到了座位----");
if (tokenCount > 0) {
Thread.sleep(SemaphoreMain.TIME * num);
} else {
//等待时间
long await = awaitTime - startTime;
Thread.sleep(SemaphoreMain.TIME * num - await);
}
} else {
log.info(name + "整个行程没有抢到座位----");
}
long endTime = System.currentTimeMillis();
log.info(name + "到站下车----等待用时:{}秒---占座时长:{}秒", (awaitTime - startTime) / 1000, Math.round((endTime - awaitTime) / 1000.0));
//释放令牌
SemaphoreMain.semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
创建公交车运行类 SemaphoreMain.class
@Slf4j
public class SemaphoreMain {
//自定义线程池
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
4,
4,
5000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque());
//初始化信号量并设置令牌数量
public static Semaphore semaphore =new Semaphore(3);
//每一个站公交行驶时间 ms
public static int TIME = 1000;
public static void main(String[] args) {
log.info("公交车到达站台----");
executor.execute(new Seat("小明",2));
executor.execute(new Seat("小刚", 4));
executor.execute(new Seat("小红",4));
executor.execute(new Seat("小绿", 4));
}
}
运行 main()
方法
Condition-线程等待唤醒场景
简单使用:多个线程进入等待状态,最后的一个线程唤醒所有等待线程
创建等待线程类AwaitThread.class
@Slf4j
public class AwaitThread implements Runnable{
private String name;
public AwaitThread(String name) {
this.name = name;
}
@Override
public void run() {
ConditionMain.LOCK.lock();
try {
log.info("线程{}开始等待-----", name);
ConditionMain.condition.await();
log.info("线程{}被唤醒-----", name);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
ConditionMain.LOCK.unlock();
}
}
}
创建唤醒线程类SignalThread.class
@Slf4j
public class SignalThread implements Runnable{
private String name;
public SignalThread(String name) {
this.name = name;
}
@Override
public void run() {
ConditionMain.LOCK.lock();
try {
log.info("线程{}开始唤醒其他线程-----", name);
ConditionMain.condition.signalAll();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
ConditionMain.LOCK.unlock();
}
}
}
创建主线程执行类ConditionMain.class
@Slf4j
public class ConditionMain {
//自定义线程池
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
4,
4,
5000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque());
//定义ReentrantLock对象
public final static ReentrantLock LOCK = new ReentrantLock();
//获取并定义Condition对象
public final static Condition condition = LOCK.newCondition();
public static void main(String[] args) {
executor.execute(new AwaitThread("线程1"));
executor.execute(new AwaitThread("线程2"));
executor.execute(new AwaitThread("线程3"));
executor.execute(new SignalThread("线程4"));
}
}
执行main()
方法