从零开始 Spring Boot 42:异步执行

news2024/11/20 1:40:01

从零开始 Spring Boot 42:异步执行

spring boot

图源:简书 (jianshu.com)

在之前的文章中,我多次介绍过在 Spring 中如何使用@Async注解让方法调用变成“异步执行”:

  • 在这篇文章中,介绍了如何让定时任务使用@Async变成异步执行。
  • 在这篇文章中,介绍了如何让事件监听使用@Async变成异步执行。

下面,本篇文章将详细探讨@Async在 Spring 中的用途。

简单示例

老规矩,我们从一个简单示例开始说明:

@Component
public class Fibonacci {
    /**
     * 返回斐波那契数列的第n位的值
     *
     * @param n 从1开始(包括)
     * @return
     */
    public int fibonacci(int n) throws InterruptedException {
        Thread.sleep(100);
        if (n <= 2) {
            return 1;
        }
        return fibonacci(n - 1) + fibonacci(n - 2);
    }

    /**
     * 打印斐波那契数列第n位的结果到控制台
     * @param n 从1开始(包括)
     * @throws InterruptedException
     */
    public void print(int n) throws InterruptedException {
        System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
    }
}

这里定义一个 bean Fibonacci,负责返回或打印斐波那契数列。

为了让产生斐波那契数列元素的过程“更明显”,这里让每一步递归调用都延迟0.1秒(Thread.sleep(100))。

使用ApplicationRunner测试:

@Configuration
public class WebConfig {
    @Autowired
    private Fibonacci fibonacci;

    @Bean
    public ApplicationRunner applicationRunner() {
        return args -> {
            fibonacci.print(5);
            fibonacci.print(6);
            fibonacci.print(7);
        };
    }
}

输出:

fibonacci 5=5
fibonacci 6=8
fibonacci 7=13

整个测试用例都是顺序执行的,且存在明显的延迟。

可以利用@Async将相应方法的执行改为异步来改善性能:

@Component
public class Fibonacci {
	// ...
    @Async
    public void print(int n) throws InterruptedException {
        System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
    }
}

@Configuration
@EnableAsync
public class WebConfig {
	// ...
}

不要忘了在配置类上添加@EnableAsync以启用 Spring 的异步执行功能。

实现原理

实际上 Spring 的异步执行是通过使用代理(JDK 代理或 CGLIB)或者 AspectJ 织入来实现的。

AspectJ 是一个主流的 AOP 框架。

这点可以通过@EnableAsync注解的定义看出:

public @interface EnableAsync {
    Class<? extends Annotation> annotation() default Annotation.class;
    boolean proxyTargetClass() default false;
    AdviceMode mode() default AdviceMode.PROXY;
    int order() default 2147483647;
}

这些属性有如下用途:

  • annotation,指定用于标记异步执行方法的注解,默认情况下 Spring 使用@Asyncjavax.ejb.Asynchronous
  • mode,实现机制,有两个可选项:
    • AdviceMode.PROXY,用代理实现。
    • AdviceMode.ASPECTJ,用 AspectJ 实现。
  • proxyTargetClass,是否使用 CGLIB 代理,这个属性只有modeAdviceMode.PROXY时才生效。
  • order,设置AsyncAnnotationBeanPostProcessorBeanPostProcessor中的执行顺序,默认为最后运行,以便不影响之前可能存在的代理。

我们可以看出,默认情况下 Spring 使用 JDK 代理来实现异步调用,因此它也具备 Spring AOP 相同的限制。

AOP 实现

为了更好的说明问题,我们可以用 AOP 来自己实现一个类似的异步执行机制:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MyAsync {
}

@Component
@Aspect
public class MyAsyncAspect {
    @Around(value = "execution(void *(..)) && @annotation(annotation)")
    public Object asyncCall(ProceedingJoinPoint pjp, MyAsync annotation) {
        new Thread(() -> {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }).start();
        return null;
    }
}

@Component
public class Fibonacci {
	// ...
    @MyAsync
    public void print(int n) throws InterruptedException {
        System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
    }
}

更多关于 AOP 的内容,可以阅读我的另一篇文章。

限制

在学习 AOP 的时候,我们知道因为 AOP 的实现机制的关系,存在着一些限制。而 Spring 异步执行采用和 Spring AOP 类似的实现原理,所以也存在同样的问题。

借鉴前边学到的内容,我们很容易就能总结出以下限制:

在默认情况下,异步执行使用 JDK 动态代理实现,因此:

  • 只能让public的方法异步执行(JDK 动态代理使用接口实现)。
  • “自调用”时可能无法异步执行(绕过代理)。

如果使用 CGLIB 代理实现,限制会相对少一些(可以代理protected方法),但依然存在自调用时的问题。

关于此类限制的讨论和相应的解决方案,可以阅读 AOP 相关的文章,里边有详细描述,这里不再赘述。

返回结果

通常情况下异步执行方法返回的都是void,但如果我们需要返回异步执行的结果,要怎么做?

看一个示例:

@Configuration
@EnableAsync
public class WebConfig {
    @Autowired
    private Fibonacci fibonacci;
    private static final int MAX_FIBONACCI_INDEX = 40;

    @Bean
    ApplicationRunner applicationRunner2() throws InterruptedException {
        return new ApplicationRunner() {
            @Override
            @MyClock
            public void run(ApplicationArguments args) throws Exception {
                List<Integer> numbers = new ArrayList<>();
                for (int n = 1; n <= MAX_FIBONACCI_INDEX; n++) {
                    numbers.add(fibonacci.fibonacci(n));
                }
                System.out.println(numbers);
            }
        };
    }
}

这里获取40个斐波那契元素,然后一起输出。因为其中每次获取斐波那契数都是顺序执行(单线程),所以相当耗时。

最终输出:

[1, 1, 2, ... , 63245986, 102334155]
com.example.async.WebConfig$2.run() is called, use 876 mills.

下面我们用异步执行来改善效率。

要让方法异步执行并返回一个值,需要让方法返回一个Future类型:

@Component
public class Fibonacci {
	// ...
    @Async
    public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
        int result = fibonacci(n);
        return CompletableFuture.completedFuture(result);
    }
}

这里的CompletableFuture是 Spring 的一个Future实现,可以利用CompletableFuture.completedFuture返回一个包含异步调用结果的Future对象。

最终,我们需要收集所有异步执行返回的Future对象,并通过Future.get方法获取其中的异步执行结果:

@Configuration
@EnableAsync
public class WebConfig {
	// ...
    @Bean
    public ApplicationRunner applicationRunner() {
        return new ApplicationRunner() {
            @Override
            @MyClock
            public void run(ApplicationArguments args) throws Exception {
                List<Integer> numbers = new ArrayList<>();
                List<Future<Integer>> futures = new ArrayList<>();
                for (int n = 1; n <= MAX_FIBONACCI_INDEX; n++) {
                    futures.add(fibonacci.asyncFibonacci(n));
                }
                for (Future<Integer> future : futures) {
                    numbers.add(future.get());
                }
                System.out.println(numbers);
            }
        };
    }
    // ...
}

输出:

[1, 1, 2, ... , 63245986, 102334155]
com.example.async.WebConfig$1.run() is called, use 380 mills.

效率提升了一倍多。

并发相关的经验告诉我们,将并发用于密集计算,计算规模(并行任务数目)越大,性能提升越明显。

ThreadPoolTaskExecutor

默认情况下,Spring 使用ThreadPoolTaskExecutor执行异步方法:

@Configuration
@EnableAsync
public class WebConfig {
	// ...
    @Autowired
    private TaskExecutor taskExecutor;
    // ...
    @Bean
    public ApplicationRunner applicationRunner3(){
        return args -> {
            System.out.println(taskExecutor);
            if (taskExecutor instanceof ThreadPoolTaskExecutor){
                var executor = (ThreadPoolTaskExecutor) taskExecutor;
                System.out.println("getThreadNamePrefix:%s".formatted(executor.getThreadNamePrefix()));
                System.out.println("getActiveCount:%s".formatted(executor.getActiveCount()));
                System.out.println("getCorePoolSize:%s".formatted(executor.getCorePoolSize()));
                System.out.println("getKeepAliveSeconds:%s".formatted(executor.getKeepAliveSeconds()));
                System.out.println("getMaxPoolSize:%s".formatted(executor.getMaxPoolSize()));
                System.out.println("getQueueCapacity:%s".formatted(executor.getQueueCapacity()));
                System.out.println("getPoolSize:%s".formatted(executor.getPoolSize()));
            }
        };
    }
}

输出:

getThreadNamePrefix:task-
getActiveCount:0
getCorePoolSize:8
getKeepAliveSeconds:60
getMaxPoolSize:2147483647
getQueueCapacity:2147483647
getPoolSize:8

