异步编程Completablefuture使用详解----入门篇

news2024/11/27 18:37:30

JDK版本:jdk17
IDEA版本:IntelliJ IDEA 2022.1.3


文章目录

  • 一、CompletableFuture是什么?
  • 二、为什么要使用CompletableFuture?
    • 2.1 工具类准备
    • 2.2 Future的局限性
    • 2.3 CompletableFuture的优势
  • 三、如何使用CompletableFuture?
    • 3.1 异步任务之开启
      • 3.1.1 runAsync
      • 3.1.2 supplyAsync
      • 3.1.3 线程池
      • 3.1.4 异步编程思想
    • 3.2 异步任务之回调
      • 3.2.1 thenApply
      • 3.2.2 thenAccept
      • 3.2.3 thenRun
      • 3.2.3 更进一步提供并行化
    • 3.3 异步任务之编排
      • 3.3.1 编排2个依赖关系的异步任务 thenCompose()
      • 3.3.2 编排2个非依赖关系的异步任务 thenCombine()
      • 3.3.3 合并多个异步 allOf / anyOf
        • 3.3.3.1 allOf()
        • 3.3.3.2 anyOf()
    • 3.4 异步任务之异常处理
      • 3.4.1 exceptionally()
      • 3.4.2 handle()


提示:以下是本篇文章正文内容,下面案例可供参考

一、CompletableFuture是什么?

讯飞星火告诉我们,CompletableFuture是Java 8中引入的一个类,它实现了Future接口,用于表示异步计算的结果。与Future不同,CompletableFuture提供了更丰富的方法来处理计算过程中的异常、组合多个计算任务以及链式调用等。

在这里插入图片描述

一句话概括

CompletableFuture是Java 8中引入的一个类,它实现了Future接口,用于异步编程,可以链式调用多个任务


二、为什么要使用CompletableFuture?

在讲述CompletableFuture的使用优缺之前,先看一组案例代码

2.1 工具类准备

为了便于后续地调试和学习,需要先定义一个工具类辅助我们对知识的理解

示例代码如下

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;

public class CommonUtils {

    // 读取文件路径的文件
    public static String readFile(String pathToFile) {
        try {
            return Files.readString(Paths.get(pathToFile));
        } catch (IOException e) {
            e.printStackTrace();
            return "";
        }
    }

