简单的 CompletableFuture学习笔记

news2024/11/15 17:35:57

简单的 CompletableFuture学习笔记

这里记录一下自己学习的内容,简单记录一下方便后续学习,内容部分参考 CompletableFuture学习博客

1. CompletableFuture简介

在api接口调用时间过长,调用过多外围接口时,为了提升性能,我们采用线程池来负责多线程的处理操作,因为我们需要得到各个子线程处理的结果,所以我们需要使用线程池+Future的方式进行接口优化,但Future在应对并行结果组合以及后续处理等方面显得力不从心,弊端明显,然后便引入了本次学习的CompletableFuture。

CompletableFuture 是 Java 8 中引入的一个新的并发编程工具,它为开发者提供了一种简单、高效的方式来处理异步操作和并发任务。CompletableFuture 可以看作是 Future 的增强版,它提供了更丰富的功能和更方便的使用方式。

2. 简单示例

以下记录一下简单的代码示例

2.1 CompletableFuture初识

当我们需要进行异步处理的时候,我们可以通过CompletableFuture.supplyAsync方法,传入一个具体的要执行的处理逻辑函数,这样就轻松的完成了CompletableFuture的创建与触发执行

方法名称作用描述
supplyAsync静态方法,用于构建一个CompletableFuture<T> 对象,并异步执行传入的参数,允许执行函数有返回值
runAsync静态方法,用于构建一个CompletableFuture<Void> 对象,并异步执行传入函数,与supplyAsync的区别在于此方法传入的是Callable类型,仅执行,没有返回值

示例代码如下:

package cn.git.future;

import java.util.concurrent.*;

/**
 * @description: 初始了解completableFuture
 * @program: bank-credit-sy
 * @author: lixuchun
 * @create: 2024-08-07
 */
public class CompletableFutureDemo01 {

    static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            50,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主线程start ......");
        // 没有返回结果,可以执行任务
        CompletableFuture.runAsync(() -> {
            System.out.println("子线程执行了:" + Thread.currentThread().getName());
        }, executor);

        // 带返回结果
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程2执行了:" + Thread.currentThread().getName());
            return 10;
        }, executor);
        Integer result = future2.get();
        System.out.println("result = " + result);

        System.out.println("主线程end ......");
    }
}

supplyAsync或者runAsync创建后便会立即执行,无需手动调用触发。

2.2 环环相扣处理

在流水线处理场景中,往往都是一个任务环节处理完成后,下一个任务环节接着上一环节处理结果继续处理。

CompletableFuture用于这种流水线环节驱动类的方法有很多,相互之间主要是在返回值或者给到下一环节的入参上有些许差异,使用时需要注意区分:
在这里插入图片描述
具体的方法的描述归纳如下:

方法名称作用描述
thenApplyCompletableFuture的执行后的结果进行追加处理,并将当前的CompletableFuture泛型对象更改为处理后新的对象类型,返回当前CompletableFuture对象
thenComposethenApply类似,区别点在于:此方法的入参函数返回一个CompletableFuture类型对象
thenAccept在所有异步任务完成后执行一系列操作,与thenApply类似,区别点在于thenApply返回void类型,没有具体结果输出,适合无需返回值的场景
thenRunthenAccept类似,区别点在于thenAccept可以将前面CompletableFuture执行的实际结果作为参数进行传入并使用,但是thenRun方法没有任何入参,只能执行一个Runnable函数,并且返回void类型

在这里插入图片描述
期望总是美好的,但是实际情况却总不尽如人意。在我们编排流水线的时候,如果某一个环节执行抛出异常了,会导致整个流水线后续的环节就没法再继续下去了,这时候需要使用 handle或者whenComplete来处理,具体比较如下

