线程池的使用案例一

news2025/1/17 3:46:15

一、配置线程池

1、不推荐的方式

ExecutorService executorService = Executors.newFixedThreadPool();  // 创建⼀个固定⼤⼩的线程池,可控制并发的线程数,超出的线程会在队列中等待;
ExecutorService executorService = Executors.newCachedThreadPool();  // 创建⼀个可缓存的线程池,若线程数超过处理所需,缓存⼀段时间后会回收,若线程数不够,则新建线程;
ExecutorService executorService = Executors.newSingleThreadExecutor();  // 创建单个线程数的线程池,它可以保证先进先出的执⾏顺序;
ExecutorService executorService = Executors.newScheduledThreadPool();  // 创建⼀个可以执⾏延迟任务的线程池;
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();  // 创建⼀个单线程的可以执⾏延迟任务的线程池;
ExecutorService executorService = Executors.newWorkStealingPool();  // 创建⼀个抢占式执⾏的线程池(任务执⾏顺序不确定)【JDK1.8 添加】。

2、原始方式

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {

    // 核心线程数
    private static int corePoolSize =10;
    // 最大线程数
    private static int maxmumPoolSize =30;
    // 空闲存活时间
    private static long keepTime = 30;
    // 时间单位
    private static TimeUnit unit = TimeUnit.SECONDS;
    // 任务队列
    private static ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(1000);
    // 创建线程工厂
    private static ThreadFactory threadFactory1 =  Executors.defaultThreadFactory();

    private static ThreadPoolExecutor.AbortPolicy policy = new ThreadPoolExecutor.AbortPolicy();

    public static void main(String[] args) throws Exception{

        ExecutorService executorService1 = Executors.newFixedThreadPool(10);
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maxmumPoolSize,
                keepTime,
                unit,
                blockingQueue,
                threadFactory1,
                policy);
        executorService.execute(new Runnable(){
            public void run(){
                System.out.println("new Runnable!");
            };
        });
         /*
          //线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。
          executorService.shutdownNow();
         */
         /*
          //线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。
          executorService.shutdown();
         */

        // 这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,
        // 否则返回false,一般情况下会和shutdown方法组合使用。
        boolean boole = executorService.awaitTermination(3,TimeUnit.SECONDS);
    }

}

3、Spring的方式

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
@EnableAsync  // 同一个类的中调用无效
public class ThreadPoolConfig {

    // 获取服务器的cpu个数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); // 获取cpu个数
    // 核心线程数
    private static final int COUR_SIZE = CPU_COUNT * 2;
    // 最大线程数
    private static final int MAX_COUR_SIZE = CPU_COUNT * 4;
    // 队列容量
    private static final int QUEUE_SIZE = CPU_COUNT * 4 * 4;
    // 空闲存活时间
    private static long keepTime = 30;
    // 时间单位
    private static TimeUnit unit = TimeUnit.SECONDS;

    // 任务队列
    // private static ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(10*10000);

    @Bean(name = "asyncDownLoadExcelExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        // ThreadPoolTaskScheduler
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);
        // 配置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);
        // 配置队列容量(这里设置成最大线程数的四倍)
        threadPoolTaskExecutor.setQueueCapacity(QUEUE_SIZE);
        // 默认是 60s,这里设置 30s
        threadPoolTaskExecutor.setKeepAliveSeconds(30);
        // 给线程池设置名称
        threadPoolTaskExecutor.setThreadNamePrefix("async-download-excel");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
        return threadPoolTaskExecutor;
    }

    @Bean(name = "asyncUploadExcelExecutor")
    public ThreadPoolTaskExecutor asyncUploadExcelExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =  new ThreadPoolTaskExecutor();
        // 设置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(5);
        // 设置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(10);
        // 设置阻塞队列大小
        threadPoolTaskExecutor.setQueueCapacity(999);
        // 默认是 60s,这里设置30s
        threadPoolTaskExecutor.setKeepAliveSeconds(30);
        // 设置线程池中线程名前缀
        threadPoolTaskExecutor.setThreadNamePrefix("async-upload-excel");
        //当达到 MaxPoolSize 不再调用新线程,用调用者所在线程之星异步任务。
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return threadPoolTaskExecutor;
    }

}

二、百万数据的导出(生成多个文件,统一压缩)

2.1、引入依赖

 <dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi-ooxml</artifactId>
    <version>4.0.1</version>
 </dependency>

2.2、实体类

public class PersonEntity {

    private Long id;

    private String name;

    private Integer age;

    private String address;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

2.3、Controller