    // 休眠指定的毫秒数
    public static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 休眠指定的秒数
    public static void sleepSecond(long second) {
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 打印输出带线程信息的日志
    public static void printTheadLog(String message) {
        // 时间戳 | 线程id | 线程名 | 日志信息
        String result = new StringJoiner(" | ")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.format("%2d", Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(message)
                .toString();
        System.out.println(result);
    }

}

注意

以上工具类中Files.readString()方法是jdk11提供的API,而StringJoiner对象是jdk17提供的API

2.2 Future的局限性

案例

替换新闻稿(news.txt)中的敏感词汇,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中

示例代码如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {

        //创建一个最多运行5个线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // step 1: 读取敏感词汇 => thread1
        Future<String[]> filterWordFuture = executor.submit(() -> {
            String str = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            String[] filterWords = str.split(",");
            return filterWords;
        });

        // step 2: 读取新闻稿 => thread2
        Future<String> newsFuture = executor.submit(() -> {
            return CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\news.txt");
        });

        // step 3: 替换操作 => thread3
        Future<String> replaceFuture = executor.submit(() -> {
            String[] words = filterWordFuture.get();
            String news = newsFuture.get();
            for (String word : words) {
                if (news.indexOf(word) > 0) {
                    news = news.replace(word, "**");
                }
            }
            return news;
        });

        // step 4: 打印输出替换后的新闻稿 => main
        String filteredNews = replaceFuture.get();
        System.out.println("filteredNews = " + filteredNews);

        //关闭线程池
        executor.shutdown();
    }

运行如下

在这里插入图片描述

发现

通过上面的代码,我们发现Future相比于所有任务都直接在主线程中处理,有很多优势,但同时也存在不足,至少表现如下:

  • 1.在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知你它什么时候完成,你如果想要得到结果,必须通过一个get()方法,该方法会阻塞直到结果可用为止。
  • 2.它不具备将回调函数附加到Future后并在Future的结果可用时自动调用回调的能力,而且它无法解决任务相互依赖的问题。 如上述案例中,filterWordFuture和newsFuture的结果不能自动发送给replaceFuture, 需要在replaceFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。
  • 3.不能将多个Future合并在一起。 假设你有多种不同的Future, 你想在它们全部并行完成后然后运行某个函数,Future很难独立完成这一需要。
  • 4.没有异常处理。 Future提供的方法中没有专门的API应对异常处理,还需要开发者自己手动异常处理。

2.3 CompletableFuture的优势

在这里插入图片描述

CompletableFuture相对于Future具有以下的优势

  • 为快速创建、链接依赖和结合多个Future提供了大量的便利方法。
  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
  • 它无疑衔接和亲和lJava 8 提供的Lambda表达式Stream - API
  • 真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。

三、如何使用CompletableFuture?

3.1 异步任务之开启

3.1.1 runAsync

说明

如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture.runAsync()方法。它接受一个Runnable接口实现类对象,方法返回CompletableFuture<Void>对象

static CompletableFuture<Void> runAsync(Runnable runnable)

案例①

创建一个不从任务中返回任何内容的CompletableFuture异步任务对象

示例代码如下


public class RunAsyncDemo {
  public static void main(String[] args) {
    // runAsync 创建异步任务
    CommonUtils.printTheadLog("main start");
    // 使用Runnable匿名内部类
    CompletableFuture.runAsync(new Runnable() {
      @Override
      public void run() {
        CommonUtils.printTheadLog("读取文件开始");
        // 使用睡眠来模拟一个长时间的工作任务(例如读取文件,网络请求等)
        CommonUtils.sleepSecond(3);
        CommonUtils.printTheadLog("读取文件结束");
      }
    });

    CommonUtils.printTheadLog("here are not blacked,main continue");
    CommonUtils.sleepSecond(4); // 此处休眠 为的是等待CompletableFuture背后的线程池执行完成。
    CommonUtils.printTheadLog("main end");
  }
}

运行如下

在这里插入图片描述
程序执行流程如下
在这里插入图片描述
Tips

异步任务的创建并不会立刻执行,而是会在抢到PCPU时间片后才会执行,异步任务的本质就是以开启线程的方式去执行任务!!!

代码优化

以Lambda表达式的形式传递Runnable接口实现类对象

优化代码如下

public class RunAsyncDemo2 {
    public static void main(String[] args) {
        // runAsync 创建异步任务
        CommonUtils.printTheadLog("main start");
        // 使用Lambda表达式
         CompletableFuture.runAsync(() -> {
             CommonUtils.printTheadLog("读取文件开始");
             CommonUtils.sleepSecond(3);
        	 CommonUtils.printTheadLog("读取文件结束");
        });
        CommonUtils.printTheadLog("here are not blacked,main continue");
        CommonUtils.sleepSecond(4); // 此处休眠 为的是等待CompletableFuture背后的线程池执行完成。
        CommonUtils.printTheadLog("main end");
    }
}

运行如下

在这里插入图片描述

案例②

使用CompletableFuture开启异步任务读取news.txt文件中的新闻稿,并打印输出

news.txt文件中的新闻稿内容如下

在这里插入图片描述

示例代码如下


public class RunAsyncDemo3 {
    public static void main(String[] args) {
        // 需求:使用CompletableFuture开启异步任务读取news.txt文件中的新闻稿,并打印输出。
        CommonUtils.printTheadLog("main start");

        CompletableFuture.runAsync(() -> {
            CommonUtils.printTheadLog("读取文件");
            String news = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\news.txt");
            System.out.println(news);
        });

        CommonUtils.printTheadLog("here not blacked main continue");
        CommonUtils.sleepSecond(4);
        CommonUtils.printTheadLog("main end");
    }
}

运行如下

在这里插入图片描述

疑问🤔

异步任务是并发执行还是并行执行?

  • 如果是单核CPU,那么异步任务之间就是并发执行
  • 如果是多核CPU(多CPU)异步任务就是并行执行

何为并发执行?

如果电脑只有单个且单核的CPU,当前main线程与异步的多线程都需要使用CPU中的一个核,二者需要抢占CPU中的时间片去执行,谁抢到谁就执行,CPU会根据时间片去分配两个线程的执行,这就是并发

何为并行执行?

如果电脑只有单个且多核的cpu,我们就可以让CPU的核1去专门执行主线程main,CPU的核2专门执行异步的多线程,两个线程同时运行;如果是单核CPU,那么异常任务之间就是并发执行,如果是多核CPU(多CPU)异步任务就是并行执行

注意

作为开发者,我们只需要清楚如何开启异步任务,CPU硬件会把异步任务合理的分配给CPU上的核运行

3.1.2 supplyAsync

说明

CompletableFuture.runAsync()开启不带返回结果异步任务。但是,如果你想从后台的异步任务中返回一个结果怎么办?此时,CompletalbeFuture.supplyAsync()是你的最好的选择了

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

方法解析

它入参一个Supplier<U>供给者,用于供给带返回值的异步任务,并返回CompletableFuture<U>,其中的U是供给者给值的类型

案例

开启异步任务读取news.txt文件中的新闻稿,返回文件中内容并在主线程打印输出

示例代码如下

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //需 求:开启异步任务读取news.txt文件中的新闻稿,返回文件中内容并在主线程打印输出

        CommonUtils.printTheadLog("main start");

        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                String news = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\news.txt");
                return news;
            }
        });

        CommonUtils.printTheadLog("here are not blocked,main continue");
        //阻塞并等待future完成
        String news = future.get();

        CommonUtils.printTheadLog("news ="+news);

        CommonUtils.printTheadLog("main end");


    }

