异步代码处理

news2025/1/23 15:00:22

在Spring中,实现异步调用主要有三种方式:

方式一:注解方式
要开启异步支持,首先得在Spring Boot入口类上加上@EnableAsync注解:

@SpringBootApplication
@EnableAsync
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

新建service包,并创建TestService:

@Service
public class TestService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async
    public void asyncMethod() {
        sleep();
        logger.info("异步方法内部线程名称:{}", Thread.currentThread().getName());
    }

    public void syncMethod() {
        sleep();
    }

    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上面的Service中包含一个异步方法asyncMethod(开启异步支持后,只需要在方法上加上@Async注解便是异步方法了)和同步方法syncMethod。sleep方法用于让当前线程阻塞2秒钟。
因为异步的原因,程序并没有被sleep方法阻塞,这就是异步调用的好处。同时异步方法内部会新启一个线程来执行
默认情况下的异步线程池配置使得线程不能被重用,每次调用异步方法都会新建一个线程,我们可以自己定义异步线程池来优化。

@Configuration
public class AsyncPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor asyncThreadPoolTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(200);
        executor.setQueueCapacity(25);
        executor.setKeepAliveSeconds(200);
        executor.setThreadNamePrefix("asyncThread");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        executor.initialize();
        return executor;
    }
}

要使用该线程池,只需要在@Async注解上指定线程池Bean名称即可:

@Service
public class TestService {
    ......

    @Async("asyncThreadPoolTaskExecutor")
    public void asyncMethod() {
       ......
    }
    ......
}

处理异步回调
如果异步方法具有返回值的话,需要使用Future来接收回调值。我们修改TestService的asyncMethod方法,给其添加返回值:

@Async("asyncThreadPoolTaskExecutor")
public Future<String> asyncMethod() {
    sleep();
    logger.info("异步方法内部线程名称:{}", Thread.currentThread().getName());
    return new AsyncResult<>("hello async");
}

Future接口的get方法用于获取异步调用的返回值。
通过返回结果我们可以看出Future的get方法为阻塞方法,只有当异步方法返回内容了,程序才会继续往下执行。get还有一个get(long timeout, TimeUnit unit)重载方法,我们可以通过这个重载方法设置超时时间,即异步方法在设定时间内没有返回值的话,直接抛出java.util.concurrent.TimeoutException异常。
比如设置超时时间为60秒:

String result = stringFuture.get(60, TimeUnit.SECONDS);

方式二:内置线程池方式
可以使用Spring内置的线程池来实现异步调用,比如ThreadPoolTaskExecutor 和SimpleAsyncTaskExecutor。Spring提供了许多TaskExecutor的内置实现。下面简单介绍5种内置的线程池。

1)SimpleAsyncTaskExecutor:它不会复用线程,每次调用都是启动一个新线程。

2)ConcurrentTaskExecutor:它是Java API中Executor实例的适配器。

3)ThreadPoolTaskExecutor:这个线程池是最常用的。它公开了用于配置的bean属性,并将它包装在TaskExecutor中。

4)WorkManagerTaskExecutor:它基于CommonJ WorkManager来实现的,并且是在Spring上下文中的WebLogic或WebSphere中设置CommonJ线程池的工具类。

5)DefaultManagedTaskExecutor:主要用于支持JSR-236兼容的运行时环境,它是使用JNDI获得ManagedExecutorService,作为CommonJ WorkManager的替代方案。

通常情况下,ThreadPoolTaskExecuto最为常用,只要当ThreadPoolTaskExecutor不能满足需求时,可以使用ConcurrentTaskExecutor。如果在代码中声明了多个线程池,Spring会默认按照以下搜索顺序来调用线程池:

第一步,检查上下文中的唯一TaskExecutor Bean。

第二步,检查名为“ taskExecutor”的Executor Bean。

第三步,以上都无法无法处理,就会使用SimpleAsyncTaskExecutor来执行。

示例1:

CompletableFuture.runAsync()来完成异步任务(标准版)

(1)配置线程池

    /**
     * int corePoolSize,
     * int maximumPoolSize,
     * long keepAliveTime,
     * TimeUnit unit,
     * BlockingQueue<Runnable> workQueue,
     * ThreadFactory threadFactory,
     * RejectedExecutionHandler handler
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
        return new ThreadPoolExecutor(pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
                );
    }

(2)线程池参数配置类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
	my.thread.core-size=20
	my.thread.max-size=200
	my.thread.keep-alive-time=10
*/
@ConfigurationProperties(prefix = "my.thread")
@Component
@Data
public class ThreadPoolConfigProperties {
	//核心线程数
    private Integer coreSize;
    //最大线程数
    private Integer maxSize;
    //空余线程的存活时间
    private Integer keepAliveTime;
}