    /**
     * 导出方法
     * 此处应注意 线程池拒绝策略 抛出的异常,若任务过大,则走降级方法。
     */
    public void exportMillionData(HttpServletRequest request, HttpServletResponse response) throws IOException {
        // 表格表头
        String[] TITLE = new String[]{ "姓名", "年龄", "地址"};
        // 获取数据进行分割
        int count = 100*10000; // personService.count();
        int pageSize = 50000;
        // 获取批次数
        int tableNum = count % pageSize == 0 ? (count / pageSize) : (count / pageSize) + 1;

        // 将数据多线程方式导出到excel
        CountDownLatch latch = new CountDownLatch(tableNum);
        for (int i = 0; i < tableNum; i++) {
          exportDataToExcel(latch, TITLE, pageSize, i);
        }
        try {
            // 阻塞 —— 等待全部执行完
            latch.await();
            // 压缩响应
            // 处理中文名不显示的问题
            String fileName = URLEncoder.encode("人员信息.zip", "UTF-8");
            response.setContentType("application/octet-stream;charset=UTF-8");
            response.setContentType("application/x-zip-compressed;charset=UTF-8");
            response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
            response.addHeader("Pargam", "no-cache");
            response.addHeader("Cache-Control", "no-cache");
            response.addHeader("Access-Contro1-A11ow-0rigin", "*");
            File zip = ZipUtil.zip(new File("D://file/sys/"));
            ServletOutputStream output = response.getOutputStream();
            FileInputStream input = new FileInputStream(zip);
            byte[] buff = new byte[1024 * 10];
            int len = 0;
            while ((len = input.read(buff)) > -1) {
                output.write(buff, 0, len);
            }
            output.flush();
            output.close();
            if (zip.exists()) {
                zip.delete();
            }
        } catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            FileUtil.deleteDir(new File("D://file/sys/"));
        }
    }

2.4、Service

    /**
     * 导出数据到 Excel
     * @param latch    锁
     * @param TITLE    表格头
     * @param pageSize 每个sheet的记录数
     * @param first    表格序号
     */
    @Async("asyncDownLoadExcelExecutor")
    public void exportDataToExcel(CountDownLatch latch, String[] TITLE, int pageSize, int first) throws IOException {
       //  IPage page = new Page();
       //  page.setCurrent(i + 1);
       //  page.setSize(pageSize);
        List<PersonEntity> records = new ArrayList<>();// personService.page(page).getRecords();
        int start = first * pageSize;
        int end = start + pageSize;
        String fileName = start + "-" + end + "人员信息" + ".xlsx";
        // 写出到本地的excel文件中
        SXSSFWorkbook wb = new SXSSFWorkbook();
        Sheet sheet = wb.createSheet(fileName);
        Row row = sheet.createRow(0);
        Cell cell = null;
        // 写标题
        for (int j = 0; j < TITLE.length; j++) {
            cell = row.createCell(j);
            cell.setCellValue(TITLE[j]);
        }
        // 写内容
        int rowNum = 1;
        for (PersonEntity entity : records) {
            row = sheet.createRow(rowNum++);
            row.createCell(0).setCellValue(entity.getName());
            row.createCell(1).setCellValue(entity.getAge());
            row.createCell(2).setCellValue(entity.getAddress());
        }
        fileName = new String(fileName.getBytes(), "UTF-8");
        File file = new File("D://file/sys/" + fileName);
        if (!file.exists()) {
            file.getParentFile().mkdirs();
        }
        FileOutputStream outputStream = new FileOutputStream(file);
        wb.write(outputStream);
        outputStream.flush();
        outputStream.close();
        latch.countDown();
    }

三、多线程插入数据 (类似分布式的TCC)

1、引入依赖

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>5.2.9.RELEASE</version>
</dependency>

2、定义线程池

@Bean(name = "asyncInsertDataExecutor")
public ThreadPoolTaskExecutor asyncUploadExcelExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor =  new ThreadPoolTaskExecutor();
    // 设置核心线程数
    threadPoolTaskExecutor.setCorePoolSize(5);
    // 设置最大线程数
    threadPoolTaskExecutor.setMaxPoolSize(10);
    // 设置阻塞队列大小
    threadPoolTaskExecutor.setQueueCapacity(999);
    // 默认是 60s,这里设置30s
    threadPoolTaskExecutor.setKeepAliveSeconds(30);
    // 设置线程池中线程名前缀
    threadPoolTaskExecutor.setThreadNamePrefix("async-upload-excel");
    //当达到 MaxPoolSize 不再调用新线程,用调用者所在线程之星异步任务。
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return threadPoolTaskExecutor;
}

3、Controller

@Autowired
private PlatformTransactionManager transactionManager;