运行如下

在这里插入图片描述

注意

如果想要获取上述代码中future中的结果,可以调用completableFuture.get()方法,get()将阻塞,直到future完成

疑问🤔

get方法会阻塞主线程,进而会不会影响程序性能?

此问题将会在回调函数中解答

我们依然可以使用Java 8的Lambda表达式使上面的代码更简洁

代码示例如下

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String news = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\news.txt");
    return news;
});

3.1.3 线程池

引言

大家已经知道,runAsync()和supplyAsync()方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗?不是吗?

说明

CompletableFuture会从全局ForkJoinPool.commonPool()线程池获取来执行这些任务,当然, 你也可以创建一个线程池,并将其传递给runAsync() 和supplyAsync()方法,以使它们在从你指定的线程池获得的线程中执行任务。
CompletableFuture API中的所有方法都有两种变体,一种是接受传入的Executor参数作为指定的线程池,而另一个则使用默认的线程池(ForkJoinPool.commonPool())

// runAsync()重载
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
// spplyAsync()重载
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

案例

指定线程池,开启异步任务读取news.txt中的新闻稿,返回文件内容并在主线程打印输出

代码示例如下

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //需求:使用自定义线程,从中开启异步任务读取news.txt文件中的新闻稿,返回文件中内容并在主线程打印输出

        CommonUtils.printTheadLog("main start");
        //创建包含固定4个线程的线程池
        ExecutorService pool = Executors.newFixedThreadPool(4);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(()  -> {

            String news = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\news.txt");
            CommonUtils.printTheadLog("news ="+news );
            return news;

        },pool);

        CommonUtils.printTheadLog("here are not blocked,main continue");
        //阻塞并等待future完成
        String news = future.get();

        CommonUtils.printTheadLog("news ="+news);

        //关闭线程池
        pool.shutdown();
        CommonUtils.printTheadLog("main end");


    }

运行如下

在这里插入图片描述

注意

如果所有completableFuture共享一个线程池,那么一旦有异步任务执行一些很慢的I/O操作,就会导致线程池中所有的线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

3.1.4 异步编程思想

综合上述,我们并没有显示地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识,以上专业的说法是:线程的创建和线程负责的任务进行解耦,它给我们带来的好处线程的创建和启动全部交给线程池负责,具体任务的编写交给程序员,专人专事。

异步编程是可以让程序并行(也可能是并发)运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。

作为开发者,只需要有一个意识:

开发者只需要把耗时的操作交给CompletableFuture开一个异步任务,然后继续关注主线程业务,当异步任务运行完成时会通知主线程它的运行结果。我们把具备了这种编程思想的开发称为异步编程思想

3.2 异步任务之回调

引言

CompletalbeFuture.get()方法是阻塞的。调用时它会阻塞等待,直到这个Future完成,并在完成后返回结果。但是,很多时候这不是我们想要的。

对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调自动被调用,这样,我们就不必等待结果了,然后在Future的回调函数内编写完成Future之后需要执行的逻辑。我们可以使用thenApply(),thenAccept()和thenRun()方法,它们可以把回调函数附加到CompletableFuture上

3.2.1 thenApply

说明

使用thenApply()方法可以处理和转换CompletableFuture的结果,它以Function<T, U>作为参数。 Function<T, U>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果

CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

案例

异步读取filter_words.txt文件中的内容,读取完成后,把内容转换成数组(敏感词数组),异步任务返回敏感词数组

filter_words.txt文件中的内容如下

在这里插入图片描述

示例代码如下

 public static void main(String[] args) throws ExecutionException, InterruptedException {

        //需求:异步读取filter_wors.txt文件中的内容,读取完成后,把内容转换成数组(敏感词数组),异步任务返回敏感词数组
        CommonUtils.printTheadLog("main start");

        CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                CommonUtils.printTheadLog("开始读取filter_wors.txt文件");
                String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
                return readFile;
            }
        });
        //回调函数
        CompletableFuture<String[]> splitWordFuture = readFileFuture.thenApply(content -> {
            CommonUtils.printTheadLog("开始读取敏感词组");
            String[] split = content.split(",");
            return split;
        });

        CommonUtils.printTheadLog("main lock");
        String[] strings = splitWordFuture.get();
        String splitWord = Arrays.toString(strings);
        CommonUtils.printTheadLog("splitWord ="+splitWord);
     }

运行如下

在这里插入图片描述

注意

你还可以通过附加一系列thenApply()回调方法,在CompletableFuture上编写一系列转换序列。一个thenApply()方法的结果可以传递给序列中的下一个,如果你对链式操作很了解,你会发现结果可以在链式操作上传递

代码示例如下

CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {
    String filterWordContent = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
    return filterWordContent;
}).thenApply(content -> {
    String[] filterWords = content.split(",");
    return filterWords;
});

3.2.2 thenAccept

说明

