线程池源码和CompletableFuture使用总结

news2024/11/13 14:43:58

线程池

线程池的创建方式

  • 通过Executors的静态方法
  • 通过 new ThreadPoolExecutor方式创建

七大参数的作用

参数作用
corePoolSize核心线程数,线程池创建好后就准备就绪的线程数量,一直存在
maximumPoolSize最大线程数量,控制资源
keepAliveTime存活时间,如果当前线程数量如果大于核心线程数量,释放空闲的线程,<br />最大线程-核心数量
unit时间单位
BlockingQueue阻塞队列,如果任务很多,就会把多的任务放在队列中
threadFactory线程的工厂
handler如果队列满了,按照指定的拒绝策略执行任务
    /**
     * 线程池详解
     * @param args
     */
    public static void main(String[] args) {
        // 第一种获取的方式
        ExecutorService service = Executors.newFixedThreadPool(10);
        // 第二种方式: 直接new ThreadPoolExecutor()对象,并且手动的指定对应的参数
        // corePoolSize:线程池的核心线程数量 线程池创建出来后就会 new Thread() 5个
        // maximumPoolSize:最大的线程数量,线程池支持的最大的线程数
        // keepAliveTime:存活时间,当线程数大于核心线程,空闲的线程的存活时间 8-5=3
        // unit:存活时间的单位
        // BlockingQueue<Runnable> workQueue:阻塞队列 当线程数超过了核心线程数据,那么新的请求到来的时候会加入到阻塞的队列中
        // new LinkedBlockingQueue<>() 默认队列的长度是 Integer.MAX 那这个就太大了,所以我们需要指定队列的长度
        // threadFactory:创建线程的工厂对象,作用:1、可以规范管理线程 2、(一般使用默认,当然也可以自己构建)自己构建线程工厂的好处就是可以指定线程名称,一旦线程任务出现错误,很容易定位
        // RejectedExecutionHandler handler:当线程数大于最大线程数的时候会执行的淘汰策略
      private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        3,
        10,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(100),
        //Executors.defaultThreadFactory(),
        new ThreadFactory() {
          @Override
          public Thread newThread(@NotNull Runnable r) {
            return new Thread(r,"Thread Name");
          }
        },
        new RejectedExecutionHandler(){
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                log.error("this Thread Name rejected task!");
             }
        });
      
        //execute:只能接受Runnable类型的任务,没有返回值
        //submit:不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null
        poolExecutor.execute(()->{
            System.out.println("----->" + Thread.currentThread().getName());
        });
    }

线程池的执行顺序

线程池创建,准备好core数量的核心线程,准备接收任务

在这里插入图片描述

线程池标识

public class ThreadPoolExecutor extends AbstractExecutorService {
  // ctl初始化了线程的状态和线程数量,初始状态为RUNNING并且线程数量为0 
  // 这里一个Integer既包含了状态也包含了数量,其中int类型一共32位,高3位标识状态,低29位标识数量
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  // 这里指定了Integer.SIZE - 3,也就是32 - 3 = 29,表示线程数量最大取值长
  private static final int COUNT_BITS = Integer.SIZE - 3;
  // 这里标识线程池容量,也就是将1向左位移上面的29长度,并且-1代表最大取值,二进制就是 000111..111
  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  // 线程池状态
  private static final int RUNNING    = -1 << COUNT_BITS;// 111 代表线程池尾为RUNING状态,代表正常接收任务
  private static final int SHUTDOWN   =  0 << COUNT_BITS;// 000 代表线程池为SHUTDOWN状态,不接收新任务,但是内部还会处理阻塞队列中的任务,正在处理的任务正常处理
  private static final int STOP       =  1 << COUNT_BITS;// 001 代表线程池尾STOP状态,不接收新任务,内部阻塞队列的任务不在处理,并中断正在执行的任务
  private static final int TIDYING    =  2 << COUNT_BITS;// 010 代表线程池尾TIDYING状态,一个过渡状态,代表线程池即将over
  private static final int TERMINATED =  3 << COUNT_BITS;// 011 TERMINATED状态,执行terminated()方法,线程池真的凉了