public String insertData() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(10);
    AtomicReference<Boolean> rollback = new AtomicReference<>(false);
    // 先在开启多线程外面,定义一个同步集合:
    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<>());
        // 调用线程方法
        for(int i=0;i<10;i++){
            exportDataToExcel(latch,rollback,transactionStatuses,new ArrayList<>());
        }
    // 阻塞 —— 等待全部执行完
    latch.await();
    // 如果出错回滚事务
    if (rollback.get()) {
        transactionStatuses.forEach(status -> transactionManager.rollback(status));
        return " 插入失败 ";
    } else {
        transactionStatuses.forEach(status -> transactionManager.commit(status));
        return " 插入成功 ";
    }
}

4、Service

@Async("asyncInsertDataExecutor")
public void exportDataToExcel(CountDownLatch latch,AtomicReference<Boolean> rollback,List<TransactionStatus> transactionStatuses,List<Object> list) {
    try {
        // 开启事务(可封装成方法)
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        TransactionStatus status = transactionManager.getTransaction(def);
        transactionStatuses.add(status);

        // ....  业务代码
        list.clear();

    } catch (Exception e) {
        rollback.set(true);
        e.printStackTrace();
    }
    latch.countDown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/843600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无监督SLAM框架

因为之前做过双目立体匹配相关项目&#xff0c;当时就了解到有一些无监督方法。最近涉及到SLAM相关项目&#xff0c;想到理论上可以用photo loss无监督学习光流和pose&#xff0c;因此查看了有没有SLAM无监督的相关论文。发现确实有几篇论文。但总感觉设计的不是很好&#xff0…

【计算机视觉|风格迁移】PP-GAN:使用GAN的地标提取器将韩国人像的风格转化为身份证照片

本系列博文为深度学习/计算机视觉论文笔记&#xff0c;转载请注明出处 标题&#xff1a;PP-GAN : Style Transfer from Korean Portraits to ID Photos Using Landmark Extractor with GAN 链接&#xff1a;[2306.13418] PP-GAN : Style Transfer from Korean Portraits to ID…

单元测试、接口测试、功能测试的区别

单元测试、接口测试、功能测试的区别 自动化测试分为单元自动化测试、接口自动化测试和功能自动化测试 功能测试的进行&#xff1a;首先编写测试用例&#xff0c;测试用例中最主要的是测试步骤和预期结果&#xff1b;测试人员根据测试用例执行操作步骤&#xff0c;然后通过眼睛…

想知道程序员工资最高的城市吗?来看看这10个城市排名吧

大家好&#xff0c;这里是程序员晚枫。 在下班回家的路上&#xff0c;又和同事聊起重庆工资不高的问题&#xff0c;那么国内哪些城市程序员工资最高呢&#xff1f;今天我们一起来看一下&#xff1a;我国程序员工资最高的前10个城市。 以下是我国程序员工资最高的前10个城市&a…

使用 POI 在 Word 中重新开始编号、自定义标题格式

效果图 引入依赖 <!-- https://mvnrepository.com/artifact/org.apache.poi/poi --><dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>4.1.2</version></dependency><!-- https…

刷题笔记 day8

1004 最大连续1的个数 III 这道题要求将原数组中的0翻转成1&#xff0c;求出最大元素全是1的子数组长度&#xff0c;看这道题第一感觉还要将里面的0变成1&#xff0c;感觉这道题解决起来很麻烦&#xff0c;但是我们可以转变思路&#xff0c;找出其最大子数组&#xff0c;使得子…

git 版本管理工具 学习笔记

git 学习笔记 目录 一、git是什么 二、创建仓库 三、工作区域和文件状态 四、添加和提交文件 五、回退版本 &#xff08;了解&#xff09; 六、查看差异 七、删除文件 八、.gitignore文件&#xff08;了解&#xff09; 九、github ssh-key配置 十、本地仓库和远程仓库内…

Shell - 备份mysql的N种姿势

文章目录 mysqldump --help备份mysql的N种姿势 mysqldump --help mysqldump 是一个常用的命令行工具&#xff0c;用于备份和还原 MySQL 数据库。 [rootVM-24-3-centos blg]# mysqldump --help mysqldump Ver 10.13 Distrib 5.6.50, for Linux (x86_64) Copyright (c) 2000,…

node爬取中国行政区域geo数据

依赖 {"dependencies": {"axios": "^1.4.0","colors": "^1.4.0","express": "^4.18.2","fs": "^0.0.1-security"} }数据来源 https://datav.aliyun.com/portal/school/atlas/are…

数据可视化(八)堆叠图,双y轴,热力图

1.双y轴绘制 #双Y轴可视化数据分析图表 #add_subplot() dfpd.read_excel(mrbook.xlsx) x[i for i in range(1,7)] y1df[销量] y2df[rate] #用来正常显示负号 plt.rcParams[axes.unicode_minus]False figplt.figure() ax1fig.add_subplot(1,1,1)#一行一列&#xff0c;第一个区域…

《Zookeeper》从零开始学Zookeeper源码(二)之数据序列化与通信协议

目录 序列化与反序列化通信协议请求头的数据结构响应头的数据结构 序列化与反序列化 zookeeper的客户端与服务端、服务端与服务端之间会进行一系列的网络通信&#xff0c;在进行数据的传输过程中就涉及到序列化与反序列化&#xff0c;zookeeper使用Jute作为它的序列化组件&…

FreeRTOS的线程间通信

一、分类 FreeRTOS的线程间通信分为这几大类 由于我还在学习中&#xff0c;目前显从信号开始记录学习 二、逐块讲解 1、信号&#xff08;osSignalWait osSignalSet&#xff09; FreeRTOS从V8.2.0版本开始提供任务通知这个功能&#xff0c;每个任务多有一个32位的通知值&am…

OLAP ModelKit Crack,ADO.NET和IList

OLAP ModelKit Crack,ADO.NET和IList OLAP ModelKit是一个多功能的.NET OLAP组件&#xff0c;用C#编写&#xff0c;只包含100%托管代码。它具有XP主题的外观&#xff0c;并能够使用任何.NET数据源(ADO.NET和IList)。借助任何第三方组件(尤其是图表组件)呈现数据的能力扩展了产品…

flink kafka消费者如何处理kafka主题的rebalance

背景&#xff1a; 我们日常使用kafka客户端消费kafka主题的消息时&#xff0c;当消费者退出/加入消费者组&#xff0c;kafka主题分区数有变等事件发生时&#xff0c;都会导致rebalance的发生&#xff0c;此时一般情况下&#xff0c;如果我们不自己处理offset&#xff0c;我们不…

那些被忽视的Python核心功能...

最实用、最简单、最优美……近些年&#xff0c;大家学习Python的热潮从未消退&#xff1b;无论是数据分析还是科学计算都少不了Python的身影。 Python也没有让人失望&#xff0c;Java用100行代码写出的程序&#xff0c;用Python十行就能搞定&#xff01; 当你要说Hello World…

银河麒麟QT连接DM8数据库

1. 安装达梦8 官网下载, 按照官方文档进行安装即可. 2. 安装unixodbc 1> 下载odbc安装包 unixODBC-2.3.7pre.tar.gz 2> 解压 tar -xvf unixODBC-2.3.7pre.tar.gz3> 编译 ./configure -prefix /usr/local make && make install4> 查找配置 odbcinst -j5…

Redis集群部署(docker-compose)

更多更新信息请关注“技术客格”公众号 使用3主+3从的部署方式 一、服务器规划 序号 服务器 端口 节点名称 备注 1 192.168.1.120 6179 redis-1 2 192.168.1.1206279 redis-2 3 192.168.1.1206379 redis-3 4 192.168.1.1206479 redis-4 5 192.168.1.1206579 redis-5 6 192.…

多线程的创建,复习匿名内部类,Thread的一些方法,以及lambda的变量捕捉,join用法

一、&#x1f49b; Java的Thread类表示线程 1.创建类&#xff0c;继承Thread重写run方法 2.创建类&#xff0c;实现Runnable重写run方法 3.可以继承Thread重写run基于匿名内部类 4.实现Runnable重写run基于匿名内部类 5.lamdba表达式表示run方法的内容&#xff08;推荐&#x…

一、安全世界观

文章目录 1、 Web安全简史1.1 中国黑客简史1.2 黑客技术的发展历程1.3 web安全的兴起 2、黑帽子、白帽子3、安全的本质4、安全三要素5、如何实施安全评估5.1 资产等级划分5.2 威胁分析5.3 风险分析5.4 设计安全方案 6、白帽子兵法6.1 Secure By Default6.2 纵深防御原则6.3 数据…

在Raspberry Pi 4上安装Ubuntu 20.04 + ROS noetic(不带显示器)

在Raspberry Pi 4上安装Ubuntu 20.04 ROS noetic&#xff08;不带显示器&#xff09; 1. 所需设备 所需设备&#xff1a; 树莓派 4 B 型 wifi microSD 卡&#xff1a;最小 32GB MicroSD 转 SD 适配器 &#xff08;可选&#xff09;显示器&#xff0c;鼠标等 2. 树莓派…