(3)测试异步任务

	@Autowired
    private ThreadPoolExecutor executor;
	
	//在这里开启一个异步任务,提交给线程池,runAsync()方法没有返回值,需要有返回值的可使用supplyAsync()方法
    @Test
    void testCompletableFuture() {
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            int result = 0;
            for (int i = 0; i <= 100; i++) {
                result += i;
            }
            System.out.println(result);
        }, executor);
    }

(4)关于CompletableFuture的其他相关用法
4.1 CompletableFuture的**get()**方法可以获取异步的结果,get方法是一个阻塞式等待的方法,也即get方法会等待异步任务的完成

CompletableFuture<AtomicInteger> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            for (int i = 0; i <= 100; i++) {
                sum2.addAndGet(i);
            }
            return sum2;
     }, executor);
        //获取异步结果
 AtomicInteger integer = completableFuture2.get();

4.2 allOf : 等待所有任务完成完成

AtomicInteger sum = new AtomicInteger();
AtomicInteger sum2 = new AtomicInteger();
CompletableFuture<AtomicInteger> completableFuture1 = CompletableFuture.supplyAsync(() -> {
   for (int i = 0; i <= 100; i++) {
       sum.addAndGet(i);
   }
   return sum;
}, executor);


CompletableFuture<AtomicInteger> completableFuture2 = CompletableFuture.supplyAsync(() -> {
   for (int i = 0; i <= 100; i++) {
       sum2.addAndGet(i);
   }
   return sum2;
}, executor);
AtomicInteger integer = completableFuture2.get();

//allOf : 等待所有任务完成完成,注意get方法,是阻塞式等待,等待上面的异步任务都完成
CompletableFuture.allOf(completableFuture1,completableFuture2).get();

//获取异步结果
 AtomicInteger atomicInteger1 = completableFuture1.get();
 AtomicInteger atomicInteger2 = completableFuture2.get();

 System.out.println("结果是--->"+atomicInteger1.addAndGet(atomicInteger2.intValue()));

异步任务完成时,whenComplete,exceptionally

CompletableFuture<AtomicInteger> completableFuture3 = CompletableFuture.supplyAsync(() -> {
        for (int i = 0; i <= 10; i++) {
            sum2.addAndGet(i);
        }
        return sum2;
    }, executor).whenComplete((res, exception) -> {
        //当出现异常,可以拿到异常信息,但是无法修改返回数据
        System.out.println("结果是:" + res + ",异常:" + exception);
    }).exceptionally(throwable -> {
        //可以感知异常,同时返回默认值
        return new AtomicInteger(10);
    });

4.4 handle,方法完成后的后续处理

CompletableFuture<Integer> completableFuture4 = CompletableFuture.supplyAsync(() -> {
      int i = 10 / 2;
       return i;
   }, executor).handle((res, throwable) -> {
       //res 为结果,throwable 为异常
       if (res != null) {
           return res * 2;
       }
       if (throwable != null) {
           return -1;
       }
       return 0;
   });
   System.out.println("completableFuture4--结果是:"+completableFuture4.get());

4.5 异步任务串行化

/**
* 异步任务串行化
*      thenAcceptAsync  可以接收上一步获取的结果,但是无返回值
*      thenApplyAsync   可以接收上一步获取的结果,有返回值
*/
   CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
       int i = 10 / 2;
       return i;
   }, executor).thenApplyAsync(res -> {
       //res为上一步的结果
       return res * 2;
   }, executor).thenAcceptAsync((res) -> {
       System.out.println("hello ...thenAcceptAsync");
   }, executor);

示例2:

CompletableFuture.runAsync()来完成异步任务(简易版)

(1)初始化池

private ExecutorService executor = Executors.newFixedThreadPool(5);

(2)使用

@ApiOperation(value = "领实物奖")
    @PostMapping(Routes.API_HELP_OFFLINE_PRIZE)
    public Response getOfflinePrize(@Valid @RequestBody GetOfflinePrizeReq req, HttpServletRequest request) {
        try {
            UserInfo user = getUser(req.getToken(), request);
           
            CompletableFuture.runAsync(() -> {
                record.setPrizeName(prizes.get(0).getPrizeName());
                helpFacadeService.sendPrizes(user, record, prizes);
                helpFacadeService.recordPrizeAddress(req, user, prizes);
            }, executor);
            return ResponseUtils.buildResponse(null);
        } catch (Exception e) {
            log.error("[getPrize error] error: {}", e);
            return ResponseUtils.buildErrorResponse(SYSTEM_ERROR, "领奖失败");
        }
    }

CompletableFuture提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法的区别是:

· runAsync() 以Runnable函数式接口类型为参数,没有返回结果,supplyAsync() 以Supplier函数式接口类型为参数,返回结果类型为U;Supplier接口的 get()是有返回值的(会阻塞)
· 使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
· 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

方式三:自定义线程池方式
可以通过实现AsyncConfigurer接口或者直接继承AsyncConfigurerSupport类来自定义线程池。但是非完全托管的Bean和完全托管的Bean实现方式有点小差异。

首先,来看非完全托管的Spring Bean,实现方式如代码所示:
在这里插入图片描述

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();//手动初始化
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }
}

在这段代码中,ThreadPoolTaskExecutor不是完全托管的Spring bean。

然后,来看完全托管的Spring Bean,实现方式如代码所示

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Bean
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        //executor.initialize();//不用手动调用
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }
}

只要在异步方法上添加@Bean注解,不需要手动调用线程池的initialize()方法,在Bean在初始化之后会自动调用。需要注意的是,在同级类中直接调用异步方法无法实现异步。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/105308.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法leetcode|24. 两两交换链表中的节点(rust重拳出击)

文章目录24. 两两交换链表中的节点&#xff1a;样例 1&#xff1a;样例 2&#xff1a;样例 3&#xff1a;提示&#xff1a;分析&#xff1a;题解&#xff1a;rustgoccpythonjava24. 两两交换链表中的节点&#xff1a; 给你一个链表&#xff0c;两两交换其中相邻的节点&#xf…

百倍加速IO读写!快使用Parquet和Feather格式!⛵

&#x1f4a1; 作者&#xff1a;韩信子ShowMeAI &#x1f4d8; 数据分析实战系列&#xff1a;https://www.showmeai.tech/tutorials/40 &#x1f4d8; 本文地址&#xff1a;https://www.showmeai.tech/article-detail/409 &#x1f4e2; 声明&#xff1a;版权所有&#xff0c;转…

你碰到过这8种Spring事务失效的场景的那几种?

前言 作为Java开发工程师&#xff0c;相信大家对Spring种事务的使用并不陌生。但是你可能只是停留在基础的使用层面上&#xff0c;在遇到一些比较特殊的场景&#xff0c;事务可能没有生效&#xff0c;直接在生产上暴露了&#xff0c;这可能就会导致比较严重的生产事故。今天&a…

产业智能化创新标杆 2022年度“飞桨产业应用创新奖”颁布

随着AI进入工业大生产阶段&#xff0c;更多垂直行业正在与AI深度融合&#xff0c;更多创新实践不断涌现。日前&#xff0c;WAVE SUMMIT 2022深度学习开发者峰会在线上召开&#xff0c;百度发布了飞桨产业级深度学习平台和文心大模型的生态成果和最新进展&#xff0c;重磅颁发了…

MySQL高级【索引分类】

目录 1&#xff1a;索引分类 1.1&#xff1a;索引分类 1.2&#xff1a;聚集索引&二级索引 2&#xff1a;索引语法 1&#xff1a;索引分类 1.1&#xff1a;索引分类 在MySQL数据库&#xff0c;将索引的具体类型主要分为以下几类&#xff1a;主键索引、唯一索引、常规索…

(2022年12月最新)spring-core-rce漏洞复现CVE-2022-22965

1、漏洞简介 2022年3月29日&#xff0c;Spring框架曝出RCE 0day漏洞。已经证实由于 SerializationUtils#deserialize 基于 Java 的序列化机制&#xff0c;可导致远程代码执行 (RCE)&#xff0c;使用JDK9及以上版本皆有可能受到影响。 通过该漏洞可写入webshell以及命令执行。在…

数据结构C语言版 —— 二叉树的顺序存储堆的实现

二叉树顺序结构实现(堆) 1. 堆的概念 堆在物理上是一个一维数组&#xff0c;在逻辑上是一颗完全二叉树满足父亲节点小于等于孩子节点的叫做小堆或者小根堆满足父亲节点大于等于孩子节点的叫做大堆或者大根堆 堆的孩子和父亲的下标关系 已知父亲(parent)的下标 左孩子(left)下…

基于昇思MindSpore实现使用胶囊网络的图像描述生成算法

基于昇思MindSpore实现使用胶囊网络的图像描述生成算法 项目链接 https://github.com/Liu-Yuanqiu/acn_mindspore 01 项目描述 1.1 图像描述生成算法 人类可以轻易的使用语言来描述所看到的场景&#xff0c;但是计算机却很难做到&#xff0c;图像描述生成任务的目的就是教…

昇思MindSpore动静结合中list和dict方法实现

