1、CountDownLatch:
如果我们知道了我们的需要执行的任务数,那么我们可以用java并发包下的CountDownLatch,直接上代码:
public class CountDownLaunch {
private static final Executor executor = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
CountDownLaunch.concurrencyTest();
}
/**
* 要求并行执行完thread1、thread2才能执行
* thread3
*/
private static void concurrencyTest() {
// 创建一个count为2的CountDownLatch对象
CountDownLatch countDownLatch = new CountDownLatch(2);
// 并行执行以下2块代码,
executor.execute(()->{
System.out.println("thread1");
// 每次任务执行完count -1
countDownLatch.countDown();
});
executor.execute(()->{
System.out.println("thread2");
// 每次任务执行完count -1
countDownLatch.countDown();
});
try {// 阻塞等待上面任务执行完,也就是count值为0/或线程被中断
countDownLatch.await();
System.out.println("thread3");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2、CompletableFuture:
语意更加清晰,如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始,代码演示:
public class CompletableFutureTest {
public static void main(String[] args) {
CompletableFutureTest.testCompletableFuture();
}
private static void testCompletableFuture() {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolTest.getThreadPoolExecutor();
// 从自定义线程池拿去线程异步处理数据
CompletableFuture<String> CompletableFuture1 = CompletableFuture.supplyAsync(()->{
System.out.println("thread1");
return "thread1";
}, threadPoolExecutor);
// 从自定义线程池拿去线程异步处理数据
CompletableFuture<String> CompletableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println("thread2");
return "thread2";
}, threadPoolExecutor);
System.out.println("thread3");
try {
// 这一步表示要CompletableFuture3的执行依赖于CompletableFuture1/CompletableFuture2执行成功
CompletableFuture<String> CompletableFuture3 = CompletableFuture1.thenCombine(CompletableFuture2, (string1,string2)->{
return string1+string2;
});
// 阻塞等待CompletableFuture3的结果
String join = CompletableFuture3.join();
System.out.println(join);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
创建CompletableFuture的4种方式:
//使用默认线程池
static CompletableFuture<Void>
runAsync(Runnable runnable)
static <U> CompletableFuture<U>
supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void>
runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U>
supplyAsync(Supplier<U> supplier, Executor executor)
默认情况下CompletableFuture使用的是公共的线程池forkJoinPool,那这个线程池默认创建的线程数是cpu的核数,在实际情况下,如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。
CompletableFuture .get()源码介绍: