线程池
线程池的创建方式
- 通过Executors的静态方法
- 通过 new ThreadPoolExecutor方式创建
七大参数的作用
参数 | 作用 |
---|---|
corePoolSize | 核心线程数,线程池创建好后就准备就绪的线程数量,一直存在 |
maximumPoolSize | 最大线程数量,控制资源 |
keepAliveTime | 存活时间,如果当前线程数量如果大于核心线程数量,释放空闲的线程,<br />最大线程-核心数量 |
unit | 时间单位 |
BlockingQueue | 阻塞队列,如果任务很多,就会把多的任务放在队列中 |
threadFactory | 线程的工厂 |
handler | 如果队列满了,按照指定的拒绝策略执行任务 |
/**
* 线程池详解
* @param args
*/
public static void main(String[] args) {
// 第一种获取的方式
ExecutorService service = Executors.newFixedThreadPool(10);
// 第二种方式: 直接new ThreadPoolExecutor()对象,并且手动的指定对应的参数
// corePoolSize:线程池的核心线程数量 线程池创建出来后就会 new Thread() 5个
// maximumPoolSize:最大的线程数量,线程池支持的最大的线程数
// keepAliveTime:存活时间,当线程数大于核心线程,空闲的线程的存活时间 8-5=3
// unit:存活时间的单位
// BlockingQueue<Runnable> workQueue:阻塞队列 当线程数超过了核心线程数据,那么新的请求到来的时候会加入到阻塞的队列中
// new LinkedBlockingQueue<>() 默认队列的长度是 Integer.MAX 那这个就太大了,所以我们需要指定队列的长度
// threadFactory:创建线程的工厂对象,作用:1、可以规范管理线程 2、(一般使用默认,当然也可以自己构建)自己构建线程工厂的好处就是可以指定线程名称,一旦线程任务出现错误,很容易定位
// RejectedExecutionHandler handler:当线程数大于最大线程数的时候会执行的淘汰策略
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
10,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100),
//Executors.defaultThreadFactory(),
new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r,"Thread Name");
}
},
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("this Thread Name rejected task!");
}
});
//execute:只能接受Runnable类型的任务,没有返回值
//submit:不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null
poolExecutor.execute(()->{
System.out.println("----->" + Thread.currentThread().getName());
});
}
线程池的执行顺序
线程池创建,准备好core数量的核心线程,准备接收任务
线程池标识
public class ThreadPoolExecutor extends AbstractExecutorService {
// ctl初始化了线程的状态和线程数量,初始状态为RUNNING并且线程数量为0
// 这里一个Integer既包含了状态也包含了数量,其中int类型一共32位,高3位标识状态,低29位标识数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里指定了Integer.SIZE - 3,也就是32 - 3 = 29,表示线程数量最大取值长
private static final int COUNT_BITS = Integer.SIZE - 3;
// 这里标识线程池容量,也就是将1向左位移上面的29长度,并且-1代表最大取值,二进制就是 000111..111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;// 111 代表线程池尾为RUNING状态,代表正常接收任务
private static final int SHUTDOWN = 0 << COUNT_BITS;// 000 代表线程池为SHUTDOWN状态,不接收新任务,但是内部还会处理阻塞队列中的任务,正在处理的任务正常处理
private static final int STOP = 1 << COUNT_BITS;// 001 代表线程池尾STOP状态,不接收新任务,内部阻塞队列的任务不在处理,并中断正在执行的任务
private static final int TIDYING = 2 << COUNT_BITS;// 010 代表线程池尾TIDYING状态,一个过渡状态,代表线程池即将over
private static final int TERMINATED = 3 << COUNT_BITS;// 011 TERMINATED状态,执行terminated()方法,线程池真的凉了
// 得到线程池状态,通过传入的c,获取最高三位的值,拿到线程状态吗,最终就是拿 1110 000……和c做&运算得到高3位结果
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 得到线程池中的工作线程数量,最终得到现在线程数量,就是拿c 和 0001 111……做&运算,得到低29位结果
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判断当前线程池状态是否是Running
private static boolean isRunning(int c) { return c < SHUTDOWN; }
}
execute
/**
* 线程池执行流程
*/
public void execute(Runnable command) {
//健壮性判断
if (command == null)
throw new NullPointerException();
// (拿到32位int)这里是获取核心线程数
int c = ctl.get();
//获取工作线程数量 工作线程数量ing < core
if (workerCountOf(c) < corePoolSize) {
// 添加工作线程,true代表创建核心线程
if (addWorker(command, true))
return;
// 创建核心线程数量失败(可能存在并发导致失败),再次获取ctl保证ctl是当前最新值(可能存在并发,被人抢先创建了线程)
c = ctl.get();
}
//判断是否为RUNING状态,是->将任务添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次判断ctl
int recheck = ctl.get();
// 如果不是RUNING状态,不是->移除任务,拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是RUNING状态,但是统计当前工作线程数量还为0,说明此时没有线程处理上面添加到队列的任务
else if (workerCountOf(recheck) == 0)
// 阻塞队列中有任务,但是没有工作线程,添加一个任务为null的工作线程处理阻塞队列中的任务
// 避免出现队列任务没有线程执行的情况
addWorker(null, false);
}
// 如果不能入队列,就尝试创建非核心线程(最大线程数)
else if (!addWorker(command, false))
// 如果还是失败,那就拒绝吧
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:// for循环标记,内套for使用标记可从内部跳出外部for循环
// 经过大量判断,给工作线程数标识+1
for (;;) {
//获取ctl和线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 除了RUNING之前都有可能
if (rs >= SHUTDOWN &&
// !(rs = SHUTDOWN && 当前任务为null && 阻塞队列不为null)
/* rs = SHUTDOWN,如果不是SHUTDOWN,代表STOP更高的状态,没必要创建线程添加任务
* firstTask == null,任务为null,并且线程不是RUNING,是不需要处理的
* 阻塞队列不为null,阻塞队列为空时,返回flase,外层!变为true,不需要创建工作线程(阻塞队列都为空了,不需要创建了)
*/
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
// 返回构建工作线程失败
return false;
for (;;) {
// 获取工作线程数量
int wc = workerCountOf(c);
// 如果当前工作线程数量大于,线程池最大容量了就不创建了
if (wc >= CAPACITY ||
//判断当前工作线程数量,大于核心线程数量或者是最大线程数量,超过也不创建
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//采用cas方式将工作线程数加一
if (compareAndIncrementWorkerCount(c))
// 成功退出外层for
break retry;
//cas失败了(有人并发操作了),重新获取ctl
c = ctl.get();
// 重新判断线程池状态,如果没有变化:继续执行内层for即可,还是RUNING
if (runStateOf(c) != rs)
// 如果有变化:结束本次外层for,继续下次外层for
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 上面一顿操作只是给工作线程数标识成功+1,下面才是真正创建工作线程
boolean workerStarted = false;// worker开始-false
boolean workerAdded = false;// worker添加-true
// worker就是工作线程
Worker w = null;
try {
// 创建worker,传入任务
w = new Worker(firstTask);
final Thread t = w.thread;
//健壮性判断
if (t != null) {
// 获取线程池全局锁,避免添加线程时,别人干掉线程池,干掉线程池必须获取这个锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取线程池状态
int rs = runStateOf(ctl.get());
//runing状态
if (rs < SHUTDOWN ||
// SHUTDOWN状态,且任务为null,创建空任务工作线程处理阻塞队列的任务
(rs == SHUTDOWN && firstTask == null)) {
//线程是否时运行状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将工作线程添加到集合里HashSet
workers.add(w);
// 获取工作线程数
int s = workers.size();
// 如果工作线程数大于之前记录的最大工作线程数,就替换下
if (s > largestPoolSize)
largestPoolSize = s;
//添加工作线程成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加工作线程成功后就准备启动
if (workerAdded) {
// 启动工作线程
t.start();
// 开始工作成功-true
workerStarted = true;
}
}
} finally {
// 如果启动工作线程失败
if (! workerStarted)
addWorkerFailed(w);
}
//返回工作线程是否启动
return workerStarted;
}
worker的封装
private final class Worker
extends AbstractQueuedSynchronizer // AQS
implements Runnable // Runnable 那本身其实就是一个Runnable了
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// this,把worker本身传给了newThread,那么start后调用的就是当前实现了Runnable的Worker的run()
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任务不为null 执行 任务为空就从阻塞队列获取任务,拿不到任务就阻塞在这,如果是最大线程数的线程到达最大空闲时间就被销毁,如果是核心线程数线程就阻塞在这(线程复用)
while (task != null || (task = getTask()) != null) {
// 加锁,避免你shutdownNow我 任务也不会中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//获取当前线程状态,如果大于STOP(只有TIDYING、TERMINATED,说明线程凉了)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// 就中断
wt.interrupt();
try {
// 类似aop,执行任务前操作,可以自己重写
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 类似aop,执行任务前操作,可以自己重写
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
看看getTask()、
举例:有一个线程池,core:5,max:50,queue:100,如果并发是200,那么线程池是怎么处理的?
- 首先 200个中的前面5个会直接被核心线程处理,然后6个到105个会加入到阻塞队列中,然后106到155的请求在最大线程数中,那么会创建对应的线程来处理这些请求,之后剩下的45个请求会被直接放弃
线程池的好处
- 降低资源消耗
- 提高响应速度
- 提高线程的管理
CompletableFuture
Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用规察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法;Google guava也提供了通用的扩展Future; Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。
作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?
在Java 8中,新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。CompletableFuture和EutureTask同属于Future接口的实现类,都可以获取线程的执行结果。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U7AG6IBn-1677581971066)(…/…/Pictures/typora-图片/多线程学习/image-20220915195450044.png)]
创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
-
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
-
runAsync方法不支持返回值。它是以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
-
supplyAsync可以支持返回值。它是以Supplier函数式接口类型为参数,CompletableFuture的计算结果类型为U。
@Slf4j
public class CompletableFutureDemo1 {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
50,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
/**
* runAsync方法不支持返回值。它是以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
* supplyAsync可以支持返回值。它是以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("main 线程开始了");
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
log.info("线程1 开始了");
// int i = 10 / 0;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("线程1 结束了了");
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
log.info("线程2 开始了");
//int i = 10 / 0;
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("线程2 结束了");
return 10;
}, executor);
//get()仍然是阻塞式
log.info("future1.get():" + future1.get());
log.info("future2.get():" + future2.get());
log.info("main 线程结束了");
}
}
计算结果完成时的回调方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)//在原来线程继续执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)//异步新开线程处理
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)//异步新开线程处理,并且该线程归线程池executor管理
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
-
whenComplete 可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果,但是不能处理 execptionlly 当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行 handle 可以获取异步任务的返回值和抛出的异常信息,而且可以显示的修改返回的结果
-
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
-
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
-
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。Function<? super T,? extends U>
T:上一个任务返回结果的类型,U:当前任务的返回值类型
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("main 线程开始了");
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
log.info("线程1 开始了");
int i = 10 / 0;
log.info("线程1 结束了了");
}, executor).whenCompleteAsync((response, exception) -> {
log.info("线程1 response:" + response);
log.info("线程1 exception:"+ exception);
}, executor).exceptionally((throwable)->{
//获取异常
log.info("线程1 exceptionally:" + throwable);
//无返回结果
return null;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
log.info("线程2 开始了");
int i = 10 / 0;
log.info("线程2 结束了");
return 10;
}, executor).whenCompleteAsync((response, exception) -> {
log.info("线程2 response:" + response);
log.info("线程2 exception:"+ exception);
}, executor).exceptionally((throwable)->{
//获取异常
log.info("线程2 exceptionally:" + throwable);
//修改返回结果
return 50;
});
//get()仍然是阻塞式
log.info("future1.get():" + future1.get());
log.info("future2.get():" + future2.get());
log.info("main 线程结束了");
}
handle:相当于execptionlly和whenComplete的结合体
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("main 线程开始了");
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
log.info("线程1 开始了");
int i = 10 / 0;
log.info("线程1 结束了了");
}, executor).handleAsync((response, exception) -> {
//处理异常和返回值
log.info("线程1 response:" + response);
log.info("线程1 exception:"+ exception);
//无返回值
return null;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
log.info("线程2 开始了");
int i = 10 / 0;
log.info("线程2 结束了");
return 10;
}, executor).handleAsync((response, exception) -> {
//处理异常和返回值
log.info("线程2 response:" + response);
log.info("线程2 exception:"+ exception);
//修改返回值
return 55;
}, executor);
//get()仍然是阻塞式
log.info("future1.get():" + future1.get());
log.info("future2.get():" + future2.get());
log.info("main 线程结束了");
}
线程串行方法
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
- thenRun方法:只要上面的任务执行完成(若上面任务出现异常不会执行),就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作,thenRunAsync不能获取上一个的返回结果,自己有没有返回结果
- thenAccept方法:消费处理结果(若上面任务出现异常不会执行)。接收任务的处理结果,并消费处理,无返回结果。
- thenApply:获取上一个任务返回的结果,并返回当前任务结果。 (若上面任务出现异常不会执行),handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
//thenRunAsync不能获取上一个的返回结果,自己有没有返回结果
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
try {
int i = 10 / 0;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor).thenRunAsync(() -> {
System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());
}, executor);
//thenAcceptAsync可以得到上一任务的返回结果
CompletableFuture.supplyAsync(() -> {
System.out.println("任务3 子线程执行了..." + Thread.currentThread().getName());
try {
int i = 1 / 0;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, executor).thenAcceptAsync((res) -> {
System.out.println("任务4 子线程执行了..." + Thread.currentThread().getName() + ":" + res);
}, executor);
//thenApplyAsync可以获取上一个返回结果并返回结果
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务5 子线程执行了..." + Thread.currentThread().getName());
try {
Thread.sleep(1000);
int i = 1 / 0;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, executor).thenApplyAsync((res) -> {
System.out.println("任务6 子线程执行了..." + Thread.currentThread().getName() + ":" + res);
return res * 100;
}, executor);
}
两个都完成
上面介绍的相关方法都是串行的执行,接下来看看需要等待两个任务执行完成后才会触发的几个方法
- thenCombine :可以获取前面两线程的返回结果,本身也有返回结果
- thenAcceptBoth:可以获取前面两线程的返回结果,本身没有返回结果
- runAfterBoth:不可以获取前面两线程的返回结果,本身也没有返回结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
try {
//异常不会抛出,但是任务3不会执行了
int i = 10 / 0;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1 子线程执行了..end." + Thread.currentThread().getName());
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 子线程执行了. end.." + Thread.currentThread().getName());
return 10;
}, executor);
//不可以获取前面两线程的返回结果,本身也没有返回结果
future1.runAfterBothAsync(future2, () -> {
System.out.println("任务3 ----" + Thread.currentThread().getName());
}, executor);
System.out.println("主线程 end ..." + future2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
try {
//异常不会抛出,但是任务3不会执行了
//int i = 10 / 0;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1 子线程执行了..end." + Thread.currentThread().getName());
return 20;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 子线程执行了. end.." + Thread.currentThread().getName());
return 10;
}, executor);
//可以获取前面两线程的返回结果,本身没有返回结果
future1.thenAcceptBothAsync(future2, (f1, f2) -> {
System.out.println("f1 = " + f1);
System.out.println("f2 = " + f2);
}, executor);
System.out.println("主线程 end ..." + future2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
try {
//异常会抛出
int i=10/0;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1 子线程执行了..end." + Thread.currentThread().getName());
return 20;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 子线程执行了. end.." + Thread.currentThread().getName());
return 10;
}, executor);
//可以获取前面两线程的返回结果,本身也有返回结果
CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (f1, f2) -> {
System.out.println("f1 = " + f1);
System.out.println("f2 = " + f2);
return f1 + f2;
}, executor);
System.out.println("主线程 end ..." + future.get());
}
两个任务完成一个
在上面5个基础上我们来看看两个任务只要有一个完成就会触发任务3的情况
- runAfterEither:不能获取完成的线程的返回结果,自身也没有返回结果
- acceptEither:可以获取线程的返回结果,自身没有返回结果
- applyToEither:既可以获取线程的返回结果,自身也有返回结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
int i = 100 /10;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
return i+"";
}, executor);
// runAfterEitherAsync 不能获取前面完成的线程的返回结果,自身也没有返回结果
future1.runAfterEitherAsync(future2,()->{
System.out.println("任务3执行了....");
},executor);
// acceptEitherAsync 可以获取前面完成的线程的返回结果 自身没有返回结果
future1.acceptEitherAsync(future2,(res)->{
System.out.println("res = " + res);
},executor);
// applyToEitherAsync 既可以获取完成任务的线程的返回结果 自身也有返回结果
CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {
System.out.println("res = " + res);
return res + "-->OK";
}, executor);
// 可以处理异步任务之后的操作
System.out.println("获取的线程的返回结果是:" + stringCompletableFuture.get() );
}
多任务组合
-
allOf:等待所有任务完成
-
anyOf:只要有一个任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
int i = 100 /10;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
return i+"";
}, executor);
CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3 线程开始了..." + Thread.currentThread().getName());
int i = 100 /10;
System.out.println("任务3 线程结束了..." + Thread.currentThread().getName());
return i+"";
}, executor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.get();
System.out.println("主任务执行完成..." + anyOf.get());
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.get();// 阻塞在这个位置,等待所有的任务执行完成
System.out.println("主任务执行完成..." + future1.get() + " :" + future2.get() + " :" + future3.get());
}