01 概述 静态图和动态图是神经学习框架中的重要概念&#xff0c;昇思MindSpore同时支持动态图和静态图两种模式&#xff0c;在动态图与静态图的结合方面做了很多工作。本文以昇思MindSpore框架中图模式下list和dict的实现方式为例&#xff0c;介绍昇思MindSpore框架中的动静结…

C与C++如何互相调用

个人主页&#xff1a;董哥聊技术我是董哥&#xff0c;嵌入式领域新星创作者创作理念&#xff1a;专注分享高质量嵌入式文章&#xff0c;让大家读有所得&#xff01;文章目录1、为什么会有差异&#xff1f;2、extern "C"3、C调用C正确方式4、C调用C5、总结在项目开发过…

[第十二届蓝桥杯/java/算法]C——卡片

&#x1f9d1;‍&#x1f393;个人介绍&#xff1a;大二软件生&#xff0c;现学JAVA、Linux、MySQL、算法 &#x1f4bb;博客主页&#xff1a;渡过晚枫渡过晚枫 &#x1f453;系列专栏&#xff1a;[编程神域 C语言]&#xff0c;[java/初学者]&#xff0c;[蓝桥杯] &#x1f4d6…

中外法律文献查找下载常用数据库大盘点

中外法律文献查找下载常用数据库有&#xff1a; 一、Westlaw&#xff08;法律全文数据库&#xff09; 是法律出版集团Thomson Legal and Regulator’s于1975年开发的&#xff0c;为国际法律专业人员提供的互联网的搜索工具。 Westlaw International其丰富的资源来自法律、法规…

图(Graph)详解 - 数据结构

文章目录&#xff1a;图的基本概念图的存储结构邻接矩阵邻接矩阵的实现邻接表邻接表实现图的遍历图的广度优先搜索&#xff08;BFS&#xff09;图的深度优先搜索&#xff08;DFS&#xff09;最小生成树Kruskal算法Prim算法最短路径单源最短路径 - Dijkstra算法单源最短路径 - B…

Linux学习-91-Discuz论坛安装

17.22 Discuz论坛安装 通过 Discuz! 搭建社区论坛、知识付费网站、视频直播点播站、企业网站、同城社区、小程序、APP、图片素材站&#xff0c;游戏交流站&#xff0c;电商购物站、小说阅读、博客、拼车系统、房产信息、求职招聘、婚恋交友等等绝大多数类型的网站。Discuz!自2…

《教养的迷思》

在读《穷查理宝典》时&#xff0c;查理芒格在有一讲&#xff0c;专门谈及《教养的迷思》一书&#xff0c;说到作者朱迪斯哈里斯。查理芒格认为哈里斯在探求真理的道路上走得很顺利&#xff0c;取得成功的因素之一就是她热衷于摧毁自己的观念。 朱迪斯在书的开端首先严肃地纠正了…

【案例教程】无人机生态环境监测、图像处理与GIS数据分析综合实践

【查看原文】无人机生态环境监测、图像处理与GIS数据分析综合实践技术应用 构建“天空地”一体化监测体系是新形势下生态、环境、水文、农业、林业、气象等资源环境领域的重大需求&#xff0c;无人机生态环境监测在一体化监测体系中扮演着极其重要的角色。通过无人机航空遥感技…

Fabric系列 - 多通道技术(Muti-channel)

可在节点&#xff0c;通道和联盟级别上配置。 一个Fabric网络中能够运行多个账本&#xff0c;每个通道间的逻辑相互隔离不受影响&#xff0c;如下图所示&#xff0c;每种颜色的线条代表一个逻辑上的通道&#xff0c;每个Peer节点可以加入不同的通道&#xff0c;每个通道都拥有…

AI编译器XLA调研

文章目录一、XLA简介二、XLA在TensorFlow中的应用2.1 XLA是什么&#xff1f;&#xff08;tensorflow\compiler\xla&#xff09;2.2 TensorFlow怎样转化为XLA &#xff08;tensorflow\compiler\tf2xla&#xff09;2.3 JIT(just in time) 即时编译 &#xff08;tensorflow\compil…

【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)

Flume简介 Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统&#xff0c;Flume支持在日志系统中定制各类数据发送方&#xff0c;用于收集数据&#xff1b;同时&#xff0c;Flume提供对数据进行简单处理&#xff0c;并写到各种数据接受方&…

NLP学习笔记(四) Seq2Seq基本介绍

大家好&#xff0c;我是半虹&#xff0c;这篇文章来讲序列到序列模型 (Sequence To Sequence, Seq2Seq) 本文写作思路如下&#xff1a; 从循环神经网络的应用场景引入&#xff0c;介绍循环神经网络作为编码器和解码器使用&#xff0c;最后是序列到序列模型 在之前的文章中&am…