如果不想从回调函数返回结果,而只想在Future完成后运行一些代码,则可以使用thenAccpet(),这些方法是一个Consumer<? super T>,它可以对异步任务的执行结果进行消费使用,方法返回CompletableFuture

CompletableFuture<Void> thenAccept(Consumer<? super T> action)

该方法通常用作回调链中的最后一个回调

案例

异步读取filter_words.txt文件中的内容,读取完成后,把内容转换成敏感词数组,然后打印敏感词数组

代码示例如下

public static void main(String[] args) throws InterruptedException {

        CommonUtils.printTheadLog("main start");

        //链式操作
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                CommonUtils.printTheadLog("读取filter_words.txt文件");
                String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
                return readFile;            }
        }).thenApply( content -> {
            CommonUtils.printTheadLog("把文件内容转换成敏感词数组");
            String[] split = content.split(",");
            return split;
        }).thenAccept( fildWord ->{
             CommonUtils.printTheadLog(Arrays.toString(fildWord));
        });

        //让异步任务执行完毕
        Thread.sleep(4000);

        CommonUtils.printTheadLog("main stop");
    }

运行如下

在这里插入图片描述

3.2.3 thenRun

说明

前面我们已经知道,通过thenApply(Function<T,R>)对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果; 通过thenAccpet(Consumer)对链式操作中的上一个异步任务的结果进行消费,不返回新结果;

如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一个链式操作的结果,那么CompletableFuture.thenRun()会是你最佳的选择,它需要一个Runnable并返回CompletableFuture<Void>

CompletableFuture<Void> thenRun(Runnable action);

案例

如果我们仅仅想知道 filter_words.txt 的文件是否读取完成

示例代码如下

 public static void main(String[] args) {

        //需求:仅仅想知道 filter_words.txt 的文件是否读取完成
        CommonUtils.printTheadLog("main start");

        CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始读取filter_words.txt");
            String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            return readFile;
        }).thenRun(() -> {
            CommonUtils.printTheadLog("读取filter_words.txt文件完成");
        });

        CommonUtils.printTheadLog("main continue");
        CommonUtils.sleepSecond(4);
        CommonUtils.printTheadLog("main end");


    }

运行如下

在这里插入图片描述

3.2.3 更进一步提供并行化

说明

CompletableFuture 提供的所有回调方法都有两个异步变体

异变体方法代码如下

CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 
// 回调方的异步变体(异步回调)
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 

注意

这些带了Async的异步回调通过在单独的线程中执行回调任务来帮我们去进一步促进并行计算

案例

回顾之前的案例需求,异步读取filter_words.txt文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组

代码示例如下

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ///回顾: 异步读取filter_words.txt文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组

        CommonUtils.printTheadLog("main start");

        CompletableFuture<String[]> filtwordFuture = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始读取filter_words.txt");
            String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            return readFile;
        }).thenApply(content -> {
            CommonUtils.printTheadLog("开始转换成敏感词数组");
            return content.split(",");
        });


        CommonUtils.printTheadLog("main continue");
        String[] strings = filtwordFuture.get();

        CommonUtils.printTheadLog("strings ="+strings);
        CommonUtils.printTheadLog("main stop");
    }

总结

  • 一般而言,commonPool为了提高性能,并不会立马收回线程,thenApply中回调任务和supplyAsync中的异步任务使用的是同一个线程
  • 这里存在一个特殊情况,即如果supplyAsync中的任务是立即返回结果(不是耗时的任务),那么thenApply回调任务也会在主线程执行

要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()回调,那么它将在从ForkJoinPool.commonPool()获取另一个线程执行(概率获取),一般情况是接着使用上一次的线程

代码示例如下

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ///回顾: 异步读取filter_words.txt文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组

        CommonUtils.printTheadLog("main start");

        CompletableFuture<String[]> filtwordFuture = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始读取filter_words.txt");
            String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            return readFile;
        }).thenApplyAsync(content -> {
            CommonUtils.printTheadLog("开始转换成敏感词数组");
            return content.split(",");
        });


        CommonUtils.printTheadLog("main continue");
        String[] strings = filtwordFuture.get();

        CommonUtils.printTheadLog("strings ="+strings);
        CommonUtils.printTheadLog("main stop");
    }

运行如下

在这里插入图片描述

另外,如果将Executor传递给thenApplyAsync()回调,则该回调的异步任务将在从Excutor的线程池中获取的线程中执行

示例代码如下(上述案例需求不变)

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //使用thenApplyAsync

        //创建自定义线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);

        CommonUtils.printTheadLog("main start");

        CompletableFuture<String[]> future = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始读取filter_words.txt");
            String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            return readFile;
        }).thenApplyAsync(content -> {
            CommonUtils.printTheadLog("开始转换成敏感词数组");
            return content.split(",");
        },executor);


        CommonUtils.printTheadLog("main continue");
        String[] strings = future.get();
        CommonUtils.printTheadLog("strings ="+ Arrays.toString(strings));
        CommonUtils.printTheadLog("main stop");
    }

运行如下

在这里插入图片描述