方法名称方法描述
handlethenApply类似,区别点在于handle执行函数的入参有两个,一个是CompletableFuture执行的实际结果,一个是是Throwable对象,这样如果前面执行出现异常的时候,可以通过handle获取到异常并进行处理。
whenComplete与handle类似,区别点在于whenComplete执行后无返回值。
  • whenComplete代码示例:

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 回调方法以及发生异常处理
     *  whenCompleteAsync作用为异步执行
     *  exceptionally作用为异常处理
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo02 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("主线程start ......");
            // 没有返回结果,可以执行任务
            CompletableFuture.runAsync(() -> {
                System.out.println("子线程执行了:" + Thread.currentThread().getName());
                int i = 10 / 0;
            }, executor).whenCompleteAsync((res, exec) -> {
                System.out.println("whenCompleteAsync1");
                System.out.println("res = " + res);
                System.out.println("exec = " + exec);
            }).exceptionally((exec) -> {
                // 发生异常处理
                System.out.println("exec1 = " + exec);
                return null;
            });
    
            // 带返回结果
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程2执行了:" + Thread.currentThread().getName());
                int i = 10 / 0;
                return 10;
            }, executor).whenCompleteAsync((res, exec) -> {
                System.out.println("whenCompleteAsync2");
                System.out.println("res = " + res);
                System.out.println("exec = " + exec);
            }).exceptionally((exec) -> {
                // 发生异常处理,带返回值
                System.out.println("exec2" + exec);
                return -1;
            });
    
            Integer integer = integerCompletableFuture.get();
            System.out.println(integer);
            System.out.println("主线程end ......");
            executor.shutdown();
        }
    }
    
    

    执行结果
    在这里插入图片描述

  • handle代码示例

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: handleAsync任务执行后自定义执行器
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo03 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,可以执行任务
            CompletableFuture.runAsync(() -> {
                System.out.println("子线程执行了:" + Thread.currentThread().getName());
            }, executor).handleAsync((result, exec) -> {
                System.out.println("exec = " + exec);
                return null;
            });
    
            // 带返回结果
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程2执行了:" + Thread.currentThread().getName());
                int i = 1 / 0;
                return 10;
            }, executor).handleAsync((result, exec) -> {
                System.out.println("exec = " + exec);
                return 55;
            });
    
            Integer integer = integerCompletableFuture.get();
            System.out.println(integer);
            executor.shutdown();
        }
    }
    
    

    执行结果:
    在这里插入图片描述

2.3 多个CompletableFuture组合操作

前面一直在介绍流水线式的处理场景,但是很多时候,流水线处理场景也不会是一个链路顺序往下走的情况,很多时候为了提升并行效率,一些没有依赖的环节我们会让他们同时去执行,然后在某些环节需要依赖的时候,进行结果的依赖合并处理。

CompletableFuture相比于Future的一大优势,就是可以方便的实现多个并行环节的合并处理。相关涉及方法介绍归纳如下:

方法名称方法描述
thenCombine将两个CompletableFuture对象组合起来进行下一步处理,可以拿到两个执行结果,并传给自己的执行函数进行下一步处理,最后返回一个新的CompletableFuture对象。
thenAcceptBoththenCombine类似,区别点在于thenAcceptBoth传入的执行函数没有返回值,即thenAcceptBoth返回值为CompletableFuture<void>
runAfterBoth等待两个CompletableFuture都执行完成后再执行某个Runnable对象,再执行下一个的逻辑,类似thenRun
applyToEither两个CompletableFuture中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenApply
acceptEither两个CompletableFuture中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenAccept
runAfterEither等待两个CompletableFuture中任意一个执行完成后再执行某个Runnable对象,可以理解为thenRun的升级版,注意与runAfterBoth对比理解。
allOf静态方法,阻塞等待所有给定的CompletableFuture执行结束后,返回一个CompletableFuture<Void>结果。
anyOf静态方法,阻塞等待任意一个给定的CompletableFuture对象执行结束后,返回一个CompletableFuture<Void>结果。
  • thenAcceptAsync 串行任务编排,任务有先后顺序

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 串行任务编排,任务有先后顺序
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo04 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,可以执行任务 线程1结束执行线程2
            CompletableFuture.runAsync(() -> {
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
            }, executor).thenRunAsync(()-> {
                System.out.println("子线程2执行了:" + Thread.currentThread().getName());
            }, executor);
    
            // 有返回结果,可以执行任务 线程1结束执行线程4
            CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                return 100;
            }, executor).thenAcceptAsync((param) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("线程4参数 = " + param);
            }, executor);
    
    
            // 有返回结果,获取处理
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程5执行了:" + Thread.currentThread().getName());
                return 100;
            }, executor).thenApplyAsync((param) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程6执行了:" + Thread.currentThread().getName());
                System.out.println("线程6参数 = " + param);
                return param * 100;
            }, executor);
    
            System.out.println("thread6 result = " + integerCompletableFuture.get());
            executor.shutdown();
        }
    }
    
    

    执行结果:在这里插入图片描述

  • runAfterBothAsync两个线程,都完成再执行

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 多个线程,都完成再执行
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo05 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            final CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
            }, executor).thenRunAsync(() -> {
                System.out.println("子线程2执行了:" + Thread.currentThread().getName());
            }, executor);
    
            // 串行执行任务
            CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 100;
            }, executor).thenAcceptAsync((param) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("线程4参数 = " + param);
            }, executor);
    
            // 两个任务都执行完成后,执行线程5
            future1.runAfterBothAsync(future2, () -> {
                System.out.println("子线程5执行了:" + Thread.currentThread().getName());
            }, executor);
    
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • thenAcceptBothAsync 两个线程都完成再处理带参数

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 多个线程,都完成再处理带参数
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo06 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            final CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
            }, executor).thenRunAsync(() -> {
                System.out.println("子线程2执行了:" + Thread.currentThread().getName());
            }, executor);
    
            // 串行执行任务
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 100;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("子线程4执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 两个任务都执行完成后,执行线程5,带有返回参数
            future1.thenAcceptBothAsync(future2, (param1, param2) -> {
                // param1 = 线程3执行结果, param2 = 线程4执行结果
                System.out.println("线程5参数1 = " + param1);
                System.out.println("线程5参数2 = " + param2);
                System.out.println("子线程5执行了:" + Thread.currentThread().getName());
            });
    
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • thenCombineAsync 两个线程结果进行统一处理,返回新的返回结果

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 获取两个线程结果进行统一处理,返回新的返回结果
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo07 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
                return 50;
            }, executor);
            // 串行执行任务
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 100;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("子线程4执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 获取两个线程结果进行统一处理,返回新的返回结果
            final CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (f1, f2) -> {
                System.out.println("f1 = " + f1);
                System.out.println("f2 = " + f2);
                return f1 + f2;
            }, executor);
            System.out.println("future3 = " + future3.get());
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • runAfterEitherAsync两个任务完成其中任意一个任务

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 两个任务完成其中任意一个任务
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo08 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
                return 50;
            }, executor);
    
            // 串行执行任务
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                return 100;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("子线程4执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 两个任务完成其中任意一个任务,便开始执行
            future1.runAfterEitherAsync(future2, () -> {
                System.out.println("子线程5执行了:" + Thread.currentThread().getName());
            }, executor);
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • acceptEitherAsync 带有任意一个执行完毕参数信息

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 两个任务完成其中任意一个任务
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo09 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
                return 50;
            }, executor);
    
            // 串行执行任务
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                return 100;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("子线程4执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 两个任务完成其中任意一个任务,便开始执行,带有任意一个执行完毕参数信息
            future1.acceptEitherAsync(future2, (res) -> {
                System.out.println("新线程:" + Thread.currentThread().getName());
                System.out.println("获取任意结果:" + res);
            }, executor);
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • applyToEitherAsync 两个任务完成其中任意一个任务,便开始执行,并且获取返回结果

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 两个任务完成其中任意一个任务
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo10 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
                return 50;
            }, executor);
    
            // 串行执行任务
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                return 100;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("子线程4执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 两个任务完成其中任意一个任务,便开始执行,并且获取返回结果
            final CompletableFuture<Integer> future3 = future1.applyToEitherAsync(future2, (res) -> {
                System.out.println("子线程2执行了:" + Thread.currentThread().getName());
                System.out.println("子线程2执行结果:" + res);
                return res;
            }, executor);
    
            System.out.println("future3 = " + future3.get());
    
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • allOf / anyOf 多任务组合处理

    package cn.git.future;
    
    import java.util.concurrent.*;
    
    /**
     * @description: 多任务组合处理
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class CompletableFutureDemo11 {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 没有返回结果,串行执行任务
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程1执行了:" + Thread.currentThread().getName());
                return 50;
            }, executor);
    
            // 串行执行任务
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程3执行了:" + Thread.currentThread().getName());
                return 100;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程4执行了:" + Thread.currentThread().getName());
                System.out.println("子线程4执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 串行执行任务
            CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
                System.out.println("子线程5执行了:" + Thread.currentThread().getName());
                return 200;
            }, executor).exceptionally((e) -> {
                // 线程3执行结果作为线程4入参参数
                System.out.println("子线程6执行了:" + Thread.currentThread().getName());
                System.out.println("子线程6执行异常了:" + e.getMessage());
                return -1;
            });
    
            // 等待三个任务执行完成 才开始进行后续处理操作,需要進行阻塞操作
            final CompletableFuture<Void> finalFuture = CompletableFuture.allOf(future1, future2, future3);
            finalFuture.get();
    
            // 任意一个任务执行完成 才开始进行后续处理操作,需要進行阻塞操作
            final CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);
            finalFuture.get();
            System.out.println("三个任务全部执行完毕啦。。。。。。");
        }
    }
    
    

    执行结果:
    在这里插入图片描述

  • 自定义循环异步调用例子

    package cn.git.future;
    
    import cn.git.entity.Person;
    
    import java.util.Arrays;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;
    
    /**
     * @description: 自定义循环异步调用例子
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2024-08-07
     */
    public class TestDemo {
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                50,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 设定参数信息,循环多少次也是入参参数
            int [] params = {1,2,3,4,5,6,7,8,9,10};
            CompletableFuture<Person>[] futures = IntStream.of(params)
                    .mapToObj(param -> {
                        CompletableFuture<Person> future = CompletableFuture.supplyAsync(() -> {
                            if (param == 5) {
                                try {
                                    Thread.sleep(5000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                            System.out.println("线程:" + Thread.currentThread().getName() + "正在执行任务:" + param);
                            Person person = new Person();
                            person.setSex("男");
                            person.setAge(param);
                            person.setName("张三");
                            return person;
                        }, executor);
                        return future;
                    }).toArray(CompletableFuture[]::new);
    
            // 等待所有CompletableFuture完成
            CompletableFuture.allOf(futures).join();
    
            // 获取所有结果
            Object[] results = Stream.of(futures)
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                            return null;
                        }
                    }).toArray();
    
            // 打印结果
            System.out.println(Arrays.stream(results).map(Object::toString).collect(Collectors.joining("\n")));
        }
    }
    
    

    执行结果:
    在这里插入图片描述

