SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据

news2024/11/28 14:36:10

SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据

 更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。

一、背景:

    利用ThreadPoolTaskExecutor多线程异步批量插入,提高百万级数据插入效率。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。ThreadPoolTaskExecutor是ThreadPoolExecutor的封装,所以,性能更加优秀,推荐ThreadPoolTaskExecutor。

二、具体细节:

2.1、配置application.yml

# 异步线程配置 自定义使用参数async:  executor:    thread:      core_pool_size:  10  # 配置核心线程数 默认8个 核数*2+2      max_pool_size:  100   # 配置最大线程数      queue_capacity:  99988  # 配置队列大小      keep_alive_seconds:  20  #设置线程空闲等待时间秒s      name:        prefix: async-thread-  # 配置线程池中的线程的名称前缀

2.2、ThreadPoolConfig配置注入Bean

package com.wonders.common.config;import cn.hutool.core.thread.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;
/** * @Description: TODO:利用ThreadPoolTaskExecutor多线程批量执行相关配置 * 自定义线程池 * 发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。 * @Author: yyalin * @CreateDate: 2022/11/6 11:56 * @Version: V1.0 */@Configuration@EnableAsync@Slf4jpublic class ThreadPoolConfig {    //自定义使用参数    @Value("${async.executor.thread.core_pool_size}")    private int corePoolSize;   //配置核心线程数    @Value("${async.executor.thread.max_pool_size}")    private int maxPoolSize;    //配置最大线程数    @Value("${async.executor.thread.queue_capacity}")    private int queueCapacity;    @Value("${async.executor.thread.name.prefix}")    private String namePrefix;    @Value("${async.executor.thread.keep_alive_seconds}")    private int keepAliveSeconds;
    //1、自定义asyncServiceExecutor线程池    @Bean(name = "asyncServiceExecutor")    public ThreadPoolTaskExecutor asyncServiceExecutor() {        log.info("start asyncServiceExecutor......");        //在这里修改        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        //配置核心线程数        executor.setCorePoolSize(corePoolSize);        //配置最大线程数        executor.setMaxPoolSize(maxPoolSize);        //设置线程空闲等待时间 s        executor.setKeepAliveSeconds(keepAliveSeconds);        //配置队列大小 设置任务等待队列的大小        executor.setQueueCapacity(queueCapacity);        //配置线程池中的线程的名称前缀        //设置线程池内线程名称的前缀-------阿里编码规约推荐--方便出错后进行调试        executor.setThreadNamePrefix(namePrefix);        // rejection-policy:当pool已经达到max size的时候,如何处理新任务        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());        //执行初始化        executor.initialize();        return executor;    }    /**     * 2、公共线程池,利用系统availableProcessors线程数量进行计算     */    @Bean(name = "commonThreadPoolTaskExecutor")    public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();        int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量        int corePoolSize = (int) (processNum / (1 - 0.2));        int maxPoolSize = (int) (processNum / (1 - 0.5));        pool.setCorePoolSize(corePoolSize); // 核心池大小        pool.setMaxPoolSize(maxPoolSize); // 最大线程数        pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度        pool.setThreadPriority(Thread.MAX_PRIORITY);        pool.setDaemon(false);        pool.setKeepAliveSeconds(300);// 线程空闲时间        return pool;    }   //3自定义defaultThreadPoolExecutor线程池    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")    public ThreadPoolExecutor systemCheckPoolExecutorService() {        int maxNumPool=Runtime.getRuntime().availableProcessors();        return new ThreadPoolExecutor(3,                maxNumPool,                60,                TimeUnit.SECONDS,                new LinkedBlockingQueue<Runnable>(10000),                //置线程名前缀,例如设置前缀为hutool-thread-,则线程名为hutool-thread-1之类。                new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),                (r, executor) -> log.error("system pool is full! "));    }
}

2.3、创建异步线程,业务类

 //1、自定义asyncServiceExecutor线程池    @Override    @Async("asyncServiceExecutor")    public void executeAsync(List<Student> students,                             StudentService studentService,                             CountDownLatch countDownLatch) {        try{            log.info("start executeAsync");            //异步线程要做的事情            studentService.saveBatch(students);            log.info("end executeAsync");        }finally {            countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放        }    }

2.4、拆分集合工具类

