异步回调模式

news2025/1/23 20:25:19

异步回调

所谓异步回调,本质上就是多线程中线程的通信,如今很多业务系统中,某个业务或者功能调用多个外部接口,通常这种调用就是异步的调用。如何得到这些异步调用的结果自然也就很重要了。
Callable、Future、FutureTask

public class test implements Callable<Boolean>{
    public static void main(String[] args) {
        test a=new test();
        FutureTask futureTask=new FutureTask<>(a);
        new Thread(futureTask).start();
        Object su=null;
        try {
            su=futureTask.get();
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println(su);
    }
    @Override
    public Boolean call() throws Exception {
        return null;
    }
}

FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。通过FutureTask获取异步线程的执行结果,但是其调用get()方法获取异步结果时,主线程也会被阻塞。属于异步阻塞模式。异步阻塞模式属于主动模式的异步调用,异步回调属于被动模式的异步调用。Java中回调模式的标准实现类为CompletableFuture。由于此类出现时间比较晚,期间Guava和Netty等都提出了自己的异步回调模式API来使用。这里主要介绍CompletableFuture,其他的有时间后面再学习。

CompletableFuture

在这里插入图片描述
CompletableFuture实现Future和CompletionStage两个接口。此类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。主要方法如下所示:

runAsync和supplyAsync创建子任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

可以看出runAsync没有返回值,supplyAsync有返回值,此处用supplyAsync举例:

ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
        return "你好,周先生";
},executor);
System.out.println(completableFuture.get());//输出你好,周先生
executor.shutdown();

上例中的线程池可以自己构造,如若不指定使用CompletableFuture中默认的线程池ForkJoinPool。
handle()方法统一处理异常和结果

//在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}
//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

案例:

CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
     throw new RuntimeException("你好");
});
completableFuture.handle(new BiFunction<String,Throwable,String>(){
     @Override
     public String apply(String s, Throwable throwable) {
          if(throwable==null){
               System.out.println("mei");;
          }else {
               System.out.println("出错了");
          }
          return "ok";
     }
});

异步任务的串行执行

主要方法为以下几种:thenApply()、thenAccept()、thenRun()和 thenCompose()。

thenApply()
此方法实现异步任务的串行化执行,前一个任务结果作为下一个任务的入参。

	后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

其中泛型参数T:上一个任务所返回结果的类型。泛型参数U:当前任务的返回类型。
案例:

		ExecutorService executor= Executors.newFixedThreadPool(10);
        CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        },executor).thenApplyAsync(new Function<String,String>() {
            @Override
            public String apply(String s) {
                System.out.println(Thread.currentThread().getId());//13
                return "你好,毛先生";
            }
        });

        System.out.println(completableFuture.get());//输出你好,毛先生
        executor.shutdown();

thenRun()
此方法不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。而且没有返回值。

	//后一个任务与前一个任务在同一个线程中执行
	public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
	//后一个任务与前一个任务可以不在同一个线程中执行
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
	//后一个任务在executor线程池中执行
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

thenAccept()
使用此方法时一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。

	//后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

thenCompose()
对两个任务进行串行的调度操作,第一个任务操作完成时,将其结果作为参数传递给第二个任务。

	//后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

thenCompose()方法第二个任务的返回值是一个CompletionStage异步实例。

		ExecutorService executor= Executors.newFixedThreadPool(10);
        CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        },executor).thenComposeAsync(new Function<String,CompletableFuture<String>>(){
            @Override
            public CompletableFuture<String> apply(String s) {
                return CompletableFuture.supplyAsync(()->{
                    System.out.println(Thread.currentThread().getId());//12
                    return "你好,毛先生";
                });
            }
        });
        System.out.println(completableFuture.get());//输出你好,毛先生
        executor.shutdown();

异步任务的合并执行

主要实现为以下几个方法:thenCombine()、runAfterBoth()、
thenAcceptBoth()。

thenCombine()
thenCombine()会在两个CompletionStage任务都执行完成后,一块来处理两个任务的执行结果。如果要合并多个任务,可以使用allOf()。

	//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
	public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
	//不一定在同一个线程中执行第三步任务的CompletionStage实例
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
	//第三步任务的CompletionStage实例在指定的executor线程池中执行
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