2.3 结果等待与获取

在执行线程中将任务放到工作线程中进行处理的时候,执行线程与工作线程之间是异步执行的模式,如果执行线程需要获取到共工作线程的执行结果,则可以通过get或者join方法,阻塞等待并从CompletableFuture中获取对应的值。

对get和join的方法功能含义说明归纳如下:

方法名称作用描述
get()等待CompletableFuture执行完成并获取其具体执行结果,可能会抛出异常,需要代码调用的地方手动try…catch进行处理。
get(long, TimeUnit)get()相同,只是允许设定阻塞等待超时时间,如果等待超过设定时间,则会抛出异常终止阻塞等待。
join()等待CompletableFuture执行完成并获取其具体执行结果,可能会抛出运行时异常,无需代码调用的地方手动try…catch进行处理。

从介绍上可以看出,两者的区别就在于是否需要调用方显式的进行try…catch处理逻辑,使用代码示例如下:

public void testGetAndJoin(String product) {
    // join无需显式try...catch...
    PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
            .join();
    
    try {
        // get显式try...catch...
        PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
                .get(5L, TimeUnit.SECONDS);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1990160.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Self-study Python Fish-C Note14 P50to51

函数 (part 4) 本节主要讲函数 递归 递归 (recursion) 递归就是函数调用自身的过程 示例1&#xff1a; def fun1(i):if i > 0: print(something)i-1fun1(i) fun1(5) # 这样就会打印五遍 somethingsomething something something something something要让递归正常工作&am…

IDEA2024.2重磅发布,更新完有4G!

JetBrains 今天宣布了其 IDE 家族版本之 2024.2 更新&#xff0c;其亮点是新 UI 现在已是默认设置&#xff0c;并且对 AI Assistant &#xff08;AI助手&#xff09;进行了几项改进。 安装密道 新 UI 的设计更加简约&#xff0c;可以根据需要以视觉方式扩展复杂功能。值得开发…

Arduino学习笔记2——初步认识Arduino程序

Arduino使用的编程语言是C。 一、注释文字 我们可以在程序中插入注释文字来提示开发者代码的作用。在Arduino中&#xff0c;单行注释用的是两个斜杠&#xff0c;多行注释用的是对称的斜杠加星号&#xff1a; 二、函数 和C语言相同&#xff0c;可以看到在打开IDE自动打开的默…

高并发下的分布式缓存 | Cache-Aside缓存模式

Cache-aside 模式的缓存操作 Cache-aside 模式&#xff0c;也叫旁路缓存模式&#xff0c;是一种常见的缓存使用方式。在这个模式下&#xff0c;应用程序可能同时需要同缓存和数据库进行数据交互&#xff0c;而缓存和数据库之间是没有直接联系的。这意味着&#xff0c;应用程序…

Java数据结构 | 二叉树基础及基本操作

二叉树 一、树型结构1.1 树的概念1.2 关于树的一些常用概念&#xff08;很重要&#xff01;&#xff01;&#xff01;&#xff09;1.3 树的表示形式1.4 树的应用 二、二叉树2.1 二叉树的概念2.2 两种特殊的二叉树2.3 二叉树的性质2.4 二叉树的存储2.5 二叉树的基本操作2.5.1 代…

【前端可视化】 大屏可视化项目二 scale适配方案 g6流程图 更复杂的图表

项目介绍 第二个大屏可视化&#xff0c;整个项目利用scale进行按比例适配。 图表更加复杂&#xff0c;涉及到图表的叠加&#xff0c;mark&#xff0c;地图&#xff0c;g6流程图的能等 始终保持比例适配(本项目方案),始终满屏适配(项目一). echarts绘制较为复杂图表&#xff0…

C++:string类(auto+范围for,typeid)

目录 前言 auto typeid 范围for 使用方法 string类的模拟实现 默认构造函数 拷贝构造函数 swap 赋值重载 析构函数 迭代器iterator begin和end c_str clear size capacity []运算符重载 push_back reserve append 运算符重载 insert erase find npos…

postgresql 宝塔 连接不上,prisma

不太熟悉pgsql; 配置搞了半天; 一直连不上远程数据库; 后台经过探索发现需要以下配置 1. 端口放行; 5422 (pgsql的端口) 2.编辑 pg_hba.conf 文件最后新增一条,这样可以外部使用postgres超级管理员账号 host all all 0.0.0.0/0 md5 3. pris…

数据结构复杂度

文章目录 一. 数据结构前言1.1 数据结构1.2 算法 二. 算法效率2.1 时间复杂度2.1.1 T(N)函数式2.1.2 大O的渐进表示法 一. 数据结构前言 1.1 数据结构 什么是数据结构呢&#xff1f;打开一个人的主页&#xff0c;有很多视频&#xff0c;这是数据&#xff08;杂乱无章&#xf…

了解k8s架构,搭建k8s集群

kubernetes 概述 Kubernetes 集群图例 安装控制节点 安装网络插件 安装 calico 安装计算节点 2、node 安装 查看集群状态 kubectl get nodes # 验证容器工作状态 [rootmaster ~]# kubectl -n kube-system get pods

【学习笔记】:Maven初级

一、Maven简介 1、为什么需要maven Maven是一个依赖管理工具,解决如下问题: 项目依赖jar包多jar包来源、版本问题jar包导入问题jar包之间的依赖Maven是一个构建工具: 脱离IDE环境的项目构建操作,需要专门的工具2、Maven介绍 https://maven.apache.org/what-is-maven.htm…

代码随想录算法训练营第44天|LeetCode 1143.最长公共子序列、1035.不相交的线、53. 最大子序和、392.判断子序列

1. LeetCode 1143.最长公共子序列 题目链接&#xff1a;https://leetcode.cn/problems/longest-common-subsequence/description/ 文章链接&#xff1a;https://programmercarl.com/1143.最长公共子序列.html 视频链接&#xff1a;https://www.bilibili.com/video/BV1ye4y1L7CQ…

苹果离线打包机配置和打包

1、虚拟机安装 macOS虚拟机安装全过程&#xff08;VMware&#xff09;-腾讯云开发者社区-腾讯云 给 windows 虚拟机装个 mac 雪之梦 1、安装苹果镜像 去网上下载&#xff0c;打包机的镜像要和自己mac电脑上的保持一致。 同时打包机的用户名也需要和自己的mac保持一致。 2、…

云原生专题-k8s基础系列-k8s-namespaces详解

获取所有的pod实例&#xff1a; k8s中&#xff0c;命名空间&#xff08;Namespace&#xff09;提供一种机制&#xff0c;将同一集群中的资源划分为相互隔离的组。同一命名空间内的资源名称要唯一&#xff0c;命名空间是用来隔离资源的&#xff0c;不隔离网络。 https://kubern…

Kafka 实战使用、单机搭建、集群搭建、Kraft集群搭建

文章目录 实验环境单机服务启动停止服务简单收发消息其他消费模式理解Kakfa的消息传递机制 集群服务为什么要使用集群部署Zookeeper集群部署Kafka集群理解服务端的Topic、Partition和Broker总结 Kraft集群相关概念 实验环境 准备三台虚拟机 三台机器均预装CentOS7 操作系统。…

探索Transformer中的多头注意力机制:如何利用GPU并发

什么是多头注意力机制&#xff1f; 首先&#xff0c;什么是多头注意力机制&#xff1f;简单来说&#xff0c;它是Transformer模型的核心组件之一。它通过并行计算多个注意力头&#xff08;attention heads&#xff09;&#xff0c;使模型能够从不同的表示子空间中捕捉不同的特…

Oracle服务器windows操作系统升级出现计算机名称改变导致数据库无法正常连接

1.数据库莫名奇妙无法正常连接&#xff0c;经排查是主机名称改变&#xff0c;导致oracle无法正常运行 如何查看ORACLE主机名称及路径&#xff1a;需要修改 listener 和 tnsnames的配置的主机名 2.修改tnsnames配置的主机名称&#xff0c;HOST主机名称 3.修改listener中的主机…

【案例36】Apache未指向新的openssl

客户发现apache报openssl相关漏洞&#xff0c;于是升级了操作系统的openssl组件。但再次漏扫发现相关版本依旧显示openssl的版本为&#xff1a;1.0.2k。怀疑升级的有问题。 问题分析 查看libssl.so.10指向的是/lib64.so.10 ldd mod_ssl.so libssl.so.10指向的是openssl1.0.2k…

【实际案例】服务器宕机情况分析及处理建议

了解银河麒麟操作系统更多全新产品&#xff0c;请点击访问麒麟软件产品专区&#xff1a;https://product.kylinos.cn 服务器环境以及配置 物理机/虚拟机/云/容器 物理机 外网/私有网络/无网络 私有网络 处理器&#xff1a; Kunpeng 920 内存&#xff1a; 4 TiB BIOS版…

【JVM基础18】——实践-Java内存泄漏排查思路?

目录 1- 引言&#xff1a;2- ⭐核心&#xff1a;2-1 排查思路2-2 步骤1&#xff1a;获取堆内存快照 dump2-3 步骤2、3&#xff1a;使用 VisualVM 打开 dump文件 3- 小结&#xff1a;3-1 Java内存泄漏排查思路&#xff1f; 1- 引言&#xff1a; 首先得明确哪里会产生内存泄漏的…