一、配置线程池
1、不推荐的方式
ExecutorService executorService = Executors.newFixedThreadPool(); // 创建⼀个固定⼤⼩的线程池,可控制并发的线程数,超出的线程会在队列中等待;
ExecutorService executorService = Executors.newCachedThreadPool(); // 创建⼀个可缓存的线程池,若线程数超过处理所需,缓存⼀段时间后会回收,若线程数不够,则新建线程;
ExecutorService executorService = Executors.newSingleThreadExecutor(); // 创建单个线程数的线程池,它可以保证先进先出的执⾏顺序;
ExecutorService executorService = Executors.newScheduledThreadPool(); // 创建⼀个可以执⾏延迟任务的线程池;
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); // 创建⼀个单线程的可以执⾏延迟任务的线程池;
ExecutorService executorService = Executors.newWorkStealingPool(); // 创建⼀个抢占式执⾏的线程池(任务执⾏顺序不确定)【JDK1.8 添加】。
2、原始方式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolUtil {
// 核心线程数
private static int corePoolSize =10;
// 最大线程数
private static int maxmumPoolSize =30;
// 空闲存活时间
private static long keepTime = 30;
// 时间单位
private static TimeUnit unit = TimeUnit.SECONDS;
// 任务队列
private static ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(1000);
// 创建线程工厂
private static ThreadFactory threadFactory1 = Executors.defaultThreadFactory();
private static ThreadPoolExecutor.AbortPolicy policy = new ThreadPoolExecutor.AbortPolicy();
public static void main(String[] args) throws Exception{
ExecutorService executorService1 = Executors.newFixedThreadPool(10);
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maxmumPoolSize,
keepTime,
unit,
blockingQueue,
threadFactory1,
policy);
executorService.execute(new Runnable(){
public void run(){
System.out.println("new Runnable!");
};
});
/*
//线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。
executorService.shutdownNow();
*/
/*
//线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。
executorService.shutdown();
*/
// 这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,
// 否则返回false,一般情况下会和shutdown方法组合使用。
boolean boole = executorService.awaitTermination(3,TimeUnit.SECONDS);
}
}
3、Spring的方式
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
@EnableAsync // 同一个类的中调用无效
public class ThreadPoolConfig {
// 获取服务器的cpu个数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); // 获取cpu个数
// 核心线程数
private static final int COUR_SIZE = CPU_COUNT * 2;
// 最大线程数
private static final int MAX_COUR_SIZE = CPU_COUNT * 4;
// 队列容量
private static final int QUEUE_SIZE = CPU_COUNT * 4 * 4;
// 空闲存活时间
private static long keepTime = 30;
// 时间单位
private static TimeUnit unit = TimeUnit.SECONDS;
// 任务队列
// private static ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(10*10000);
@Bean(name = "asyncDownLoadExcelExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
// ThreadPoolTaskScheduler
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);
// 配置队列容量(这里设置成最大线程数的四倍)
threadPoolTaskExecutor.setQueueCapacity(QUEUE_SIZE);
// 默认是 60s,这里设置 30s
threadPoolTaskExecutor.setKeepAliveSeconds(30);
// 给线程池设置名称
threadPoolTaskExecutor.setThreadNamePrefix("async-download-excel");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
return threadPoolTaskExecutor;
}
@Bean(name = "asyncUploadExcelExecutor")
public ThreadPoolTaskExecutor asyncUploadExcelExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
threadPoolTaskExecutor.setCorePoolSize(5);
// 设置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(10);
// 设置阻塞队列大小
threadPoolTaskExecutor.setQueueCapacity(999);
// 默认是 60s,这里设置30s
threadPoolTaskExecutor.setKeepAliveSeconds(30);
// 设置线程池中线程名前缀
threadPoolTaskExecutor.setThreadNamePrefix("async-upload-excel");
//当达到 MaxPoolSize 不再调用新线程,用调用者所在线程之星异步任务。
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
}
二、百万数据的导出(生成多个文件,统一压缩)
2.1、引入依赖
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.0.1</version>
</dependency>
2.2、实体类
public class PersonEntity {
private Long id;
private String name;
private Integer age;
private String address;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
2.3、Controller
/**
* 导出方法
* 此处应注意 线程池拒绝策略 抛出的异常,若任务过大,则走降级方法。
*/
public void exportMillionData(HttpServletRequest request, HttpServletResponse response) throws IOException {
// 表格表头
String[] TITLE = new String[]{ "姓名", "年龄", "地址"};
// 获取数据进行分割
int count = 100*10000; // personService.count();
int pageSize = 50000;
// 获取批次数
int tableNum = count % pageSize == 0 ? (count / pageSize) : (count / pageSize) + 1;
// 将数据多线程方式导出到excel
CountDownLatch latch = new CountDownLatch(tableNum);
for (int i = 0; i < tableNum; i++) {
exportDataToExcel(latch, TITLE, pageSize, i);
}
try {
// 阻塞 —— 等待全部执行完
latch.await();
// 压缩响应
// 处理中文名不显示的问题
String fileName = URLEncoder.encode("人员信息.zip", "UTF-8");
response.setContentType("application/octet-stream;charset=UTF-8");
response.setContentType("application/x-zip-compressed;charset=UTF-8");
response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
response.addHeader("Pargam", "no-cache");
response.addHeader("Cache-Control", "no-cache");
response.addHeader("Access-Contro1-A11ow-0rigin", "*");
File zip = ZipUtil.zip(new File("D://file/sys/"));
ServletOutputStream output = response.getOutputStream();
FileInputStream input = new FileInputStream(zip);
byte[] buff = new byte[1024 * 10];
int len = 0;
while ((len = input.read(buff)) > -1) {
output.write(buff, 0, len);
}
output.flush();
output.close();
if (zip.exists()) {
zip.delete();
}
} catch (InterruptedException e){
e.printStackTrace();
}finally {
FileUtil.deleteDir(new File("D://file/sys/"));
}
}
2.4、Service
/**
* 导出数据到 Excel
* @param latch 锁
* @param TITLE 表格头
* @param pageSize 每个sheet的记录数
* @param first 表格序号
*/
@Async("asyncDownLoadExcelExecutor")
public void exportDataToExcel(CountDownLatch latch, String[] TITLE, int pageSize, int first) throws IOException {
// IPage page = new Page();
// page.setCurrent(i + 1);
// page.setSize(pageSize);
List<PersonEntity> records = new ArrayList<>();// personService.page(page).getRecords();
int start = first * pageSize;
int end = start + pageSize;
String fileName = start + "-" + end + "人员信息" + ".xlsx";
// 写出到本地的excel文件中
SXSSFWorkbook wb = new SXSSFWorkbook();
Sheet sheet = wb.createSheet(fileName);
Row row = sheet.createRow(0);
Cell cell = null;
// 写标题
for (int j = 0; j < TITLE.length; j++) {
cell = row.createCell(j);
cell.setCellValue(TITLE[j]);
}
// 写内容
int rowNum = 1;
for (PersonEntity entity : records) {
row = sheet.createRow(rowNum++);
row.createCell(0).setCellValue(entity.getName());
row.createCell(1).setCellValue(entity.getAge());
row.createCell(2).setCellValue(entity.getAddress());
}
fileName = new String(fileName.getBytes(), "UTF-8");
File file = new File("D://file/sys/" + fileName);
if (!file.exists()) {
file.getParentFile().mkdirs();
}
FileOutputStream outputStream = new FileOutputStream(file);
wb.write(outputStream);
outputStream.flush();
outputStream.close();
latch.countDown();
}
三、多线程插入数据 (类似分布式的TCC)
1、引入依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.2.9.RELEASE</version>
</dependency>
2、定义线程池
@Bean(name = "asyncInsertDataExecutor")
public ThreadPoolTaskExecutor asyncUploadExcelExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
threadPoolTaskExecutor.setCorePoolSize(5);
// 设置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(10);
// 设置阻塞队列大小
threadPoolTaskExecutor.setQueueCapacity(999);
// 默认是 60s,这里设置30s
threadPoolTaskExecutor.setKeepAliveSeconds(30);
// 设置线程池中线程名前缀
threadPoolTaskExecutor.setThreadNamePrefix("async-upload-excel");
//当达到 MaxPoolSize 不再调用新线程,用调用者所在线程之星异步任务。
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
3、Controller
@Autowired
private PlatformTransactionManager transactionManager;
public String insertData() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
AtomicReference<Boolean> rollback = new AtomicReference<>(false);
// 先在开启多线程外面,定义一个同步集合:
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<>());
// 调用线程方法
for(int i=0;i<10;i++){
exportDataToExcel(latch,rollback,transactionStatuses,new ArrayList<>());
}
// 阻塞 —— 等待全部执行完
latch.await();
// 如果出错回滚事务
if (rollback.get()) {
transactionStatuses.forEach(status -> transactionManager.rollback(status));
return " 插入失败 ";
} else {
transactionStatuses.forEach(status -> transactionManager.commit(status));
return " 插入成功 ";
}
}
4、Service
@Async("asyncInsertDataExecutor")
public void exportDataToExcel(CountDownLatch latch,AtomicReference<Boolean> rollback,List<TransactionStatus> transactionStatuses,List<Object> list) {
try {
// 开启事务(可封装成方法)
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus status = transactionManager.getTransaction(def);
transactionStatuses.add(status);
// .... 业务代码
list.clear();
} catch (Exception e) {
rollback.set(true);
e.printStackTrace();
}
latch.countDown();
}