package com.wonders.threads;
import com.google.common.collect.Lists;import org.springframework.util.CollectionUtils;
import java.util.ArrayList;import java.util.List;
/** * @Description: TODO:拆分工具类 * 1、获取需要进行批量更新的大集合A,对大集合进行拆分操作,分成N个小集合A-1 ~ A-N; * 2、开启线程池,针对集合的大小进行调参,对小集合进行批量更新操作; * 3、对流程进行控制,控制线程执行顺序。按照指定大小拆分集合的工具类 * @Author: yyalin * @CreateDate: 2022/5/6 14:43 * @Version: V1.0 */public class SplitListUtils {    /**     * 功能描述:拆分集合     * @param <T> 泛型对象     * @MethodName: split     * @MethodParam: [resList:需要拆分的集合, subListLength:每个子集合的元素个数]     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各个集合组成的列表     * 代码里面用到了guava和common的结合工具类     * @Author: yyalin     * @CreateDate: 2022/5/6 14:44     */    public static <T> List<List<T>> split(List<T> resList, int subListLength) {        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {            return Lists.newArrayList();        }        List<List<T>> ret = Lists.newArrayList();        int size = resList.size();        if (size <= subListLength) {            // 数据量不足 subListLength 指定的大小            ret.add(resList);        } else {            int pre = size / subListLength;            int last = size % subListLength;            // 前面pre个集合,每个大小都是 subListLength 个元素            for (int i = 0; i < pre; i++) {                List<T> itemList = Lists.newArrayList();                for (int j = 0; j < subListLength; j++) {                    itemList.add(resList.get(i * subListLength + j));                }                ret.add(itemList);            }            // last的进行处理            if (last > 0) {                List<T> itemList = Lists.newArrayList();                for (int i = 0; i < last; i++) {                    itemList.add(resList.get(pre * subListLength + i));                }                ret.add(itemList);            }        }        return ret;    }
    /**     * 功能描述:方法二:集合切割类,就是把一个大集合切割成多个指定条数的小集合,方便往数据库插入数据     * 推荐使用     * @MethodName: pagingList     * @MethodParam:[resList:需要拆分的集合, subListLength:每个子集合的元素个数]     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各个集合组成的列表     * @Author: yyalin     * @CreateDate: 2022/5/6 15:15     */    public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){        //判断是否为空        if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {            return Lists.newArrayList();        }        int length = resList.size();        int num = (length+pageSize-1)/pageSize;        List<List<T>> newList =  new ArrayList<>();        for(int i=0;i<num;i++){            int fromIndex = i*pageSize;            int toIndex = (i+1)*pageSize<length?(i+1)*pageSize:length;            newList.add(resList.subList(fromIndex,toIndex));        }        return newList;    }
    // 运行测试代码 可以按顺序拆分为11个集合    public static void main(String[] args) {        //初始化数据        List<String> list = Lists.newArrayList();        int size = 19;        for (int i = 0; i < size; i++) {            list.add("hello-" + i);        }        // 大集合里面包含多个小集合        List<List<String>> temps = pagingList(list, 100);        int j = 0;        // 对大集合里面的每一个小集合进行操作        for (List<String> obj : temps) {            System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));        }    }
}

2.5、造数据,多线程异步插入

 public int batchInsertWay() throws Exception {        log.info("开始批量操作.........");        Random rand = new Random();        List<Student> list = new ArrayList<>();        //造100万条数据        for (int i = 0; i < 1000003; i++) {            Student student=new Student();            student.setStudentName("大明:"+i);            student.setAddr("上海:"+rand.nextInt(9) * 1000);            student.setAge(rand.nextInt(1000));            student.setPhone("134"+rand.nextInt(9) * 1000);            list.add(student);        }        //2、开始多线程异步批量导入        long startTime = System.currentTimeMillis(); // 开始时间        //boolean a=studentService.batchInsert(list);        List<List<Student>> list1=SplitListUtils.pagingList(list,100);  //拆分集合        CountDownLatch countDownLatch = new CountDownLatch(list1.size());        for (List<Student> list2 : list1) {            asyncService.executeAsync(list2,studentService,countDownLatch);        }        try {            countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;            long endTime = System.currentTimeMillis(); //结束时间            log.info("一共耗时time: " + (endTime - startTime) / 1000 + " s");            // 这样就可以在下面拿到所有线程执行完的集合结果        } catch (Exception e) {            log.error("阻塞异常:"+e.getMessage());        }        return list.size();
    }

2.6、测试结果

10个核心线程:

20个核心线程

50个核心线程:

汇总结果:

序号

核心线程(core_pool_size)

插入数据(万)耗时(秒)
110100w31s
215100w28s
350100w27s

结论:对不同线程数的测试,发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。

个人推荐配置:

int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量int corePoolSize = (int) (processNum / (1 - 0.2));int maxPoolSize = (int) (processNum / (1 - 0.5));

更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/646636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI+低代码:开发革命的崭新纪元!带你一文速通了解

信息技术、通信技术和计算能力的迅速发展&#xff0c;AI技术在诸多领域中掀起了一股革命浪潮&#xff0c;成为推动社会进步和发展的重要力量&#xff0c;也是许多国家和企业日益重视和投资的方向。 而全球化和技术革新的深入推进&#xff0c;传统产业对劳动力的需求逐渐减少&am…

阿里飞猪三面

&#xff08;有许多人是用青春的幸福作成功的代价的。——莫扎特&#xff09; 背景 该岗位是阿里飞猪的前端部门&#xff0c;岗位名称是node.js高级/专家开发工程师。主要负责用NodeJs作为后端技术&#xff0c;向上层Java&#xff0c;Node等业务服务&#xff0c;提供中间层基础…

操作教程:如何正确配置让EasyNVR级联至EasyNVS平台?

EasyNVS是EasyNVR的云管理平台&#xff0c;可实现内网监控上云&#xff0c;视频汇聚等功能。近期经常有用户咨询EasyNVR如何级联至EasyNVS平台进行云端统计和管理&#xff0c;在今天的文章中&#xff0c;我们来详细介绍一下。 1、配置EasyNVS 1&#xff09;运行EasyNVS之前&a…

Quarkus - 发布JSON Restful服务

目标 基于实现第一个Hello World发布一个JSON的Restful服务&#xff0c;该服务提供GET,POST,DELETE三个接口&#xff0c;分别是获取水果列表&#xff0c;添加水果&#xff0c;根据水果名字删除水果。 发布Rest服务 POM配置 添加jackson扩展的依赖 <dependency><gr…

Oralce系列十八:Oracle RAC

Oracle RAC 1. Oracle RAC介绍1.1 基本概念1.2 Oracle RAC应用场景1.3 Oracle RAC的优缺点 2. Oracle RAC架构3. Oracle RAC 的安装 1. Oracle RAC介绍 1.1 基本概念 Oracle RAC&#xff08;Oracle Real Application Server Cluster&#xff09;是一种分布式数据库解决方案&a…

涂鸦T2-U开发板快速入门

文章目录 T2-U模组概述特性 1、环境搭建2、SDK下载3、编译3.1、 命令编译3.2、 命令编译清除3.3、 Wind-IDE 编译3.4、编译完成 4、下载4.1、 GUI工具4.2、Wind IDE一键下载 5、 运行 涂鸦 T2-U 开发板 是一款专为开发者打造的智能硬件产品原型开发板。它可与其他功能电路模组或…

App的回归测试,有什么高效的测试方法?

直接抛出观点&#xff1a;高效的测试方法当然有&#xff0c;那就是采用【接口自动化】。 为了系统阐述这个问题&#xff0c;让你能有较强烈的获得感&#xff0c;本篇文章将采用下列结构进行展开&#xff1a; 1、回归测试&#xff0c;测哪些东西&#xff1f; 2、传统的回归测试…

容器认证有什么等级?考试内容是什么?

信息通信行业是进几十年新兴起的一个行业&#xff0c;对我们的生活产生了巨大的影响&#xff0c;传统的购物、出行方式发生了巨大的变化&#xff0c;而且我们的眼界更加开阔。可以了解到世界各地的风土人情&#xff0c;这一切都离不开信息通信技术的发展&#xff0c;同时市场为…

拥有Android开发经验? 走出焦虑圈,车载应用开发正好合适你!

行业前景 当前&#xff0c;车联网已成为智能交通的重要发展方向之一。车载Android应用可以为车辆提供智能化服务&#xff0c;帮助驾驶者增强车辆控制和安全&#xff0c;提供大量娱乐和信息服务&#xff0c;如导航、音乐、天气预报、电子地图等。而且&#xff0c;车联网相关政策…

为什么APP安全很重要?APP盾如何提供帮助?

为什么APP安全很重要&#xff1f; APP安全是流程、功能、控制、功能、最佳实践和工具的有效和高效组合&#xff0c;用于通过主动查找和保护各种漏洞来确保各类APP/软件的安全。 应用安全非常重要的4个主要原因 1.确保关键数据资产的安全 数据是新的石油&#xff0c;攻击者最…

抖音seo矩阵号源码开发分享(一)

抖音SEO矩阵系统源码开发&#xff0c;需要遵循一下步骤&#xff1a; 1. 确定需求和功能&#xff1a;明确系统的主要目标和需要实现的功能&#xff0c;包括关键词研究、短视频制作、外链建设、数据分析、账号设置优化等方面。 2. 设计系统架构&#xff1a;根据需求和功能确定系…

运维圣经:DDos攻击应急响应指南

目录 DDos攻击简介 DDos攻击应急响应指南 一. 问题排查 二. 临时处置 三. 研判溯源 四. 清楚加固 DDos攻击简介 分布式拒绝服务是种基于DoS的特殊形式的拒绝服务攻击&#xff0c;是一种分布、 协作的大规模攻击方式&#xff0c;主要瞄准比较大的站点&#xff0c;像商业公…

MFC没有IMEMode 想软件自动切换到英文状态以便扫码时不会变成汉字。 MFC-自定义控件Edit control。MFC禁用中文输入法

0、直接说最终解决方法&#xff1a;MFC禁用中文输入法 #include <Imm.h> // Function for Disabling IME 禁用中文输入法 void DisableIME(HWND hWnd) {HIMC m_hImc; // 全局或者成员变量//HWND hWnd pWnd->GetDlgItem(IDC_EDIT1)->m_hWnd;if (hWnd &&am…

ffmpeg 编译android mac环境编译 或者centos

ndk版本:ndk21 (可使用android studio内部下载的ndk) /Users/XXXX/Library/Android/sdk/ndk/21.4.7075529 ffmpeg下载 ffmpeg6.0 1:git clone https://git.ffmpeg.org/ffmpeg.git ffmpeg 2:或者直接点击下面按钮下载 http://ffmpeg.org/releases/ffmpeg-6.0.tar.xz 环…

极致呈现系列之:Echarts雷达图的魅力与价值

目录 什么是雷达图vue3中引入雷达图绘制简单的雷达图雷达图的数据处理什么是数据归一化处理对chartData的数据进行归一化将归一化后的数据应用到雷达图中 美化雷达图 什么是雷达图 雷达图是一种基于极坐标系的可视化图表&#xff0c;用于展示多维数据之间的关系。它通过设置不…

Cross-modal Moment Localization in Videos论文笔记

Cross-modal Moment Localization in Videos论文笔记 0.来源1.摘要2.介绍3.模型3.1语言-时间注意网络3.2损失函数3.2.1对齐损失3.2.2位置损失3.2.2损失函数 4.实验4.1数据集4.2性能比较4.3 ROLE的几种变体 5.未来工作 0.来源 2018年 Cross-modal Moment Localization in Video…

MFC-皮肤颜色集组合界面程序DlgSkinBase

虽然是小程序,编辑的源代码也很少&#xff0c;但其中的编程思想却是大型工程项目的配色经典基础。就如万丈高楼的基础中的钢筋般重要。 或者很多程序员一辈子也难接触到大型项目程序...还是那句话&#xff0c;既然缘分来了&#xff0c;不妨共享出来&#xff0c;希望能对有缘人有…

卖课软文怎么写,揭秘知识付费软文写作技巧

随着互联网的发展&#xff0c;知识付费已经成为了一种趋势。越来越多的人开始关注自我提升和职业发展&#xff0c;而知识付费也成为了他们获取知识和技能的重要途径。在这个背景下&#xff0c;卖课软文也成为了知识付费领域一种重要的推广方式。本文伯乐网络传媒将为大家揭秘卖…

hadoop本地化windows部署

文章目录 前言1. hadoop on windows1.1 安装jdk1.2 安装hadoop1.2.1 解压1.2.2 备用目录1.2.3 修改配置1.2.4 安装winutils-master1.2.5 格式化namenode1.2.6 启动hadoop1.2.7 web-ui登陆hadoop hdfs 2. spark on windows2.1 安装scala2.2 安装spark2.2.1 解压2.2.2 环境变量2.…

2年点工月薪10k,自学自动化年薪突破30W

我是农村出生的家庭&#xff0c;经济并不富裕&#xff0c;一个人奔波在大城市&#xff0c;总是很自卑。那段时间父亲身体不好&#xff0c;家里打电话说要花很多钱&#xff0c;于是我辞掉了一个月薪7k的功能测试&#xff0c;去了一个电子厂&#xff0c;每天加班加满月薪也能拿到…