  // 得到线程池状态,通过传入的c,获取最高三位的值,拿到线程状态吗,最终就是拿 1110 000……和c做&运算得到高3位结果
  private static int runStateOf(int c)     { return c & ~CAPACITY; }
  // 得到线程池中的工作线程数量,最终得到现在线程数量,就是拿c 和 0001 111……做&运算,得到低29位结果
  private static int workerCountOf(int c)  { return c & CAPACITY; }
  private static int ctlOf(int rs, int wc) { return rs | wc; }
  
  // 判断当前线程池状态是否是Running
  private static boolean isRunning(int c) { return c < SHUTDOWN; }
}

execute

/**
* 线程池执行流程
*/
public void execute(Runnable command) {
  //健壮性判断
  if (command == null)
    throw new NullPointerException();
  // (拿到32位int)这里是获取核心线程数
  int c = ctl.get();
  //获取工作线程数量 工作线程数量ing < core
  if (workerCountOf(c) < corePoolSize) {
    // 添加工作线程,true代表创建核心线程
    if (addWorker(command, true))
      return;
    // 创建核心线程数量失败(可能存在并发导致失败),再次获取ctl保证ctl是当前最新值(可能存在并发,被人抢先创建了线程)
    c = ctl.get();
  }
  
  //判断是否为RUNING状态,是->将任务添加到阻塞队列中
  if (isRunning(c) && workQueue.offer(command)) {
    // 再次判断ctl
    int recheck = ctl.get();
    // 如果不是RUNING状态,不是->移除任务,拒绝策略
    if (! isRunning(recheck) && remove(command))
      reject(command);
    // 如果是RUNING状态,但是统计当前工作线程数量还为0,说明此时没有线程处理上面添加到队列的任务
    else if (workerCountOf(recheck) == 0)
      // 阻塞队列中有任务,但是没有工作线程,添加一个任务为null的工作线程处理阻塞队列中的任务
      // 避免出现队列任务没有线程执行的情况
      addWorker(null, false);
  }
  // 如果不能入队列,就尝试创建非核心线程(最大线程数)
  else if (!addWorker(command, false))
    // 如果还是失败,那就拒绝吧
    reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:// for循环标记,内套for使用标记可从内部跳出外部for循环
  // 经过大量判断,给工作线程数标识+1
  for (;;) {
    //获取ctl和线程池状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // 除了RUNING之前都有可能
    if (rs >= SHUTDOWN &&
        // !(rs = SHUTDOWN && 当前任务为null && 阻塞队列不为null)
        /* rs = SHUTDOWN,如果不是SHUTDOWN,代表STOP更高的状态,没必要创建线程添加任务
        * firstTask == null,任务为null,并且线程不是RUNING,是不需要处理的
        * 阻塞队列不为null,阻塞队列为空时,返回flase,外层!变为true,不需要创建工作线程(阻塞队列都为空了,不需要创建了)
        */
        ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
      // 返回构建工作线程失败
      return false;

    for (;;) {
      // 获取工作线程数量
      int wc = workerCountOf(c);
      // 如果当前工作线程数量大于,线程池最大容量了就不创建了
      if (wc >= CAPACITY || 
          //判断当前工作线程数量,大于核心线程数量或者是最大线程数量,超过也不创建
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      //采用cas方式将工作线程数加一
      if (compareAndIncrementWorkerCount(c))
        // 成功退出外层for
        break retry;
      //cas失败了(有人并发操作了),重新获取ctl
      c = ctl.get();
      // 重新判断线程池状态,如果没有变化:继续执行内层for即可,还是RUNING
      if (runStateOf(c) != rs)
        // 如果有变化:结束本次外层for,继续下次外层for
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  // 上面一顿操作只是给工作线程数标识成功+1,下面才是真正创建工作线程
  boolean workerStarted = false;// worker开始-false
  boolean workerAdded = false;// worker添加-true
  // worker就是工作线程
  Worker w = null;
  try {
    // 创建worker,传入任务
    w = new Worker(firstTask);
    final Thread t = w.thread;
    //健壮性判断
    if (t != null) {
      // 获取线程池全局锁,避免添加线程时,别人干掉线程池,干掉线程池必须获取这个锁
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        // 获取线程池状态
        int rs = runStateOf(ctl.get());

         //runing状态
        if (rs < SHUTDOWN ||
            // SHUTDOWN状态,且任务为null,创建空任务工作线程处理阻塞队列的任务
            (rs == SHUTDOWN && firstTask == null)) {
          //线程是否时运行状态
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          //将工作线程添加到集合里HashSet
          workers.add(w);
          // 获取工作线程数
          int s = workers.size();
          // 如果工作线程数大于之前记录的最大工作线程数,就替换下
          if (s > largestPoolSize)
            largestPoolSize = s;
          //添加工作线程成功
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      //添加工作线程成功后就准备启动
      if (workerAdded) {
        // 启动工作线程
        t.start();
        // 开始工作成功-true
        workerStarted = true;
      }
    }
  } finally {
    // 如果启动工作线程失败
    if (! workerStarted)
      addWorkerFailed(w);
  }
  //返回工作线程是否启动
  return workerStarted;
}

worker的封装

private final class Worker
  extends AbstractQueuedSynchronizer  // AQS
  implements Runnable  // Runnable 那本身其实就是一个Runnable了
{

  private static final long serialVersionUID = 6138294804551838833L;

  final Thread thread;

  Runnable firstTask;

  volatile long completedTasks;

  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // this,把worker本身传给了newThread,那么start后调用的就是当前实现了Runnable的Worker的run()
    this.thread = getThreadFactory().newThread(this);
  }

  public void run() {
    runWorker(this);
  }
}

final void runWorker(Worker w) {
  // 获取当前线程
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    // 如果任务不为null 执行     任务为空就从阻塞队列获取任务,拿不到任务就阻塞在这,如果是最大线程数的线程到达最大空闲时间就被销毁,如果是核心线程数线程就阻塞在这(线程复用)
    while (task != null || (task = getTask()) != null) {
      // 加锁,避免你shutdownNow我 任务也不会中断
      w.lock();
      // If pool is stopping, ensure thread is interrupted;
      // if not, ensure thread is not interrupted.  This
      // requires a recheck in second case to deal with
      // shutdownNow race while clearing interrupt
      
      //获取当前线程状态,如果大于STOP(只有TIDYING、TERMINATED,说明线程凉了)
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
        // 就中断
        wt.interrupt();
      try {
        // 类似aop,执行任务前操作,可以自己重写
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          //执行任务
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 类似aop,执行任务前操作,可以自己重写
          afterExecute(task, thrown);
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

看看getTask()、

举例:有一个线程池,core:5,max:50,queue:100,如果并发是200,那么线程池是怎么处理的?

  • 首先 200个中的前面5个会直接被核心线程处理,然后6个到105个会加入到阻塞队列中,然后106到155的请求在最大线程数中,那么会创建对应的线程来处理这些请求,之后剩下的45个请求会被直接放弃

在这里插入图片描述

线程池的好处

  • 降低资源消耗
  • 提高响应速度
  • 提高线程的管理

CompletableFuture

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用规察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法;Google guava也提供了通用的扩展Future; Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。

作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

在Java 8中,新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。CompletableFuture和EutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U7AG6IBn-1677581971066)(…/…/Pictures/typora-图片/多线程学习/image-20220915195450044.png)]

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。它是以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。

  • supplyAsync可以支持返回值。它是以Supplier函数式接口类型为参数,CompletableFuture的计算结果类型为U。

@Slf4j
public class CompletableFutureDemo1 {
    
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(100),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    /**
     * runAsync方法不支持返回值。它是以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
     * supplyAsync可以支持返回值。它是以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("main 线程开始了");
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            log.info("线程1 开始了");
            // int i = 10 / 0;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("线程1 结束了了");
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            log.info("线程2 开始了");
            //int i = 10 / 0;
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("线程2 结束了");
            return 10;
        }, executor);
        //get()仍然是阻塞式
        log.info("future1.get():" + future1.get());
        log.info("future2.get():" + future2.get());
        log.info("main 线程结束了");
    }
}

计算结果完成时的回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)//在原来线程继续执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)//异步新开线程处理
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)//异步新开线程处理,并且该线程归线程池executor管理
    
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
  • whenComplete 可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果,但是不能处理
    execptionlly 当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行
    handle 可以获取异步任务的返回值和抛出的异常信息,而且可以显示的修改返回的结果
    
  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

  • 当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。Function<? super T,? extends U>
    T:上一个任务返回结果的类型,U:当前任务的返回值类型

public static void main(String[] args) throws ExecutionException, InterruptedException {
    log.info("main 线程开始了");
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        log.info("线程1 开始了");
        int i = 10 / 0;
        log.info("线程1 结束了了");
    }, executor).whenCompleteAsync((response, exception) -> {
        log.info("线程1 response:" + response);
        log.info("线程1 exception:"+ exception);
    }, executor).exceptionally((throwable)->{
        //获取异常
        log.info("线程1 exceptionally:" + throwable);
        //无返回结果
        return null;
    });

    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        log.info("线程2 开始了");
        int i = 10 / 0;
        log.info("线程2 结束了");
        return 10;
    }, executor).whenCompleteAsync((response, exception) -> {
        log.info("线程2 response:" + response);
        log.info("线程2 exception:"+ exception);
    }, executor).exceptionally((throwable)->{
        //获取异常
        log.info("线程2 exceptionally:" + throwable);
        //修改返回结果
        return 50;
    });

    //get()仍然是阻塞式
    log.info("future1.get():" + future1.get());
    log.info("future2.get():" + future2.get());
    log.info("main 线程结束了");
}

handle:相当于execptionlly和whenComplete的结合体

public static void main(String[] args) throws ExecutionException, InterruptedException {
    log.info("main 线程开始了");
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        log.info("线程1 开始了");
        int i = 10 / 0;
        log.info("线程1 结束了了");
    }, executor).handleAsync((response, exception) -> {
        //处理异常和返回值
        log.info("线程1 response:" + response);
        log.info("线程1 exception:"+ exception);
        //无返回值
        return null;
    }, executor);

    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        log.info("线程2 开始了");
        int i = 10 / 0;
        log.info("线程2 结束了");
        return 10;
    }, executor).handleAsync((response, exception) -> {
        //处理异常和返回值
        log.info("线程2 response:" + response);
        log.info("线程2 exception:"+ exception);
        //修改返回值
        return 55;
    }, executor);

    //get()仍然是阻塞式
    log.info("future1.get():" + future1.get());
    log.info("future2.get():" + future2.get());
    log.info("main 线程结束了");
}

线程串行方法

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
 
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
 
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
  • thenRun方法:只要上面的任务执行完成(若上面任务出现异常不会执行),就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作,thenRunAsync不能获取上一个的返回结果,自己有没有返回结果
  • thenAccept方法:消费处理结果(若上面任务出现异常不会执行)。接收任务的处理结果,并消费处理,无返回结果。
  • thenApply:获取上一个任务返回的结果,并返回当前任务结果。 (若上面任务出现异常不会执行),handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。
   public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程 start ...");
        //thenRunAsync不能获取上一个的返回结果,自己有没有返回结果
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
            try {
                int i = 10 / 0;
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executor).thenRunAsync(() -> {
            System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());
        }, executor);


        //thenAcceptAsync可以得到上一任务的返回结果
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3 子线程执行了..." + Thread.currentThread().getName());
            try {
                int i = 1 / 0;
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        }, executor).thenAcceptAsync((res) -> {
            System.out.println("任务4 子线程执行了..." + Thread.currentThread().getName() + ":" + res);
        }, executor);

        //thenApplyAsync可以获取上一个返回结果并返回结果
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务5 子线程执行了..." + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
                int i = 1 / 0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        }, executor).thenApplyAsync((res) -> {
            System.out.println("任务6 子线程执行了..." + Thread.currentThread().getName() + ":" + res);
            return res * 100;
        }, executor);
    }

两个都完成

上面介绍的相关方法都是串行的执行,接下来看看需要等待两个任务执行完成后才会触发的几个方法

  • thenCombine :可以获取前面两线程的返回结果,本身也有返回结果
  • thenAcceptBoth:可以获取前面两线程的返回结果,本身没有返回结果
  • runAfterBoth:不可以获取前面两线程的返回结果,本身也没有返回结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程 start ...");
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
        try {
            //异常不会抛出,但是任务3不会执行了
            int i = 10 / 0;
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1 子线程执行了..end." + Thread.currentThread().getName());
    }, executor);

    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());

        try {
            Thread.sleep(600);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2 子线程执行了. end.." + Thread.currentThread().getName());
        return 10;
    }, executor);

    //不可以获取前面两线程的返回结果,本身也没有返回结果
    future1.runAfterBothAsync(future2, () -> {
        System.out.println("任务3 ----" + Thread.currentThread().getName());
    }, executor);

    System.out.println("主线程 end ..." + future2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程 start ...");
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
        try {
            //异常不会抛出,但是任务3不会执行了
            //int i = 10 / 0;
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1 子线程执行了..end." + Thread.currentThread().getName());
        return 20;
    }, executor);

    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());

        try {
            Thread.sleep(600);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2 子线程执行了. end.." + Thread.currentThread().getName());
        return 10;
    }, executor);

    //可以获取前面两线程的返回结果,本身没有返回结果
    future1.thenAcceptBothAsync(future2, (f1, f2) -> {
        System.out.println("f1 = " + f1);
        System.out.println("f2 = " + f2);
    }, executor);

    System.out.println("主线程 end ..." + future2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("主线程 start ...");
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1 子线程执行了..." + Thread.currentThread().getName());
        try {
            //异常会抛出
            int i=10/0;
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1 子线程执行了..end." + Thread.currentThread().getName());
        return 20;
    }, executor);

    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2 子线程执行了..." + Thread.currentThread().getName());

        try {
            Thread.sleep(600);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2 子线程执行了. end.." + Thread.currentThread().getName());
        return 10;
    }, executor);

    //可以获取前面两线程的返回结果,本身也有返回结果
    CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (f1, f2) -> {
        System.out.println("f1 = " + f1);
        System.out.println("f2 = " + f2);
        return f1 + f2;
    }, executor);

    System.out.println("主线程 end ..." + future.get());
}

两个任务完成一个

在上面5个基础上我们来看看两个任务只要有一个完成就会触发任务3的情况

  • runAfterEither:不能获取完成的线程的返回结果,自身也没有返回结果
  • acceptEither:可以获取线程的返回结果,自身没有返回结果
  • applyToEither:既可以获取线程的返回结果,自身也有返回结果
public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
        int i = 100 / 5;
        System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
        return i;
    }, executor);
    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
        int i = 100 /10;
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
        return i+"";
    }, executor);
    // runAfterEitherAsync 不能获取前面完成的线程的返回结果,自身也没有返回结果
    future1.runAfterEitherAsync(future2,()->{
        System.out.println("任务3执行了....");
    },executor);

    // acceptEitherAsync 可以获取前面完成的线程的返回结果  自身没有返回结果
    future1.acceptEitherAsync(future2,(res)->{
        System.out.println("res = " + res);
    },executor);

    // applyToEitherAsync 既可以获取完成任务的线程的返回结果  自身也有返回结果
    CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {
        System.out.println("res = " + res);
        return res + "-->OK";
    }, executor);
    // 可以处理异步任务之后的操作
    System.out.println("获取的线程的返回结果是:" + stringCompletableFuture.get() );
}

多任务组合

  • allOf:等待所有任务完成

  • anyOf:只要有一个任务完成

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
public static void main(String[] args) throws ExecutionException, InterruptedException {   
   CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
        int i = 100 / 5;
        System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
        return i;
    }, executor);
    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
        int i = 100 /10;
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
        return i+"";
    }, executor);

    CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务3 线程开始了..." + Thread.currentThread().getName());
        int i = 100 /10;
        System.out.println("任务3 线程结束了..." + Thread.currentThread().getName());
        return i+"";
    }, executor);

    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    anyOf.get();
    System.out.println("主任务执行完成..." + anyOf.get());

    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
    allOf.get();// 阻塞在这个位置,等待所有的任务执行完成
    System.out.println("主任务执行完成..." + future1.get() + " :" + future2.get() + " :" + future3.get());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/377864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式 linux 系统开发网络的设置

目录 一、前言 二、linux网络静态地址设置 前言 为什么要对linux系统下的ubuntu进行网络设置呢&#xff1f; 因为我们在嵌入式开发中&#xff0c;我们要保证windows系统、linux系统、开发板的ip要处于同一个网段&#xff0c;而默认ubuntu下的linux系统的ip是动态分配的&#…

ACM-蓝桥杯训练第一周

&#x1f680;write in front&#x1f680; &#x1f4dd;个人主页&#xff1a;认真写博客的夏目浅石.CSDN &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​ &#x1f4e3;系列专栏&#xff1a;ACM周训练题目合集.CSDN &#x1f4ac;总结&#xff1a…

debian 部署nginx https

我是flask 处理请求单进程&#xff0c; 差点意思 &#xff0c; 考虑先flask 在往下走 一&#xff1a;安装nginx 因为我是debian 系统&#xff0c;所以我的建议是直接 sudo apt-get install nginx 你也可以选择在官网下载&#xff0c; 但是我搭建ssl 的时候安装openssl非常的麻…

记住这3点,有效提高江苏专转本上岸率

记住这3点&#xff0c;有效提高上岸率 我们都知道&#xff0c;在江苏统招专转本考试中想岸并不是一件容易的事情。考生能否顺利上岸&#xff0c;往往受多方面因素影响&#xff0c;这其中包括&#xff1a;个人基础、学习方式、信息搜索能力。 如何提高自己的专转本上岸几率&…

粗心小编被云拯救,那云上数据谁来拯救?

开工第三天      小编已忙的焦头烂额      不是因为工作积压      而是因为自己的疏忽      也许是没有进入工作状态,一大早先经历了自行车钥匙丢失,手机遗落在家,好不容易坐到工位上才发现,备份数据的U盘忘带了。    不过,好在提前将工作文件上传到了云端…

10 大项目管理知识领域 4大核心领域 5大辅助领域

有人说&#xff1a;一个人从1岁活到80岁很平凡&#xff0c;但如果从80岁倒着活&#xff0c;那么一半以上的人都可能不凡。 生活没有捷径&#xff0c;我们踩过的坑都成为了生活的经验&#xff0c;这些经验越早知道&#xff0c;你要走的弯路就会越少。 项目管理的10大知识领域&a…

LeetCode 1145. 二叉树着色游戏 -- 简单搜索

二叉树着色游戏 提示 中等 199 相关企业 有两位极客玩家参与了一场「二叉树着色」的游戏。游戏中&#xff0c;给出二叉树的根节点 root&#xff0c;树上总共有 n 个节点&#xff0c;且 n 为奇数&#xff0c;其中每个节点上的值从 1 到 n 各不相同。 最开始时&#xff1a; 「一…

【C++】vector的基本使用

难道向上攀爬的那条路&#xff0c;不是比站在顶峰更让人热血沸腾吗&#xff1f; 文章目录一、vector和string的联系与不同二、vector的扩容操作1.resize() &#xff08;缺省值为匿名对象&#xff09;&& reserve()2.reserve在g和vs上的扩容机制3.reserve异地扩容和shri…

Pinia 介绍、使用、实践

1. Pinia 介绍1.1 Pinia 是什么Pinia 官网https://pinia.vuejs.org/vuex Githubhttps://github.com/vuejs/vuex上面是 Vuex Github 中置顶说明&#xff0c;我们可以得知&#xff1a;Pinia 现在是新的默认设置&#xff0c;Vue 的官方状态管理库已更改为 Pinia&#xff0c;Vue3、…

数据结构与算法系列之时间与空间复杂度

这里写目录标题算法的复杂度大O的渐进表示法实例分析空间复杂度每日一题算法的复杂度 衡量一个算法的好坏&#xff0c;一般 是从时间和空间两个维度来衡量的&#xff0c; 即时间复杂度和空间复杂度。 时间复杂度主要衡量一个算法的运行快慢&#xff0c; 空间复杂度主要衡量一个…

Linux -- 程序 进程 线程 概念引入

程序与进程 &#xff1a;程序 &#xff1a;什么是程序 &#xff1f;&#xff1f;&#xff1f;伪官方 &#xff1a; 二进制文件&#xff0c;文件存储在磁盘中&#xff0c;例如 /usr/bin 目录下 。 是静态。 简单讲 &#xff1a;# 我们都学习了语言&#xff0c;比如下面这串代…

全国领先——液力悬浮仿生型人工心脏上市后在同济医院成功植入

2023年2月22日&#xff0c;华中科技大学同济医学院附属同济医院&#xff08;同济医院&#xff09;心脏大血管外科团队举办了一场气氛热烈的小规模庆祝活动&#xff0c;魏翔主任、程才副主任、王星宇副主任医师和李师亮医师到场&#xff0c;为终末期心衰患者黄先生“庆生”&…

Java 文本检索神器 “正则表达式”

Java 文本检索神器 “正则表达式” 每博一文案 在我们短促而又漫长的一生中&#xff0c;我们在苦苦地寻找人生的幸福&#xff0c;可幸福往往又与我们失之交臂&#xff0c; 当我们为此而耗尽宝贵的。青春年华&#xff0c;皱纹也悄悄地爬上了眼角的时候&#xff0c;我们或许才能…

Maven工程打jar包的N种方式

Maven工程打jar包 一、IDEA自带打包插件二、maven插件打包2.1 制作瘦包&#xff08;直接打包&#xff0c;不打包依赖包&#xff09;2.2 制作瘦包和依赖包&#xff08;相互分离&#xff09;2.3 制作胖包&#xff08;项目依赖包和项目打为一个包&#xff09;2.4 制作胖包&#xf…

数据结构与算法(二十)快速排序、堆排序(四)

数据结构与算法&#xff08;三&#xff09;软件设计(十九)https://blog.csdn.net/ke1ying/article/details/129252205 排序 分为 稳定排序 和 不稳定排序 内排序 和 外排序 内排序指在内存里&#xff0c;外排序指在外部存储空间排序 1、排序的方法分类。 插入排序&#xff…

下拉框推荐-Suggest-SUG

什么是下拉框推荐 在我们使用各种app&#xff08;飞猪&#xff09;想要搜索我们想要的东西&#xff0c;假设我想要上海迪士尼的门票&#xff0c;那么精确的query是“上海迪士尼门票”&#xff0c;要打7个字&#xff0c;如果在你输入“上海”的时候app就推荐了query“上海迪士尼…

无线蓝牙耳机哪个牌子好?2023质量好的无线蓝牙耳机推荐

近几年&#xff0c;随着蓝牙技术的不断进步&#xff0c;使用蓝牙耳机的人也越来越多。蓝牙耳机的出现&#xff0c;不仅能让我们摆脱线带来的约束&#xff0c;还能提升我们学习和工作的效率。最近看到很多人问&#xff0c;无线蓝牙耳机哪个牌子好&#xff1f;下面&#xff0c;我…

accent-color一行代码,让你的表单组件变好看

不做切图仔,从关注本专栏开始 文章目录 不做切图仔,从关注本专栏开始前言兼容性语法继承性智能前言 在之前的网站开发中,我们是很难去更改的你某些控件的颜色。我们可能要使用各种技巧来自定义我们的控件。好消息是,今天如果我们想要去改变控件的颜色,css为我们提供了一些…

docker删除已停止的容器

一、docker删除已停止的容器 1、根据容器的状态&#xff0c;删除Exited状态的容器 先停止容器、再删除镜像中的容器、最后删除none的镜像。执行命令如下&#xff1a; docker stop $(docker ps -a | grep "Exited" | awk {print $1 }) #停止容器 docker rm $(docke…

【C++初阶】1. C++入门

1. 前言 1. 什么是C C语言是结构化和模块化的语言&#xff0c;适合处理较小规模的程序。对于复杂的问题&#xff0c;规模较大的程序&#xff0c;需要高度的抽象和建模时&#xff0c;C语言则不合适。为了解决软件危机&#xff0c; 20世纪80年代&#xff0c; 计算机界提出了OOP(…