其中泛型参数T:表示第一个任务所返回结果的类型。泛型参数U:表示第二个任务所返回结果的类型。泛型参数V:表示第三个任务所返回结果的类型。
案例:

		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,毛先生";
        });
        CompletableFuture<String> future3 = future1.thenCombine(future2, new BiFunction<String, String, String>(){
            @Override
            public String apply(String s, String s2) {
                return s+"-----"+s2;
            }
        });
        String s = future3.get();
        System.out.println(s);//你好,周先生-----你好,毛先生

而runAfterBoth()方法不关注每一步任务的输入参数和输出参数,thenAcceptBoth()中第三个任务接收第一个和第二个任务的结果,但是不返回结果。

异步任务的选择执行

若异步任务的选择执行不是按照某种条件进行选择的,而按照执行速度进行选择的:前面两并行任务,谁的结果返回速度快,其结果将作为第三步任务的输入。对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、acceptEither()等方法来实现。
applyToEither()

	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数
	public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }
	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,不一定在同一个线程中执行fn回调函数
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }
	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,在指定线程池执行fn回调函数
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

案例:

		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "你好,周先生";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,毛先生";
        });
        CompletableFuture<String> future3 = future1.applyToEither(future2, new Function<String, String>(){
            @Override
            public String apply(String s) {
                return s;
            }
        });
        String s = future3.get();
        System.out.println(s);//你好,毛先生

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1297895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python中利用遗传算法探索迷宫出路

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 当处理迷宫问题时&#xff0c;遗传算法提供了一种创新的解决方案。本文将深入探讨如何运用Python和遗传算法来解决迷宫问题。迷宫问题是一个经典的寻路问题&#xff0c;寻找从起点到终点的最佳路径。遗传算法是一…

半导体划片机助力氧化铝陶瓷片切割:科技与工艺的完美结合

在当今半导体制造领域&#xff0c;氧化铝陶瓷片作为一种高性能、高可靠性的材料&#xff0c;被广泛应用于各种电子设备中。而半导体划片机的出现&#xff0c;则为氧化铝陶瓷片的切割提供了新的解决方案&#xff0c;实现了科技与工艺的完美结合。 氧化铝陶瓷片是一种以氧化铝为基…

基于ssm端游账号销售管理系统论文

摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对端游账号销售信息管理混乱&#xff0c;出错率高&#xff0c;信息安全…

单片机语言--C51语言的数据类型以及存储类型以及一些基本运算

C51语言 本文主要涉及C51语言的一些基本知识&#xff0c;比如C51语言的数据类型以及存储类型以及一些基本运算。 文章目录 C51语言一、 C51与标准C的比较二、 C51语言中的数据类型与存储类型2.1、C51的扩展数据类型2.2、数据存储类型 三、 C51的基本运算3.1 算术运算符3.2 逻辑…

使用Draw.io制作泳道图

使用Draw.io制作泳道图 一、横向泳道图1. 有标题泳道图2. 无标题泳道图3. 横纵向扩展泳道 二、纵向泳道图三、横纵交错地泳道图想做这样的图具体步骤1. 拖拽一个带标题的横向泳道图2. 拖拽一个带标题的单一图&#xff0c;并且把它放进Lane1中3. 其他注意 四、下载文件说明 一、…

ThinkPHP生活用品商城系统

有需要请加文章底部Q哦 可远程调试 ThinkPHP生活用品商城系统 一 介绍 此生活用品商城系统基于ThinkPHP框架开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统分为用户和管理员。(附带配套设计文档) 技术栈&#xff1a;ThinkPHPmysqlbootstrapphpstudyvscode 二 …

SAP UI5 walkthrough step7 JSON Model

这个章节&#xff0c;帮助我们理解MVC架构中的M 我们将会在APP中新增一个输入框&#xff0c;并将输入的值绑定到model&#xff0c;然后将其作为描述&#xff0c;直接显示在输入框的右边 首先修改App.controllers.js webapp/controller/App.controller.js sap.ui.define([&…

.NET开源且好用的权限工作流管理系统

前言 系统权限管理、工作流是企业应用开发中很常见的功能&#xff0c;虽说开发起来难度不大&#xff0c;但是假如从零开始开发一个完整的权限管理和工作流平台的话也是比较耗费时间的。今天推荐一款.NET开源且好用的权限工作流管理系统&#xff08;值得借鉴参考和使用&#xf…

【Python】翻译包translate

在Python里&#xff0c;可以用translate包完成语言的翻译转化 import translatetrantranslate.Translator(from_lang"ZH",to_lang"JA") strtran.translate("今天的天气怎么样")print(str)