ThreadPoolTaskExecutor的这些 Getter 返回的信息包括:

  • getThreadNamePrefix,线程名称前缀。
  • getActiveCount,当前存活的线程数量。
  • getCorePoolSize,核心线程池大小(超过该值后会扩充线程池,直到最大线程池大小)。
  • getMaxPoolSize,最大线程池大小(超过该值后会将线程放入等待队列)。
  • getQueueCapacity,等待队列的容量(被塞满后新的线程将被丢弃)。
  • getKeepAliveSeconds,线程存活数目。
  • getPoolSize,当前线程池大小。

总的来说,``ThreadPoolTaskExecutor`可以合理地复用线程:如果所需线程数目超过核心线程池大小,会将线程放入等待队列,以等待核心线程空闲后执行。如果等待队列被塞满,会添加新的线程以期望能够加快线程执行。最后,如果添加的线程数目超过最大线程池大小,才会按照规则丢弃线程。

这个过程可以用下图表示:

img

图源:知乎

  • 在早期的 Spring 版本,默认使用simpleAsyncTaskExecutor执行异步调用,该TaskExecutor不会进行线程复用,只是简单的增加新的线程。
  • 这里比较重要的是核心线程池大小,一般来说设置为执行代码所在机器的CPU核心数即可,我的笔记本是8核的,所以这里 Spring 将该值设置为8。

一般来说,使用默认设置的ThreadPoolTaskExecutor就可以了,如果需要进行修改,可以:

@Configuration
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("ThreadPoolTaskExecutor-");
        threadPoolTaskExecutor.setCorePoolSize(8);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

此时在异步方法中打印线程名称:

@Component
public class Fibonacci {
	@Async
    public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        // ...
    }
    // ...
}

就能看到控制台输出的线程名称是ThreadPoolTaskExecutor-x,而不是之前默认的task-x

单独指定 Executor

我们也可以为某些异步方法单独指定一个Executor,而不是使用全局的Executor

@Configuration
@EnableAsync
public class WebConfig {
	// ...
    @Bean
    public Executor threadPoolTaskExecutor(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("another-ThreadPoolTaskExecutor-");
        return threadPoolTaskExecutor;
    }
}

@Component
public class Fibonacci {
    // ...
    @Async("threadPoolTaskExecutor")
    public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
		// ...
    }
    // ...
}

就像上面的示例,可以在@Async中指定一个Executor类型的 bean,Spring 将用这个 bean 执行这个方法的异步调用。

异常处理

如果异常方法返回的是Future,且异步调用会产生异常,将通过Future.get抛出:

@Component
public class Fibonacci {
    // ...
    @Async
    public Future<Integer> asyncFibonacci(int n) throws InterruptedException {
        if (n < 1) {
            throw new IllegalArgumentException("n 不能小于1");
        }
		// ...
    }
    // ...
}

@Configuration
@EnableAsync
public class WebConfig {
    // ...
    @Bean
    public ApplicationRunner applicationRunner3() {
        return args -> {
            Future<Integer> future = fibonacci.asyncFibonacci(0);
            System.out.println(future.get());
        };
    }
}

这里会抛出一个IllegalStateException异常。

如果返回类型是void,Spring 会使用一个默认的“异常处理器”SimpleAsyncUncaughtExceptionHandler来处理异常:

@Component
public class Fibonacci {
	// ...
    @Async
    public void print(int n) throws InterruptedException {
        if (n < 1) {
            throw new IllegalArgumentException("n不能小于1");
        }
        System.out.printf("fibonacci %d=%d%n", n, fibonacci(n));
    }
}

@Configuration
@EnableAsync
public class WebConfig {
    @Bean
    public ApplicationRunner applicationRunner3() {
        return args -> {
            fibonacci.print(0);
        };
    }
}

错误信息:

2023-06-16T16:52:17.509+08:00 ERROR 27872 --- [lTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.example.async.Fibonacci.print(int) throws java.lang.InterruptedException
...

可以用一个自定义异常处理器作为 Spring 异步调用时的全局异常处理器:

@Configuration
public class AsyncConfig implements AsyncConfigurer {
    // ...
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                System.out.println("Exception message - " + ex.getMessage());
                System.out.println("Method name - " + method.getName());
                for (Object param : params) {
                    System.out.println("Parameter value - " + param);
                }
            }
        };
    }
}

The End,谢谢阅读。

本文的完整示例可以通过这里获取。

参考资料

  • 从零开始 Spring Boot 40:定时任务 - 红茶的个人站点 (icexmoon.cn)
  • 从零开始 Spring Boot 41:事件 - 红茶的个人站点 (icexmoon.cn)
  • 从零开始 Spring Boot 32:AOP II - 红茶的个人站点 (icexmoon.cn)
  • AsyncResult (Spring Framework 6.0.10 API) — AsyncResult(Spring Framework 6.0.10 API)
  • SimpleAsyncTaskExecutor (Spring Framework 6.0.10 API)
  • Spring自带的线程池ThreadPoolTaskExecutor - 知乎 (zhihu.com)
  • How To Do @Async in Spring | Baeldung

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/653766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

​达梦三种主备集群介绍

DM主备守护集群分为实时主备、读写分离集群、MPP主备集群。基于redo日志实现&#xff0c;不同的集群采用不同的redo日志归档类型。 实时主备&#xff1a; 实时主备系统由主库、实时备库、守护进程和监视器组成。通过部署实时主备系统&#xff0c;可以及时检测并处理各种硬件故…

GIS大数据处理框架sedona(塞多纳)编程入门指导

GIS大数据处理框架sedona(塞多纳)编程入门指导 简介 Apache Sedona™是一个用于处理大规模空间数据的集群计算系统。Sedona扩展了现有的集群计算系统&#xff0c;如Apache Spark和Apache Flink&#xff0c;使用一组开箱即用的分布式空间数据集和空间SQL&#xff0c;可以有效地…

【备战秋招】每日一题:2023.05-B卷-华为OD机试 - 2023.05-B卷-华为OD机试 - AI面板识别

2023大厂笔试模拟练习网站&#xff08;含题解&#xff09; www.codefun2000.com 最近我们一直在将收集到的各种大厂笔试的解题思路还原成题目并制作数据&#xff0c;挂载到我们的OJ上&#xff0c;供大家学习交流&#xff0c;体会笔试难度。现已录入200道互联网大厂模拟练习题&a…

【双曲几何学 02】什么是极点和极线?

一、说明 Pole and polar 对于几何学&#xff0c;是普遍的概念。可能高中就学过&#xff0c;问题是在双曲几何又用到这个概念&#xff0c;因此&#xff0c;这里再次强调理解这个概念 。为后边学习双曲几何扫清障碍。 二、基本概念 在几何学中&#xff0c;极点和极线分别是相对于…

NetSuite SuiteQlet 功能包

目录 1.前言 2.功能说明 2.1术语 2.2功能概述 2.3逻辑阐释 3.安装 4.权限配置 ​​​5.操作指南 5.1Query查询 5.2Chart图表 5.3Dashboard仪表板发布 6.注意事项 7.视频链接 1.前言 SuiteQL是NetSuite数据分析框架中进行数据查询的有力工具&#xff0c;使用得当则…

华为云CodeArtBuild减负!云端编译构建,让你的开发省时省力!

每一个剑客都需要一把趁手的利器&#xff0c;初学者如何利用编译软件&#xff1f;只需五分钟&#xff0c;带你体验软件开发的效率。 请注册华为云账号并完成实名认证&#xff0c;实验过程中请使用Chrome浏览器完成相关操作。 华为云账号注册步骤请参考&#xff1a; ​​http…

介质中的平面电磁波与色散效应

目录 理想介质中的平面电磁波 方程推导 解的讨论 ​​​​​​​瞬时形式 等相面 参数讨论 导电媒质中的均匀平面波 方程推导 解的讨论 波的特征分析 色散效应的讨论 理想介质中的平面电磁波 方程推导 已知两个方程 我们如果令 方程就可以化简为 这两个方程在数学…

《C++ Primer》--学习1

变量和基本类型 NULL是一个预处理变量&#xff0c;用NULL初始化指针和用0初始化指针是一样的&#xff08;但是不可以直接用值为0的int变量来初始化&#xff09;&#xff0c;应该尽量用nullptr&#xff0c;避免使用NULL 指向指针的指针 声明符中修饰符的个数并没有限制&#…

redis中sort妙用,实现动态生成排行榜

在游戏中&#xff0c;有很多维度的排行榜&#xff0c;服务器在实现过程中&#xff0c;一般都要预先维护对应因子的zset&#xff0c;比如根据玩家等级来排序&#xff0c;那就需要对应有一个level作为score的zset&#xff0c;如果是以战斗力排序&#xff0c;那就用战力作为score维…

微服务基础介绍

Part1一、基本概念 微服务最主要的功能是根据业务拆分成一个一个的子服务&#xff0c;实现功能的去耦合&#xff0c;每一个微服务提供单个业务功能的服务&#xff0c;各司其职&#xff0c;从技术角度看就是一种灵活独立的单元&#xff0c;能够自行单独启动和关闭&#xff0c;一…

亚马逊云科技中国峰会:自主驾驶开发平台 Amazon DeepRacer

0.引言 自动驾驶技术的快速发展和应用前景已经引起了广泛的关注&#xff0c;毋庸讳言&#xff0c;无人驾驶已经成为当今及未来前沿科技公司的重点研究方向。在这个领域中&#xff0c;Amazon DeepRacer作为一款全面的自主驾驶开发平台备受瞩目。 1.了解Amazon DeepRacer 1.1 什…

什么是消息队列(MQ)

其实字面意思很清楚了&#xff0c;存放消息的队列。 由于它的应用场景在服务器方面被重新定义而名声大噪&#xff0c;它的价值也被由原先的通信而重新定义&#xff0c;成为高并发场景下&#xff0c;分布式系统解耦合&#xff0c;任务异步&#xff0c;流量削峰的利器。 其实消息…

【flink】SinkUpsertMaterializer

在flink cdc同步数据时&#xff0c;基于sql的实现方式中发现了作业DAG有个SinkMaterializer算子&#xff0c;而且检查checkpoint历史时发现该算子state越来越大&#xff0c; 有必要搞清楚为什么会多了这个算子&#xff0c;作用又是什么。 通过算子名称定位到了源码为类org.apa…

常用API(String,ArrayList)

1:String类概述 String是字符串类型&#xff0c;可以定义字符串变量指向字符串对象String是不可变字符串的原因&#xff1f;1.String变量每次的修改都是产生并指向新的字符串对象。2.原来的字符串对象都是没有改变的&#xff0c;所以称不可变字符串。 2&#xff1a;String创建…

一文搞懂VOS费率前缀、地区前缀的区别和使用

登录VOS3000客户端 进入费率管理 "VOS费率前缀"和"地区前缀"的主要区别如下: VOS费率前缀:VOS(Voice Over Service)费率前缀是指用于国际长途电话呼叫的特定前缀号码。不同的运营商或服务提供商可能会使用不同的VOS费率前缀,用于标识国际长途通话的费…

49 最佳实践-性能最佳实践-Nvme磁盘直通

文章目录 49 最佳实践-性能最佳实践-Nvme磁盘直通49.1 概述49.2 操作指导 49 最佳实践-性能最佳实践-Nvme磁盘直通 49.1 概述 设备直通技术是一种基于硬件的虚拟化解决方案&#xff0c;通过该技术&#xff0c;虚拟机可以直接连接到指定的物理直通设备上。对于用户来说&#x…

ByteV联合“智农”打造数字孪生高标准农田,助力乡村振兴

ByteV联合“智农”打造的数字孪生高标准农田&#xff0c;不仅要让粮食稳产、增产&#xff0c;更要对土壤肥力进行改良和提升。不仅能够实现科技引领农业发展&#xff0c;更在智慧农业的基础上实现一站式托管&#xff0c;真正做到技术提升、5G引领、建后管护的闭环管理。让高标准…

C语言之指针详解(7)

目录 本章重点 1. 字符指针 2. 数组指针 3. 指针数组 4. 数组传参和指针传参 5. 函数指针 6. 函数指针数组 7. 指向函数指针数组的指针 8. 回调函数 9. 指针和数组面试题的解析 上一篇博客我们说过会把回调函数的一些知识再给大家讲一遍 这里把void*强制类型转化为str…

【Hadoop】 | 搭建HA之报错锦集

知识目录 一、写在前面✨二、Hadoop的active结点无法主备切换&#x1f525;三、Hadoop Web端无法上传文件&#x1f349;四、hdfs创建文件夹报错&#x1f36d;五、IDEA操作Hdfs无法初始化集群&#x1f525;六、Java无法连接Hdfs&#x1f36d;七、找不到Hadoop家目录&#x1f525…

软件测试实战案例:支付功能板块如何测试?详细总结

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 大体上&#xff0…