其他两个回调的变体版本如下

代码如下

// thenAccept和其异步回调
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action Executor executor)
// thenRun和其异步回调
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

3.3 异步任务之编排

3.3.1 编排2个依赖关系的异步任务 thenCompose()

回顾案例

异步读取filter_words.txt文件中的内容,读取完成后,转换成敏感词数组让主线程待用

关于读取和解析内容,假设使用以下的readFileFuture(String fileName) 和 splitFuture(String content) 方法去完成

示例代码如下

public static CompletableFuture<String> readFileFuture(String fileName) {
    return CompletableFuture.supplyAsync(()->{
        String filterWordsContent = CommonUtils.readFile(fileName);
        return filterWordsContent;
    });
}

public static CompletableFuture<String[]> splitFuture(String content) {
    return CompletableFuture.supplyAsync(()->{
        String[] filterWords = content.split(",");
        return filterWords;
    });
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        //编排2个依赖关系的异步任务thenCompose()

        //使用thenApply
        CompletableFuture<CompletableFuture<String[]>> future = readFileFuture("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt").thenApply(content -> {
            return splitFuture(content);
        });

        CompletableFuture<String[]> completableFuture = future.get();

        System.out.println("completableFuture = " + completableFuture);

        String[] content = completableFuture.get();

        System.out.println("content = " + content);
    }

运行如下

在这里插入图片描述

结果

在上面的案例中,thenApply(Function<T,R>)中Function回调会对上一步异步结果转换后得到一个简单值,但现在这种情况下,如果结果是嵌套的CompletableFuture,所以这是不符合预期的,那怎么办呢?

我们想的是

把上一步异步任务的结果,转成一个CompletableFuture对象,这个Completable对象中包含本次异步任务处理后的结果。也就是说,我们想结合上一步异步任务的结果得到下一个新的异步任务中,结果由这个新的异步任务返回

此时,你需要使用 thenCompose() 方法代替, 我们可以把它理解为异步任务的组合

该方法签名如下

CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

解读

T : 上一步异步任务的结果

所以,thenCompose 用来连接两个有依赖关系的异步任务,结果由第二个任务返回

案例

使用thenCompose()方法编排上述回顾案例中的两个异步任务

示例代码如下

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //异步读取filter_words.txt文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组
        //thnCompose

        CommonUtils.printTheadLog("main start");

        CompletableFuture<String[]> future = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始读取filter_words.txt");
            String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            return readFile;
        }).thenCompose(content -> CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始转换成敏感词数组");
            String[] split = content.split(",");
            return split;
        }));

        CommonUtils.printTheadLog("main continue");
        String[] strings = future.get();

        CommonUtils.printTheadLog("strings ="+strings);
        CommonUtils.printTheadLog("main stop");


    }

运行如下

在这里插入图片描述
因此,如果我们想连接(编排)两个依赖关系的异步任务(CompletableFuture对象),就使用thenCompose()方法

当然,thenCompose也存在异步回调变体版本的方法

方法签名如下

CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

3.3.2 编排2个非依赖关系的异步任务 thenCombine()

说明

我们已经知道,当其中一个Future依赖于另一个Future,使用thenCompose()用于组合两个Future。如果两个Future之间没有依赖关系,你希望两个Future独立运行并在两者都完成之后执行回调操作时,则使用thenCombine()

方法签名如下

CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)

解读

T是第一个任务的结果
U是第二个任务的结果
V经BiFunction应用转换后的结果

案例

替换新闻稿( news.txt )中敏感词汇,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中

示例代码如下

 public static void main(String[] args) throws ExecutionException, InterruptedException {

        //需求:替换新闻稿(news.txt)中敏感词汇,把敏感词汇替换成*,敏感词存储在filter_.words.txt中

        CommonUtils.printTheadLog("main start");

        //读取敏感词并转为敏感词数组
        CompletableFuture<String[]> future1 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printTheadLog("开始读取filter_words.txt");
            String readFile = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\filter_words.txt");
            CommonUtils.printTheadLog("开始转为敏感词数组");
            String[] words = readFile.split(",");
            return words;
        });

        //读取新闻稿(news.txt)
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            String newsContent = CommonUtils.readFile("E:\\project\\competable-future\\src\\main\\java\\news.txt");
            return newsContent;
        });

        //替换敏感词
        CompletableFuture<String> future3 = future1.thenCombine(future2, (words, newsContent) -> {
            for (String word : words) {
                if (newsContent.indexOf(word) > 0) {
                    newsContent  = newsContent.replace(word, "**");
                }
            }
            return newsContent;
        });

        CommonUtils.printTheadLog("main continue");
        String content = future3.get();

        CommonUtils.printTheadLog("content="+content);
        CommonUtils.printTheadLog("main stop");

    }

运行如下

在这里插入图片描述

注意

当两个Future都完成时,才将两个异步任务的结果传递给thenCombine的回调函数进一步处理!!!

thenCombine 也存在异步回调变体版本

方法签名如下

CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

3.3.3 合并多个异步 allOf / anyOf

引言

在上述案例中,我们使用thenCompose()和thenCombine()将两个CompletableFuture组合和合并一起,

如果要编排任意数量的CompletableFuture,那该怎么办?

可以使用以下方法来组合任意数量的CompletableFuture

方法签名如下

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
3.3.3.1 allOf()

适用情形

有多个需要独立并运行的Future,并在所有这些Future都完成后执行一些操作

案例

统计news1.txt,news2.txt,news3.txt文件中包含CompletableFuture关键字的文件的个数

①news1.txt内容如下

在这里插入图片描述

②news2.txt内容如下

在这里插入图片描述

③news3.txt内容如下

在这里插入图片描述

代码示例如下

public static CompletableFuture<String> readFileFuture(String fileName){
        return CompletableFuture.supplyAsync(()->{
            String content = CommonUtils.readFile(fileName);
            return content;
        });
    }

    public static void main(String[] args) {
        //需求:统计news1.txt、new2.txt、new3.txt文件中包合CompLetableFuture.关键字的文件的个数

        // step 1: 创建List集合存储文件名
        List<String> files = Arrays.asList("E:\\project\\competable-future\\src\\main\\resources\\news1.txt",
                "E:\\project\\competable-future\\src\\main\\resources\\news2.txt",
                "E:\\project\\competable-future\\src\\main\\resources\\news3.txt");

        // step 2: 根据文件名调用readFileFuture创建多个CompletableFuture,并存入List集合中
        List<CompletableFuture<String>> readFileFutureList = files.stream().map(file -> {
           return readFileFuture(file);
       }).collect(Collectors.toList());
        
        // step 3: 把List集合转换成数组待用,以便传入allOf方法中
        int len = readFileFutureList.size();
        CompletableFuture[] readFileArray = readFileFutureList.toArray(new CompletableFuture[len]);

        // step 4: 使用allOf方法合并多个异步任务
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(readFileArray);

        // step 5: 当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数
        CompletableFuture<Long> countFuture = allOfFuture.thenApply(v -> {
            return readFileFutureList.stream().
                    map(future -> future.join()).
                    filter(content -> content.contains("CompletableFuture")).
                    count();
        });

        // step 6: 主线程打印输出文件个数
        Long count = countFuture.join();

        System.out.println("count = " + count);

    }

运行如下

在这里插入图片描述
Trips

allOf 特别适合合并多个异步任务,当所有异步任务都完成时可以进一步操作

3.3.3.2 anyOf()

适用场景

当给定的多个异步任务中的有任意Future一个完成时,需要执行一些操作,就可以使用它

方法签名如下

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

注意

anyOf()返回一个新的CompletableFuture, 新的CompletableFuture的结果和cfs已完成的那个异步任务结果相同

案例

anyOf执行过程

示例代码如下

public static void main(String[] args) throws ExecutionException, InterruptedException {

        //anyOf()
        CompletableFuture<String> future1 =CompletableFuture.supplyAsync(()->{
            CommonUtils.sleepSecond(2);
            return "Future1的结果";
        });

        CompletableFuture<String> future2 =CompletableFuture.supplyAsync(()->{
            CommonUtils.sleepSecond(1);
            return "Future2的结果";
        });

        CompletableFuture<String> future3 =CompletableFuture.supplyAsync(()->{
            CommonUtils.sleepSecond(3);
            return "Future3的结果";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

        Object ret = anyOfFuture.get();
        System.out.println("ret = " + ret);
    }

运行如下

在这里插入图片描述

说明

在上面的示例中,当一个CompletableFuture中的任意一个完成时,anyOfFuture就完成了。由于future2的睡眠时间最少,因此它将首先完成,最终结果将是"Future2的结果"。

注意

  • anyOf() 方法返回类型必须是 CompletableFutue <Object>
  • anyOf()的问题在于,如果你拥有返回不同类型结果的CompletableFuture,那么你将不知道最终CompletableFuture的类型

3.4 异步任务之异常处理

在前面的章节中,我们并没有更多地关心异常处理的问题,其实, CompletableFuture提供了优化处理异常的方式

首先,让我们了解异常如何在回调链中传播

代码示例如下

public static void main(String[] args) {

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
//            int r = 1 / 0;
            return "result1";
        }).thenApply(result -> {
            CommonUtils.printTheadLog(result);
            String str = null;
            int length = str.length();
            return result + " result2";
        }).thenApply(result -> {
            return result + " result3";
        }).thenAccept(result -> {
            CommonUtils.printTheadLog(result);
        });


    }

运行如下

在这里插入图片描述

总结

  • 如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行, CompletableFuture 将传入异常处理
  • 如果在第一个thenApply任务中出现异常,第二个 thenApply 和最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推

3.4.1 exceptionally()

说明

exceptionally 用于处理回调链上的异常, 回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常

方法签名如下

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 

案例

演示exceptionally()的执行

