CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);可以同步等待一组异步请求都返回结果后,再往下执行。voidCompletableFuture.get()会同步等待所有结果返回,并且不会超时。就因为没有设置超时时间,造成了死锁的发生。
这是发生死锁的代码:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程池死锁案例
*/
public class ThreadPoolTest {
/**
* 线程池
*/
public static ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "THREAD_POOL_TEST"));
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 10; i++) {
THREAD_POOL.submit(() -> {
System.out.println("run task" + atomicInteger.getAndIncrement());
executeSubTask();
});
}
Thread.sleep(1000);
System.out.println("task count: " + THREAD_POOL.getTaskCount());
System.out.println("task in queue count: " + THREAD_POOL.getQueue().size());
}
private static void executeSubTask() {
CompletableFuture[] completableFutures = new CompletableFuture[6];
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 6; i++) {
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
System.out.println("ok-" + atomicInteger.getAndIncrement());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "ok:" + atomicInteger.get();
}, THREAD_POOL);
completableFutures[i] = task;
}
System.out.println("submit subTask size:" + completableFutures.length);
try {
CompletableFuture.allOf(completableFutures).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
处理逻辑是:
-
异步执行父任务
-
父任务里包含多个子任务
为了异步执行任务,所以放到线程池THREAD_POOL里。然后,任务里包含多个子任务,为了降低执行时间,把子任务做成了并发执行,使用了CompletableFuture.allOf(completableFutures).get()方法。
get()一般都会设置超时时间,但是因为异步任务设置了超时时间,所以就没再单独设置get()的超时时间,就引发了线程池死锁发生。
原因分析
子任务执行较慢,线程池的核心线程马上被父任务占满(5个)。父任务里提交的子任务都跑到阻塞队里里,等待核心线程来调度任务。核心线程又阻塞在CompletableFuture.allOf(completableFutures).get()这里,任务无法结束。
线程池状态
5个线程都处于WAITING状态,阻塞在 at java.util.concurrent.CompletableFuture.get(java.base@17/CompletableFuture.java:2072)
"THREAD_POOL_TEST" #12 prio=5 os_prio=0 cpu=4.14ms elapsed=485.93s tid=0x00007f64a448fcf0 nid=0x23b2 waiting on condition [0x00007f648c1fa000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
- parking to wait for <0x00000000e37436b0> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@17/LockSupport.java:211)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17/CompletableFuture.java:1864)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17/ForkJoinPool.java:3434)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@17/CompletableFuture.java:1898)
at java.util.concurrent.CompletableFuture.get(java.base@17/CompletableFuture.java:2072)
at ThreadPoolTest.executeSubTask(ThreadPoolTest.java:45)
at ThreadPoolTest.lambda$main$1(ThreadPoolTest.java:19)
at ThreadPoolTest$$Lambda$237/0x0000000800d54428.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17/Executors.java:539)
at java.util.concurrent.FutureTask.run(java.base@17/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17/Thread.java:833)
"THREAD_POOL_TEST" #13 prio=5 os_prio=0 cpu=0.54ms elapsed=485.93s tid=0x00007f64a4490b70 nid=0x23b3 waiting on condition [0x00007f647dffe000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
- parking to wait for <0x00000000e375c280> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@17/LockSupport.java:211)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17/CompletableFuture.java:1864)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17/ForkJoinPool.java:3434)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@17/CompletableFuture.java:1898)
at java.util.concurrent.CompletableFuture.get(java.base@17/CompletableFuture.java:2072)
at ThreadPoolTest.executeSubTask(ThreadPoolTest.java:45)
at ThreadPoolTest.lambda$main$1(ThreadPoolTest.java:19)
at ThreadPoolTest$$Lambda$237/0x0000000800d54428.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17/Executors.java:539)
at java.util.concurrent.FutureTask.run(java.base@17/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17/Thread.java:833)
"THREAD_POOL_TEST" #15 prio=5 os_prio=0 cpu=1.30ms elapsed=485.92s tid=0x00007f64a4491b70 nid=0x23b4 waiting on condition [0x00007f647defd000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
- parking to wait for <0x00000000e377bef8> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@17/LockSupport.java:211)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17/CompletableFuture.java:1864)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17/ForkJoinPool.java:3434)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@17/CompletableFuture.java:1898)
at java.util.concurrent.CompletableFuture.get(java.base@17/CompletableFuture.java:2072)
at ThreadPoolTest.executeSubTask(ThreadPoolTest.java:45)
at ThreadPoolTest.lambda$main$1(ThreadPoolTest.java:19)
at ThreadPoolTest$$Lambda$237/0x0000000800d54428.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17/Executors.java:539)
at java.util.concurrent.FutureTask.run(java.base@17/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17/Thread.java:833)
"THREAD_POOL_TEST" #14 prio=5 os_prio=0 cpu=1.08ms elapsed=485.92s tid=0x00007f6454002a90 nid=0x23b5 waiting on condition [0x00007f647ddfc000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
- parking to wait for <0x00000000e379a918> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@17/LockSupport.java:211)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17/CompletableFuture.java:1864)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17/ForkJoinPool.java:3434)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@17/CompletableFuture.java:1898)
at java.util.concurrent.CompletableFuture.get(java.base@17/CompletableFuture.java:2072)
at ThreadPoolTest.executeSubTask(ThreadPoolTest.java:45)
at ThreadPoolTest.lambda$main$1(ThreadPoolTest.java:19)
at ThreadPoolTest$$Lambda$237/0x0000000800d54428.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17/Executors.java:539)
at java.util.concurrent.FutureTask.run(java.base@17/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17/Thread.java:833)
"THREAD_POOL_TEST" #16 prio=5 os_prio=0 cpu=0.33ms elapsed=485.92s tid=0x00007f645c001250 nid=0x23b6 waiting on condition [0x00007f647dcfb000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
- parking to wait for <0x00000000e37b83f8> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@17/LockSupport.java:211)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17/CompletableFuture.java:1864)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17/ForkJoinPool.java:3434)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@17/CompletableFuture.java:1898)
at java.util.concurrent.CompletableFuture.get(java.base@17/CompletableFuture.java:2072)
at ThreadPoolTest.executeSubTask(ThreadPoolTest.java:45)
at ThreadPoolTest.lambda$main$1(ThreadPoolTest.java:19)
at ThreadPoolTest$$Lambda$237/0x0000000800d54428.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17/Executors.java:539)
at java.util.concurrent.FutureTask.run(java.base@17/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17/Thread.java:833)
解决方案
-
父任务和子任务使用单独的线程池,避免出现循环依赖
-
设置超时时间,CompletableFuture.allOf(completableFutures).get(3, TimeUnit.SECONDS)
另外,任务子任务数量比较多,就需要更多的线程去执行,5个核心线程数量就不够了。但加大核心线程数,对于突发类型的任务,空闲时又浪费线程。我们就可以设置更大核心线程的同时,并设置核心线程在超时时可以回收。
public static ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "THREAD_POOL_TEST"));
// 回收核心线程
THREAD_POOL.allowCoreThreadTimeOut(true);