实际测试使用如下:
package com.study;
import java.util.concurrent.*;
/**
* 线程池作用:
* 1、线程的复用
* 2、资源管理
* 3、任务调度
* --------------执行过程--------------
* 第1-3个任务进来时,直接创建任务并执行
* 第4-8个任务进来时,会把新任务放到队列,然后按照顺序执行队列中的任务,新的任务在队列最后
* 第9-15个任务进来时,会先执行队列中已有的任务,再执行新的任务
* 第16个任务进来时,会执行拒绝策略
*
* @author admin
* @since 2025-04-18 15:48
*/
public class ThreadPoolDemo {
// 核心线程数
private static final int CORE_POOL_SIZE = 3;
// 最大线程数
private static final int MAX_POOL_SIZE = 10;
// 空闲线程存活时间
private static final long KEEP_ALIVE_TIME = 5;
// 时间单位
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
// 队列容量
private static final int QUEUE_SIZE = 10;
/**
* 任务队列
* 1、ArrayBlockingQueue 有界队列,通用队列,线程池的默认队列
* 2、LinkedBlockingQueue 无界队列,默认大小为Integer.MAX_VALUE
* 3、SynchronousQueue 无容量队列,不存储任务,直接提交给线程池处理
* 4、PriorityBlockingQueue 优先级队列,线程池的默认队列
* 5、DelayQueue 延迟队列
*/
static ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
/**
* 拒绝策略
* 1、abortPolicy 直接抛出异常(默认)
* 2、discardPolicy 直接丢弃任务
* 3、discardOldestPolicy 踢出队列中最老的任务,再次提交当前任务
* 4、callerRunsPolicy 由提交任务的线程来执行任务
*/
static final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
/**
* 线程工厂
*/
static final ThreadFactory threadFactory = new CustomThreadFactory(Thread.NORM_PRIORITY, false);
/**
* 传统线程池创建
* 场景:不同业务线需隔离资源(如支付交易与普通查询互不影响)
*/
private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TIME_UNIT, queue, threadFactory, handler);
/**
* 1、固定-线程池创建
* 默认核心线程数和最大线程数相同
* 默认空闲线程存活时间为0s
* 默认LinkedBlockingQueue无界队列
* 默认拒绝策略和默认线程工厂
* 场景:短时涌入大量HTTP请求(如电商秒杀、票务系统),需快速响应且避免服务器崩溃
*/
private static final ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
/**
* 2、缓存-线程池创建
* 默认核心线程数0,所有线程均为非核心线程
* 最大线程数为Integer最大值
* 默认空闲线程存活时间60s
* 使用SynchronousQueue无容量队列,不存储任务,直接提交给线程池处理
* 默认拒绝策略和默认线程工厂
* 场景:主线程需快速返回,耗时操作异步执行(如发送邮件、生成报表)
*/
private static final ExecutorService cachedExecutor = Executors.newCachedThreadPool();
/**
* 3、任务-线程池创建
* 最大线程数为Integer最大值
* 默认空闲线程存活时间0s
* 默认DelayedWorkQueue高性能队列
* 默认拒绝策略和默认线程工厂
* 场景:定时执行任务(如每日数据备份、定期推送消息)
*/
private static final ExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
/**
* 4、单线程-线程池创建
* 核心线程数和最大线程数都为1
* 默认空闲线程存活时间0s
* 默认LinkedBlockingQueue无界队列
* 默认拒绝策略和默认线程工厂
* 场景:跨服务任务调度(如分布式锁续期、集群状态同步)
*/
private static final ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
/**
* 5、并行-线程池创建
* 并行级别(默认 CPU 核心数)
* 场景:多核CPU任务并行执行(如并行计算、并行处理),海量数据分片并行处理(如日志分析、图像渲染)、递归任务分解
*/
private static final ExecutorService workExecutor = Executors.newWorkStealingPool();
/**
* 测试
*/
public static void main(String[] args) throws InterruptedException {
String type = "fixed";
String returnType = "test";
ExecutorService executorService = getExecutorService(type);
for (int i = 0; i < 10; i++) {
executeTask(executorService, i, returnType);
}
// 等待任务执行完成关闭线程池
executorService.shutdown();
// 添加等待终止逻辑(确保任务完成)
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
System.err.println("线程池未在限定时间内关闭");
}
}
/**
* 线程池执行任务
*/
private static void executeTask(ExecutorService executorService, int taskId, String returnType) {
// 根据返回值判断是否执行成功
if (("wait").equals(returnType)) {
Future<String> future = getFuture(executorService, taskId);
// 方法1:最大等待30s
getFutureResultWait(future, taskId);
} else if (("while").equals(returnType)) {
FutureTask<String> future = getFutureTask(executorService, taskId);
// 方法2:轮询判断是否执行完成
getFutureResultWhile(future, taskId);
} else {
// 方法3:异步获取返回值
CompletableFuture<Integer> futureWithExecutor = CompletableFuture.supplyAsync(() -> {
System.out.println(taskId + "\t" + Thread.currentThread().getName());
return taskId;
}, executorService);
// 添加上述异步回调处理
futureWithExecutor.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Task " + taskId + " failed: " + ex.getMessage());
} else {
System.out.println("Task " + taskId + " completed: " + result);
}
});
}
}
/**
* 提交异步任务并返回Future对象
*
* @param executorService 线程池执行器,用于提交异步任务
* @param taskId 任务标识符,用于日志跟踪
* @return Future<String> 表示异步计算的结果对象
*/
private static Future<String> getFuture(ExecutorService executorService, int taskId) {
return executorService.submit(() -> {
// 任务执行逻辑:打印线程信息并模拟耗时操作
System.out.println(taskId + "\t" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
return "OK";
} catch (InterruptedException e) {
// 正确的中断处理:恢复中断状态并记录日志
Thread.currentThread().interrupt();
System.out.println("Task interrupted: " + taskId);
return "ERROR";
}
});
}
/**
* 提交异步任务并返回FutureTask对象
*
* @param executorService 线程池执行器,用于提交异步任务
* @param taskId 任务标识符,用于日志跟踪
* @return FutureTask<String> 表示异步计算的结果对象
*/
private static FutureTask<String> getFutureTask(ExecutorService executorService, int taskId) {
// 创建Callable任务
Callable<String> task = () -> {
// 任务执行逻辑:打印线程信息并模拟耗时操作
System.out.println(taskId + "\t" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
return "OK";
} catch (InterruptedException e) {
// 正确的中断处理:恢复中断状态并记录日志
Thread.currentThread().interrupt();
System.out.println("Task interrupted: " + taskId);
return "ERROR";
}
};
/*
* 创建FutureTask实例以包装异步任务
* @param task 需要执行的Callable/Runnable任务对象
* FutureTask兼具Runnable和Future的特性:
* 1. 可作为Runnable被线程池执行
* 2. 通过Future接口方法获取计算结果
*/
FutureTask<String> futureTask = new FutureTask<>(task);
/*
* 将FutureTask提交到线程池执行
* @param futureTask 包装了任务的可执行对象
* 提交后会立即返回,实际执行由线程池调度
* 后续可通过futureTask.get()阻塞获取计算结果
* 或通过futureTask.cancel()取消任务
*/
executorService.submit(futureTask);
return futureTask;
}
/**
* 阻塞等待Future结果并处理超时
*
* @param future 异步任务Future对象
* @param taskId 任务标识符,用于异常日志
*/
private static void getFutureResultWait(Future<String> future, int taskId) {
try {
// 设置最长等待时间为10秒
String result = future.get(5, TimeUnit.SECONDS);
System.out.println("Task result: " + result);
} catch (InterruptedException | ExecutionException e) {
// 中断处理:恢复中断状态并记录日志
Thread.currentThread().interrupt();
System.out.println("Task interrupted: " + taskId);
} catch (TimeoutException e) {
// 超时处理:主动取消任务并记录日志
future.cancel(true);
System.out.println("Task timeout: " + taskId);
}
}
/**
* 轮询检查任务完成状态
*
* @param future 异步任务Future对象
* @param taskId 任务标识符,用于中断日志
*/
private static void getFutureResultWhile(Future<String> future, int taskId) {
String result = "ERROR";
// 轮询机制检查任务状态
while (!future.isDone()) {
System.out.println("任务仍在执行中...");
try {
// 降低轮询频率以避免CPU过载
Thread.sleep(500);
result = "OK";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Task interrupted: " + taskId);
// 立即退出循环
break;
}
}
System.out.println("Task result: " + result);
}
/**
* 获取不同类型的线程池
*/
private static ExecutorService getExecutorService(String type) {
switch (type) {
case "fixed":
return fixedExecutor;
case "cached":
return cachedExecutor;
case "scheduled":
return scheduledExecutor;
case "single":
return singleExecutor;
case "work":
return workExecutor;
default:
return poolExecutor;
}
}
}
自定义线程工厂
package com.study;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义线程工厂
*
* @author admin
* @since 2025-04-18 17:29
*/
public class CustomThreadFactory implements ThreadFactory {
// 编号从1开始
private final AtomicInteger number = new AtomicInteger(1);
private static final String NAME_PREFIX = "pool-thread-";
// 线程优先级
private final int priority;
// 是否守护线程
private final boolean daemon;
/**
* 默认构造方法,创建普通优先级、非守护线程
*/
public CustomThreadFactory() {
this(Thread.NORM_PRIORITY, false);
}
/**
* 构造方法,自定义优先级和是否守护线程
*
* @param priority 优先级
* @param daemon 是否守护线程
*/
public CustomThreadFactory(int priority, boolean daemon) {
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException("优先级超出范围: " + priority);
}
this.priority = priority;
this.daemon = daemon;
}
/**
* 创建线程
*
* @param r 线程任务
* @return 线程对象
*/
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(NAME_PREFIX + number.getAndIncrement());
thread.setPriority(priority);
thread.setDaemon(daemon);
thread.setUncaughtExceptionHandler((t, e) -> {
System.err.println("线程 " + t.getName() + " 发生异常: " + e);
e.printStackTrace();
});
return thread;
}
}