示例代码如下

 public static void main(String[] args) {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//            int r = 1 / 0;
            return "result1";
        }).thenApply(result -> {
            CommonUtils.printTheadLog(result);
            String str = null;
            int length = str.length();
            return result + " result2";
        }).thenApply(result -> {
            return result + " result3";
        }).exceptionally(ex ->{
            String message = ex.getMessage();
            System.out.println("message = " + message);
            return "Unknown";
        });

    }

运行如下

在这里插入图片描述
Trips

因为 exceptionally 只处理一次异常,所以常常用在回调链的末端

3.4.2 handle()

说明

CompletableFuture API 还提供了一种更通用的方法handle() 表示从异常中恢复 handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可进一步向下传递。

方法签名如下

CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

案例

演示handle()的执行

示例代码如下

 public static void main(String[] args) throws ExecutionException, InterruptedException {

        // handle()
        CommonUtils.printTheadLog("main start");
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            int r = 1 / 0;
            return "result1";
        }).handle((result,ex)->{
            CommonUtils.printTheadLog("上一步异常的恢复");
            if(ex != null){
                CommonUtils.printTheadLog("出现异常:" + ex.getMessage());
                return "UnKnown";
            }
            return result;
        });

        CommonUtils.printTheadLog("main continue");
        String ret = future.get();
        CommonUtils.printTheadLog("ret = " + ret);

        CommonUtils.printTheadLog("main end");

    }

运行如下

在这里插入图片描述

如果发生异常,则 result 参数为 null ,否则 ex 参数将为 null

总结

异步任务不管是否发生异常,handle方法都会执行。所以,handle核心作用在于对上一步异步任务进行现场修复

案例

对回调链中的一次异常进行恢复处理

示例代码如下

 public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 需求: 对回调链中的一次异常进行恢复处理

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//            int r = 1 / 0;
            return "result1";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("出现异常:" + ex.getMessage());
                return "UnKnown1";
            }
            return result;
        }).thenApply(result -> {

            String str = null;
            int len = str.length();

            return result + " result2";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("出现异常:" + ex.getMessage());
                return "UnKnown2";
            }
            return result;    
        }).thenApply(result -> {
            return result + " result3";
        });

        String ret = future.get();
        CommonUtils.printTheadLog("ret = " + ret);
    }

运行如下

在这里插入图片描述
和以往一样,为了提供并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本

方法签名如下

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) // jdk17+
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) // jdk17+

CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1399486.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web3艺术市场:NFT与数字创作的结合

在数字时代&#xff0c;随着区块链技术的崛起&#xff0c;一种新型数字资产&#xff0c;非同质化代币&#xff08;NFT&#xff09;&#xff0c;正逐渐改变传统艺术市场的格局。这种数字化的艺术品售卖方式成为了Web3艺术市场的代表&#xff0c;推动着数字创作与艺术市场的结合。…

day05_java中的流程控制

概述 在一个程序执行的过程中&#xff0c;各条语句的执行顺序对程序的结果是有直接影响的。所以&#xff0c;我们必须清楚每条语句的执 行流程。而且&#xff0c;很多时候要通过控制语句的执行顺序来实现我们想要的功能。 流程控制语句又分为&#xff1a; 顺序结构 丶分支结构丶…

性能优化-HVX架构简介

来自 「发表于知乎专栏《移动端算法优化》」 本文主要介绍Hexagon DSP的HVX技术&#xff0c;旨在通过简单的语言讲清HVX技术。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&#xff09;开…

vue3 + antd 封装动态表单组件(一)

前置条件&#xff1a; vue版本 v3.3.11 ant-design-vue版本 v4.1.1 创建动态组件配置文件config.js import { Input, Textarea, InputNumber, Select, RadioGroup, CheckboxGroup, DatePicker } from ant-design-vue;// 表单域组件类型 export const componentsMap {Text: …

编译poco库出现的问题及其解决办法

作者&#xff1a;朱金灿 来源&#xff1a;clever101的专栏 为什么大多数人学不会人工智能编程&#xff1f;>>> 使用vs2015编译poco库出现问题&#xff1a;error C3688: invalid literal suffix ‘I64_FMT’; literal operator or literal operator template ‘opera…

大模型:我也会自监督学习~

前言 当下大模型的能力已经很强了&#xff0c;但是将来我们想要的是能力更强的大模型&#xff0c;其最好能够处理各种复杂问题也即强对齐模型。 之前大模型训练的监督信号主要来源于人类反馈&#xff0c;但是如果想要训练一个强对齐模型必然就需要一个对应的强监督信号&#…

第二篇【传奇开心果系列】Vant 开发移动应用:开发常见页面

传奇开心果博文系列 Vant of Vue 开发移动应用示例系列博文博文目录一、常见页面的重要作用二、常见页面介绍三、分别示例代码四、常见页面样式示例代码五、主要知识点总结 Vant of Vue 开发移动应用示例系列博文 博文目录 一、常见页面的重要作用 常见页面在移动应用中扮演…