2023年 - 我的程序员之旅和成长故事

2023年 - 我的程序员之旅和成长故事 &#x1f525; 1.前言 大家好&#xff0c;我是Leo哥&#x1fae3;&#x1fae3;&#x1fae3;&#xff0c;今天咱们不聊技术&#xff0c;聊聊我自己&#xff0c;聊聊我从2023年年初到现在的一些经历和故事&#xff0c;我也很愿意我的故事分…

做数据分析为何要学统计学(6)——什么问题适合使用方差分析?

方差分析&#xff08;ANOVA&#xff0c;也称变异数分析&#xff09;是英国统计学家Fisher&#xff08;1890.2.17&#xff0d;1962.7.29&#xff09;提出的对两个或以上样本总体均值进行差异显著性检验的方法。 它的基本思想是将测量数据的总变异&#xff08;即总方差&#xff…

Mac电脑vm虚拟机 VMware Fusion Pro中文 for mac

VMware Fusion Pro是一款功能强大的虚拟机软件&#xff0c;适用于需要在Mac电脑上运行其他操作系统的用户。它具有广泛的支持、快速稳定的特点以及多种高级功能&#xff0c;可以满足用户的各种需求和场景。 多操作系统支持&#xff1a;VMware Fusion Pro允许在Mac电脑上运行多…

Linux 删除文件名乱码的文件

现象&#xff1a; 处理&#xff1a; 1.>ls -li 获取文件对应的ID号 2.把删除指定文件&#xff08;ID号 &#xff09;执行&#xff1a; find ./ -inum 268648910 -exec rm {} \;

MYSQL练题笔记-高级查询和连接-连续出现的数字

一、题目相关内容 1&#xff09;相关的表和题目 2&#xff09;帮助理解题目的示例&#xff0c;提供返回结果的格式 二、自己初步的理解 其实这一部分的题目很简单&#xff0c;但是没啥思路啊&#xff0c;怎么想都想不通&#xff0c;还是看题解吧&#xff0c;中等题就是中等题…

每日一练【四数之和】

一、题目描述 18. 四数之和 给你一个由 n 个整数组成的数组 nums &#xff0c;和一个目标值 target 。请你找出并返回满足下述全部条件且不重复的四元组 [nums[a], nums[b], nums[c], nums[d]] &#xff08;若两个四元组元素一一对应&#xff0c;则认为两个四元组重复&#x…

Conda 搭建简单的机器学习 Python 环境

文章目录 Conda 概述Conda 常用命令Conda 自身管理查看 Conda 版本更新 Conda清理索引缓存添加镜像源设置搜索时显示通道地址查看镜像源删除镜像源 环境管理创建虚拟环境删除虚拟环境查看所有虚拟环境复制虚拟环境激活虚拟环境关闭虚拟环境导入、导出环境 包管理虚拟环境下安装…

java多线程(常用方法、实现方式、线程安全问题、生命周期、线程池)

多线程相关的三组概念 程序和进程 程序&#xff08;program&#xff09;&#xff1a;一个固定的运行逻辑和数据的集合&#xff0c;是一个静态的状态&#xff0c;一般存储在硬盘中。简单来说就是我们编写的代码 进程&#xff08;process&#xff09;&#xff1a;一个正在运行的…

openGauss学习笔记-149 openGauss 数据库运维-备份与恢复-逻辑备份与恢复之gs_restore

文章目录 openGauss学习笔记-149 openGauss 数据库运维-备份与恢复-逻辑备份与恢复之gs_restore149.1 背景信息149.2 命令格式149.3 参数说明149.3.1 通用参数-V, –version149.3.2 导入参数 149.4 示例 openGauss学习笔记-149 openGauss 数据库运维-备份与恢复-逻辑备份与恢复…

springboot075电影评论网站系统设计与实现

springboot075电影评论网站系统设计与实现 源码获取&#xff1a; https://docs.qq.com/doc/DUXdsVlhIdVlsemdX

Nginx负载均衡实战

&#x1f3b5;负载均衡组件 ngx_http_upstream_module https://nginx.org/en/docs/http/ngx_http_upstream_module.html upstream模块允许Nginx定义一组或多组节点服务器组&#xff0c;使用时可以通过多种方式去定义服务器组 样例&#xff1a; upstream backend {server back…