线程
是进程中的一个实体,线程本身是不会独立存在的。
进程是代码在数据集合上的一次运行活动 是系统进行资源分配调度的基本单位 。
线程则是进程的一个执行路径, 一个进程中至少有 线程,进程中多个线程共享进程的资源。
操作系统在分配资源时是把资源分配给进程的, 但是 CPU 资源 它是被分配到线程的,因为真正要占用CPU资源运行的是线程 所以也说线程是 CPU 分配的基本单位。
创建线程
/**
创建线程方式一:继承Thread
优点:在run方法中使用this就可以获取当前线程,无需Thread.currentThread()
缺点:不支持多继承,继承了Thread,不能继承其他类
*/
public class ThreadTest{
public static class MyThread extends Thread{
@Override
public void run(){
System.out.println("I am a child thread");
}
}
public static void main(String[] args){
MyThread thread = new MyThread();
thread.start();
}
}
/**
创建线程方式二:实现Runnable接口
优点:,两个线程共用task代码逻辑,如果需要,可以 RunableTask添加参数进行任务区分。
缺点:两种方式都没有返回值
*/
public static class RunableTase implements Runnable{
@Override
public void run(){
System.out.println("I am a child thread");
}
}
public static void main(String[] args) throws InterruptedException{
RunableTask task = new RunableTask();
new Thread(task).start();
new Thread(task).start();
}
/**
创建线程方式三:实现Callable接口
*/
public static class CallerTase implements Callable<String>{
@Override
public String call() throws Excetion{
return "Hello";
}
}
public static void main(String[] args) throws InterruptedException{
//创建异步任务
FutureTask<String> futureTask = new FutureTask<>(new CallerTask());
//启动线程
new Thread(futureTask).start();
try{
String result = futureTask.get();
System.out.println();
}catch(Exception e){
e.printStackTrace();
}
}
=====================================================================
Runnable无返回值
public static void main(String[] args){
Runnable hellos = () -> {
for(int i = 1; i <= 1000; i+){
System.out.println("Hello " + i);
}
};
Runnable goodbyes = () -> {
for(int i = 1; i <= 1000; i+){
System.out.println("Goodbye " + i);
}
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbyes); //结果是交叉输出
}
Callable有返回值
Future接口代表计算的结果,有如下方法
- V get() 阻塞,知道有了可用结果或者超时,返回计算值。
- V get(long timeout,TimeUnit.unit)
- boolean cancel(boolean mayInterruptIfRunning) 取消任务。如果任务还未运行,就不会将任务列入运行计划。否则如果参数为true,则正在运行任务的线程会被中断
- boolean isCancelled()
- boolean isDone()
//Executor的子接口ExecutorService
ExecutorService exec = Executors.newFixedThreadPool();
Callable<V> task = () -> {
while(){
if(Thread.currentThread().isInterrupted()){
return null;
}
}
return result;
}
线程池
/**
线程池的创建:
corePoolSize(线程池的基本大小)当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程
maximumPoolSize(线程池最大数量)。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如
果使用了无界的任务队列这个参数就没什么效果。
keepAliveTime(线程活动保持时间)线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
PriorityBlockingQueue:一个具有优先级的无限阻塞队列
RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
AbortPolicy:直接抛出异常
CallerRunsPolicy:只用调用者所在线程来运行任务
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy:不处理,丢弃掉
*/
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,milliseconds,runnableTaskQueue, handler);
/**
向线程池提交任务:
execute()提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
submit()用于提交需要返回值的任务线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完
*/
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池(的shutdown或shutdownNow)
executor.shutdown();
}
异步调用vs同步调用(Future)
同步调用
@Component
public class Task{
public static Random random = new Random();
public void doTaskOne() throws Exception{
System.out.println("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
}
public void doTaskTwo() throws Exception {
System.out.println("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
}
public void doTaskThree() throws Exception {
System.out.println("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
public class ApplicationTests{
@Autowired
private Task task;
@Test
public void test() throws Exception{
task.doTaskOne();
task.doTaskTwo();
task.doTaskThree();
}
}
异步调用
指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序。
SpringBoot中通过注解@Async
注:@Async所修饰的函数不要定义为static类型,这样异步调用不会生效
@Component
public class Task{
@Async
public void doTaskOne() throws Exception{
}
@Async
public void doTaskTwo() throws Exception {
// 同上内容,省略
}
@Async
public void doTaskThree() throws Exception {
// 同上内容,省略
}
}
SpringBoot启动类@EnableAsync
@SpringBootApplication
@EnableAsync
public class Application{
public static void main(){
SpringApplication.run(Application.class, args);
}
}
继续反复执行单元测试,遇到各种不同的结果:
- 没有任何任务相关的输出
- 有部分任务相关的输出
- 乱序的任务相关的输出
原因是目前doTaskOne、doTaskTwo、doTaskThree三个函数的时候已经是异步执行了。主程序在异步调用之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就自动结束了,导致了不完整或是没有输出任务相关内容的情况
为了让doTaskOne、doTaskTwo、doTaskThree能正常结束,假设我们需要统计一下三个任务并发执行共耗时多少,这就需要等到上述三个函数都完成调动之后记录时间,并计算结果。
那么我们如何判断上述三个异步调用是否已经执行完成呢?我们需要使用Future来返回异步调用的结果,就像如下方式改造doTaskOne函数:
@Async
public Future<String> doTaskOne()throws Exception{
System.out.println("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务一完成");
}
按照如上方式改造一下其他两个异步函数之后,下面我们改造一下测试用例,让测试在等待完成三个异步调用之后来做一些其他事情。
@Test
public void test() throws Exception {
long start = System.currentTimeMillis();
Future<String> task1 = task.doTaskOne();
Future<String> task2 = task.doTaskTwo();
Future<String> task3 = task.doTaskThree();
while(true) {
if(task1.isDone() && task2.isDone() && task3.isDone()) {
// 三个任务都调用完成,退出循环等待
break;
}
Thread.sleep(1000);
}
long end = System.currentTimeMillis();
System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}
执行一下上述的单元测试,可以看到如下结果:
开始做任务一
开始做任务二
开始做任务三
完成任务三,耗时:37毫秒
完成任务二,耗时:3661毫秒
完成任务一,耗时:7149毫秒
任务全部完成,总耗时:8025毫秒
CompletableFuture
Future回顾
使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果
@Test
public void testFuture() throws ExecutionException,InterruptedException{
ExecutorService.executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {
Thread.sleep(2000);
return "hello";
});
//Future.get()的方式阻塞调用线程,直到5个线程都执行结束
System.out.println(future.get());
System.out.println("end");
}
Future无法解决多个异步任务需要相互依赖的场景,简单点说就是,主线程需要等待子线程任务执行完毕之后在进行执行,想到了CountDownLatch,代码如下
/**
定义两个Future,第一个通过用户id获取用户信息,第二个通过商品id获取商品信息
*/
@Test
public void testCountDownLatch() throws InterruptedExcetion,ExecutionException{
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch downLatch = new CountDownLatch(2); //门阀初始化为2
long startTime = System.currentTimeMillis();
Future<String> userFuture = executorService.submit(() -> {
Thread.sleep(500);//模拟查询商品耗时500毫秒
downLatch.countDown(); //初始化减一
return "用户A";
});
Future<String> goodsFuture = executorService.submit(() -> {
Thread.sleep();//模拟查询商品耗时400毫秒
downLatch.countDown();
return "商品A";
});
downLatch.await();
//模拟主程序耗时时间
Thread.sleep(600);
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品用户信息:" + goodsFuture.get());
System.out.println("总共用时:" + (System.currentTimeMilis() - startTime) + "ms");
}
不用异步操作,执行时间应该是:500+400+600 = 1500,用异步操作后实际只用1110。
CompletableFuture
@Test
public void testCompletableInfo() throws InterruptedExcption,ExecutionException{
//调用用户服务获取用户基本信息
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
return "用户A";
});
//调用商品服务获取商品用户基本信息
CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(400);
}catch(InterruptedException e){
e.printStackTrance();
}
return "商品A";
});
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品信息:" + goodsFuture.get());
//模拟主程序耗时时间
Thread.sleep(600);
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
通过CompletableFuture可以很轻松的实现CountDownLatch的功能。
四种创建CompletableFuture的方式
/**
CompletableFuture四个静态方法执行异步任务
supplyAsync 执行任务,支持返回值
runAsync 执行任务,没有返回值
*/
//使用默认内置线程池ForkJoinPool.commonPool() 根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
//自定义线程,根据supplier构建任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
//使用默认内置线程forkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
四种获取CompltableFuture结果的方式
//方式一 Future中已经提供
public T get()
//方式二 提供了超时处理,如果在指定时间未获取结果将抛出超时异常
public T get(long timeout, TimeUnit unit)
//方式三 立即获取结果不阻塞,结果计算已完成将返回结果获计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值
public T getNow(T valueIfAbsent)
//方式四 方法里不会抛出异常
public T join()
@Test
public void testComplatableGet() throws InterruptedException,ExecutionException{
CompetableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(1000);
}catch(InterruptedException e){
e.printStackTrace();
}
return "商品A";
});
//getNow方法测试
System.out.println(cp1.getNow("商品B"));//第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取
//join方法测试
CompletableFuture<Ineger> cp2 = CompletableFuture.supplyAsync((() -> 1/0));
System.out.println(cp2.join()); //join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException
System.out.println("-----------------------------------------------------");
//get测试
ComplatableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1/0));
System.out.println(cp3.get());//get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException
}
异步回调方法
1、thenRun/thenRunAsync
做完第一个任务后,再做第二个任务,第二个任务也没有返回值
@Test
public void testCompletableThenRunAsync() throws InterruptedException,ExecutionException{
long startTime = System.currentTimeMillis();
CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
try{
//执行任务A
Thread.sleep(600);
}catch(InterruptedException e){
e.printStackTrace();
}
});
CompletableFuture<Void> cp2 = cp1.thenRun(() -> {
try {
//执行任务B
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//get方法测试
System.out.println(cp2.get());
//模拟主程序耗时时间
Thread.sleep(600);
// 总共用时1610ms
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
thenRun/thenRunAsync区别
如果执行了第一个任务的时候,传入了一个自定义线程池:
- 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务公用同一个线程池
- 调用thenRunAsync执行第二个任务时,则第一个任务使用的是自己传入的线程池,第二个任务使用的是ForkJoin线程池。
说明: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
2、thenAccept/thenAcceptAsync
第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果作为入参传递到回调方法中,回调方法没有返回值。
@Test
public void testCompletableThenAccept() throws ExecutionException,InterruptedException{
long startTime = System.currentTimeMillis();
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
return "dev";
});
CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> {
System.out.println("上一个任务的返回结果为:" + a);
});
cp2.get();
}
3、 thenApply/thenApplyAsync
表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的.
@Test
public void testCompletableThenApply() throws ExecutionException,InterruptedException{
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
return "dev";
}).thenApply((a) -> {
if(Objects.equals(a,"dev")){
return "dev";
}
return "prod";
});
System.out.println("当前环境为:" + c1.get());
}
4、异常回调whenComplete
当CompletableFuture的任务不论是正常完成还是出现异常它都会调用 「whenComplete」这回调函数。
- 正常完成:whenCompleta返回结果和上级任务一致,异常为null
- 出现异常:whenCompleta返回结果为null,异常为上级任务的异常
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。
@Test
public void testCompletableWhenComplete() throws ExecutionException,InterruptedException{
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if(Math.random() < 0.5){
throw new RuntimeException("出错了");
}
System.out.println("正常结束");
return 0.11;
}).whenComplete((aDouble,throwable) -> {
if(aDouble == null){
System.out.println("whenCompleta aDouble is null");
} } else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
});
}
whenComplete+exceptionally
当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。
@Test
public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("出错了");
}
System.out.println("正常结束");
return 0.11;
}).whenComplete((aDouble, throwable) -> {
if (aDouble == null) {
System.out.println("whenComplete aDouble is null");
} else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
}).exceptionally((throwable) -> {
System.out.println("exceptionally中异常:" + throwable.getMessage());
return 0.0;
});
}
多任务组合回调
1、AND组合关系
runAfterBoth、thenAcceptBoth、thenCombine:表示当任务一和任务二都完成在执行任务三。
其区别:
- reunAfterBoth 不会把执行结果当作方法入参,且没有返回值
- thenAcceptBoth 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
- thenCombine 会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
@Test
public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//开启异步任务1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务1结束");
return result;
},exectorService);
//开启异步任务2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务2结束");
return result;
},executorService);
//任务组合
CompletableFuture<Integer> task3 = task.thenCombineAsync(task2,(f1,f2) -> {
System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
System.out.println("任务1返回值:" + f1);
System.out.println("任务2返回值:" + f2);
return f1 + f2;
},executorService);
Integer res = task3.get();
System.out.println("最终结果:" + res);
}
2、OR组合关系
runAfterEither、acceptEither、applyToEither 都表示两个任务只要有一个任务完成就执行任务三
- runAfterEither 不会把执行结果当作方法入参,且没有返回值
- acceptEither 会将已经完成的任务,作为方法入参,传递到指定方法中,且无返回值
- applyToEither 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
@Test
public void testCompletableEitherAsync() {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//开启异步任务1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务1结束");
return result;
},executorService);
//开启异步任务2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步任务2结束");
return result;
}, executorService);
//任务组合
task.acceptEitherAsync(task2,(res)->{
System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
System.out.println("上一个任务的结果为:"+res);
},executorService);
}
如果把上面的核心线程数改为1也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。
3、多任务组合
「allOf」:等待所有任务完成
「anyOf」:只要有一个任务完成
@Test
public void testCompletableAallOf() throws ExecutionException, InterruptedException {
//创建线程
ExecutorService executorService = Executors.newFixedThreadPool(10);
//开启异步任务1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务1结束");
return result;
},ececutorService);
//开启异步任务2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步任务2结束");
return result;
}, executorService);
//开启异步任务3
CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 3;
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步任务3结束");
return result;
}, executorService);
//任务组合
CompletableFuture<Void> allOf = CompletableFuture.allOf(task,task2,task3);
//等待所有任务完成
allOf.get();
//获取所有任务的返回结果
System.out.println("task结果为:" + task.get());
System.out.println("task2结果为:" + task2.get());
System.out.println("task3结果为:" + task3.get());
}
@Test
public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//开启异步任务1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
int result = 1 + 1;
return result;
}, executorService);
//开启异步任务2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
int result = 1 + 2;
return result;
}, executorService);
//开启异步任务3
CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
int result = 1 + 3;
return result;
}, executorService);
//任务组合
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);
//只要有一个有任务完成
Object o = anyOf.get();
System.out.println("完成的任务的结果:" + o);
}
1、Future需要获取返回值,才能获取异常信息
@Test
public void testWhenCompleteExceptionally() {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (1 == 1) {
throw new RuntimeException("出错了");
}
return 0.11;
});
//如果不加 get()方法这一行,看不到异常信息
//future.get();
}
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。
2、CompletableFuture的get()方法是阻塞的
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。
//反例
CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);
3、不建议使用默认线程池
CompletableFuture代码中又使用了默认的 「ForkJoin线程池」,处理的线程个数是电脑 「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
4、自定义线程池时,注意饱和策略
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离。