手把手教你如何快速定位bug,如何编写测试用例,快来观摩......

手把手教你如何快速定位bug,如何编写测试用例,快来观摩......手把手教你如何快速定位bug,如何编写测试用例,快来观摩......作为一名测试人员如果连常见的系统问题都不知道如何分析&#xff0c;频繁将前端人员问题指派给后端人员&#xff0c;后端人员问题指派给前端人员&#xf…

算法第二十一天-丑数

丑数 题目要求 解题思路 首先判断数字是不是为0或者负数&#xff0c;两者均不可能成为丑数&#xff1b; 之后对n进行不断整除&#xff0c;直到无法除尽为止。 简单判断最后的数是不是1即可。 代码 class Solution:def isUgly(self, n: int) -> bool:if n<0:return Fa…

linux perf工具使用

参考文章Linux性能调优之perf使用方法_perf交叉编译-CSDN博客 perf是一款Linux性能分析工具。比如打流性能优化的时候&#xff0c;就能够看到是哪些函数消耗的cpu高 那么linux如何编译perf工具呢&#xff1f; perf工具编译 进入perf目录下linux-3.16/tools/perf make ARCH…

线程状态转换

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程⛺️稳中求进&#xff0c;晒太阳 程状态转换 假设有线程Thread t 情况1 new-->RUNNABLE 当调用t.start()方法时&#xff0c;由new ->RUNNABLE 情况2 RUNNABLE WAITING t…

Spark On Hive配置测试及分布式SQL ThriftServer配置

文章目录 Spark On Hive的原理及配置配置步骤在代码中集成Spark On Hive Spark分布式SQL执行原理及配置配置步骤在代码中集成Spark JDBC ThriftServer 总结 Spark On Hive的原理及配置 Spark本身是一个执行引擎&#xff0c;而没有管理metadate的能力&#xff0c;当我们在执行S…

Redis原理篇(SkipList)

一.概述 本质是双端链表&#xff0c;只不过在正向遍历时可以不一个一个遍历&#xff0c;而是可以跳着遍历。 怎么实现的呢&#xff0c;下面是SkipList源码 二.源码 1. zskiplist 意义&#xff1a;跳表 zskiplist里面有头指针和尾指针&#xff0c;节点数量&#xff0c;最大…

Python自动化测试【selenium面试题】

一、selenium中如何判断元素是否存在&#xff1f; expected_conditions模块提供了16种判断方法&#xff0c;以下方法是判断元素存在DOM中&#xff1a; presence_of_element_located """ An expectation for checking that an element is present on the DOM of…

第二百七十八回

文章目录 1. 概念介绍2. 使用方法2.1 DropdownMenu2.1 DropdownMenuEntry 3. 示例代码4. 内容总结 我们在上一章回中介绍了"如何禁止页面跟随手机自动旋转"相关的内容&#xff0c;本章回中将介绍DropdownMenu组件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1.…

免费的网站站群软件,批量管理不同网站程序

在网站运营的过程中&#xff0c;站群软件成为提高效率、管理多个网站的得力助手。本文将专心分享三款卓越的站群软件&#xff0c;其中特别推荐147SEO软件&#xff0c;它不仅能够批量管理网站&#xff0c;还能自动更新原创文章&#xff0c;并主动推送各大搜索引擎。不论您运营何…

logstack 日志技术栈-02-ELK 的缺点?loki 更轻量的解决方案?

ELK/EFK日志系统 如果今天谈论到要部署一套日志系统&#xff0c;相信用户首先会想到的就是经典的ELK架构&#xff0c;或者现在被称为Elastic Stack。 Elastic Stack架构为Elasticsearch Logstash Kibana Beats的组合&#xff0c;其中&#xff0c;Beats负责日志的采集&…

51单片机中断

1、什么是中断&#xff1f; CPU在处理某一事件A时&#xff0c;发生了另一事件B请求CPU迅速去处理&#xff08;中断发生&#xff09;&#xff1b; CPU暂时中断当前的工作&#xff0c;转去处理事件B&#xff08;中断响应和中断服务&#xff09;&#xff1b; 待CPU将事件B处理完…

解决一个mysql的更新属性长度问题

需求背景&#xff1a; 线上有一个 platform属性&#xff0c;原有长度为 varchar(10)&#xff0c;但是突然需要填入一个11位长度的值&#xff1b;而偏偏这个属性在线上100张表中有50张都存在&#xff0c;并且名字各式各样&#xff0c;庆幸都包含 platform&#xff1b;例如 platf…

【计算机网络】Socket的TCP_NODELAY选项与Nagle算法

TCP_NODELAY是一个套接字选项&#xff0c;用于控制TCP套接字的延迟行为。当TCP_NODELAY选项被启用时&#xff0c;即设置为true&#xff0c;就会禁用Nagle算法&#xff0c;从而实现TCP套接字的无延迟传输。这意味着每次发送数据时都会立即发送&#xff0c;不会等待缓冲区的填充或…