文章目录
- 1 问题背景
- 2 前言
- 3 4种常用的方法
- 4 代码
- 4.1 isTerminated()
- 4.2 线程池的任务总数是否等于已执行的任务数
- 4.3 `CountDownLatch`计数器
- 4.4 `CyclicBarrier`计数器
1 问题背景
真实生产环境的电商项目,常使用线程池应用于执行大批量操作达到高性能的效果。应用场景有批量补偿修正数据库历史数据、定时批量执行业务逻辑(涉及到百万级数据)、批量初始化新业务的数据等等。用到线程池,必须要知道任务是否执行完了,才能进行下一步业务操作。今天总结归纳4种常用的方法判断线程池是否执行完所有任务
2 前言
先给出解决方案,文末再贴出详细代码
参考自:面试突击35:如何判断线程池已经执行完所有任务了?
3 4种常用的方法
- 线程池提供的
isTerminated()
方法。缺点是需要调用shutdown()
关闭线程池 - 判断线程池的任务总数是否等于已执行的任务数。优点是无需关闭线程池。缺点是两个数值都是动态计算的,只是一个近似值
CountDownLatch
计数器。写法很优雅,且无需关闭线程池,但它的缺点是只能使用一次,CountDownLatch 创建之后不能被重复使用CyclicBarrier
计数器。和 CountDownLatch 类似,它可以理解为一个可以重复使用的循环计数器,CyclicBarrier 可以调用reset()
将自己重置到初始状态
4 代码
4.1 isTerminated()
@Slf4j
public class IsTerminatedDemo {
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
/**
* 使用isTerminated判断线程池是否执行完任务,缺点是要关闭线程池
*
* @param args
*/
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
10,
10,
10 * 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),
new DefaultThreadFactory("complete_thread_pool"),
new ThreadPoolExecutor.AbortPolicy());
// 添加任务
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.submit(() -> {
// 随机休眠
int r = new Random().nextInt(5);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Task NO.{} finish.", finalI);
});
}
threadPool.shutdown();
// 判断线程池是否执行完所有任务,前提是要执行shutdown
while (!threadPool.isTerminated()) {
log.info("{}: ThreadPool handleing task.", LocalDateTime.now().format(FORMATTER));
}
log.info("All tasks have been finished!");
}
}
4.2 线程池的任务总数是否等于已执行的任务数
@Slf4j
public class GetCompletedTaskCountDemo {
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
/**
* 判断线程池是否执行完所有任务,如果计划执行任务数=已完成任务数,那么线程池的任务就全部执行完了。
* 优点是无需关闭线程池
* 缺点是 getTaskCount() 和 getCompletedTaskCount() 返回的是一个近似值,因为线程池中的任务和线程的状态可能在计算过程中动态变化,所以它们两个返回的都是一个近似值
*
* @param args
*/
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
10,
10,
10 * 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),
new DefaultThreadFactory("complete_thread_pool"),
new ThreadPoolExecutor.AbortPolicy());
// 添加任务
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.submit(() -> {
// 随机休眠
int r = new Random().nextInt(5);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Task NO.{} finish.", finalI);
});
}
// 判断线程池是否执行完所有任务,如果计划执行任务数=已完成任务数,那么线程池的任务就全部执行完了。
// 优点是无需关闭线程池
// 缺点是 getTaskCount() 和 getCompletedTaskCount() 返回的是一个近似值,因为线程池中的任务和线程的状态可能在计算过程中动态变化,所以它们两个返回的都是一个近似值
while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount()) {
log.info("{}: ThreadPool handleing task.", LocalDateTime.now().format(FORMATTER));
}
log.info("All tasks have been finished!");
}
}
4.3 CountDownLatch
计数器
@Slf4j
public class CountDownLatchDemo {
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
/**
* 写法很优雅,且无需关闭线程池,但它的缺点是只能使用一次,CountDownLatch 创建之后不能被重复使用,
* 也就是说 CountDownLatch 可以理解为只能使用一次的计数器
*
* @param args
*/
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
10,
10,
10 * 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),
new DefaultThreadFactory("complete_thread_pool"),
new ThreadPoolExecutor.AbortPolicy());
int taskCount = 5;
CountDownLatch cdl = new CountDownLatch(taskCount);
// 添加任务
for (int i = 0; i < taskCount; i++) {
int finalI = i;
threadPool.submit(() -> {
// 随机休眠
int r = new Random().nextInt(5);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Task NO.{} finish.", finalI);
// 线程执行完,计数器减1
cdl.countDown();
});
}
log.info("{}: ThreadPool handleing task.", LocalDateTime.now().format(FORMATTER));
try {
// 阻塞等待所有线程执行完任务
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("All tasks have been finished!");
}
}
4.4 CyclicBarrier
计数器
@Slf4j
public class CyclicBarrierDemo {
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
/**
*和 CountDownLatch 类似,它可以理解为一个可以重复使用的循环计数器,CyclicBarrier 可以调用 reset 方法将自己重置到初始状态
*
* @param args
*/
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
10,
10,
10 * 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY),
new DefaultThreadFactory("complete_thread_pool"),
new ThreadPoolExecutor.AbortPolicy());
int taskCount = 5;
CyclicBarrier cb = new CyclicBarrier(taskCount, () -> log.info("log from CyclicBarrier, all tasks of ThreadPool have been finished"));
// 添加任务
for (int i = 0; i < taskCount; i++) {
int finalI = i;
threadPool.submit(() -> {
// 随机休眠
int r = new Random().nextInt(5);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Task NO.{} finish.", finalI);
// 阻塞等待
try {
cb.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
log.info("All tasks have been finished!");
}
}