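In our project, we build a custom thread pool on top of ThreadPoolExecutor according to how often each business function is used. The core pool size, maximum pool size, and blocking-queue capacity are all set by estimate, so we have no visibility into how the threads are actually being used.
1. Dynamically modifying ThreadPoolExecutor configuration parameters
First, let's look at how the JDK's ThreadPoolExecutor supports changing pool parameters at runtime. This article focuses on two of them: corePoolSize (the core thread count) and maximumPoolSize (the maximum thread count). ThreadPoolExecutor also exposes other setters, such as setKeepAliveTime and setRejectedExecutionHandler, which are not covered here. Let's start with a simple demo to get familiar with these two parameters.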
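(1) Define a thread pool
Note that every call to getThreadPoolExecutor must return the same thread pool. At first I created a new object on each call, so later changes to the pool parameters had no effect.
Core pool size: 1
Maximum pool size: 2
Blocking queue capacity: 1
Rejection policy: AbortPolicy. When the pool overflows, the task is rejected and a RejectedExecutionException is thrown.
The pool can hold at most 3 tasks at once: 2 running on threads and 1 waiting in the queue.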
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadUtil {
    // Created once when the class is initialized, so every caller shares the same pool
    // and two threads can never race to create two different pools.
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.AbortPolicy());
    public static ThreadPoolExecutor getThreadPoolExecutor() {
        return THREAD_POOL_EXECUTOR;
    }
}
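To make the arithmetic behind this configuration concrete, here is a minimal standalone sketch, not part of the project code. It builds a pool with the same settings (core 1, max 2, queue capacity 1, AbortPolicy) and submits tasks that block on a latch, so none of them can finish while the others are being submitted. The class name PoolCapacityDemo and the helper countRejected are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolCapacityDemo {

    /** Submits taskCount tasks that all block until released; returns how many were rejected. */
    static int countRejected(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < taskCount; i++) {
            try {
                // Task 1 starts the core thread, task 2 waits in the queue,
                // task 3 starts the second (non-core) thread, task 4 has nowhere to go.
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();     // accepted tasks still run to completion
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("3 tasks, rejected: " + countRejected(3)); // prints "3 tasks, rejected: 0"
        System.out.println("4 tasks, rejected: " + countRejected(4)); // prints "4 tasks, rejected: 1"
    }
}
```

Of the three accepted tasks, only two run at the same time; the third sits in the queue until a thread becomes free.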
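(2) Run tasks on the thread pool
Here I define an interface TestRunnable that extends Callable, and a class TestRunnableImpl that implements TestRunnable. The reason for this extra layer is that ThreadPoolExecutor's invokeAll method takes a collection of Callable implementations.
I first ran a concurrency test with JMeter. Even at 100 tasks per second, the logs showed that only one thread was ever used and every request completed normally; the number of pending tasks never exceeded (maximum pool size + queue capacity), so no exception was thrown. My guess is that each task finishes very quickly, so JMeter achieves concurrency at the HTTP-request level, but the tasks handed to ThreadPoolExecutor do not necessarily overlap.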
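The following sketch illustrates one scenario that is consistent with that guess; it does not prove what happened in the JMeter run. It assumes each task finishes before the next one is submitted, which is modeled here by waiting on the Future. Under that assumption the queue is always empty at submit time, so the pool never needs to start a second thread. The class name SequentialSubmitDemo is invented for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SequentialSubmitDemo {

    /** Runs taskCount short tasks one after another and returns the largest pool size ever reached. */
    static int largestPoolSizeAfter(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < taskCount; i++) {
                // get() returns only after the task has finished, so tasks never overlap.
                pool.submit(() -> "success").get();
            }
            return pool.getLargestPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 100 tasks, but the single core thread handles all of them.
        System.out.println("largest pool size: " + largestPoolSizeAfter(100)); // prints "largest pool size: 1"
    }
}
```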
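Submitting several tasks at once with ThreadPoolExecutor's invokeAll produces the expected behavior.
The /executeRunnable endpoint takes an index parameter to control how many tasks are submitted at once.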
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import gdut.ThreadUtil;
import gdut.entity.Article;
import gdut.entity.ArticleParams;
import gdut.entity.UpdateThreadParam;
import gdut.entity.UserLog;
import gdut.mapper.ArticleMapper;
import gdut.service.ArticleService;
import gdut.service.UserLogService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
@RestController
@RequestMapping("/thread")
@Slf4j
public class ThreadController {
@Autowired
private ArticleService articleService;
@Autowired
private UserLogService userLogService;
@PostMapping("/executeRunnable")
public void executeRunnable(@RequestParam("index") int index) {
ThreadPoolExecutor threadPoolExecutor = ThreadUtil.getThreadPoolExecutor();
List<TestRunnableImpl> callableList = Lists.newArrayList();
for (int i = 0; i < index; i++) {
callableList.add(new TestRunnableImpl());
}
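try {
threadPoolExecutor.invokeAll(callableList);
} catch (InterruptedException e) {
// Restore the interrupt flag so the caller can still see the interruption
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for tasks to finish", e);
}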
log.info("active - {} - {} - {} - {} - {}", threadPoolExecutor.getActiveCount(), threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount());
}
// The type parameter is named T; naming it String would shadow java.lang.String inside the interface
interface TestRunnable<T> extends Callable<T> {
}
@AllArgsConstructor
class TestRunnableImpl implements TestRunnable<String> {
@Override
public String call() {
log.info("test - {} - {}", Thread.currentThread().getName(), LocalDateTime.now().toString());
List<Article> articleList = articleService.list();
long count = userLogService.count();
log.info("end - {} - {}", Thread.currentThread().getName(), LocalDateTime.now().toString());
return "success";
}
}
}
Test results:
a. With the task count set to 3
http://localhost:8089/thread/executeRunnable?index=3
The logs show that 2 threads were created to run the tasks, which matches the maximum pool size.
b. With the task count set to 4
http://localhost:8089/thread/executeRunnable?index=4
An exception is thrown: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@151789c9 rejected from java.util.concurrent.ThreadPoolExecutor@261c83ca[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_40]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_40]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_40]
(3) Modify the thread pool parameters
@PostMapping("/updateThreadPoolExecutor")
public String updateThreadPoolExecutor(@RequestBody UpdateThreadParam updateThreadParam) {
ThreadPoolExecutor threadPoolExecutor = ThreadUtil.getThreadPoolExecutor();
threadPoolExecutor.setCorePoolSize(updateThreadParam.getCoreSize());
threadPoolExecutor.setMaximumPoolSize(updateThreadParam.getMaxSize());
return "success";
}
Increase both the core pool size and the maximum pool size by 1.
http://localhost:8089/thread/updateThreadPoolExecutor
{
"coreSize":2,
"maxSize":3
}
Sending http://localhost:8089/thread/executeRunnable?index=4 again now runs normally.
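One detail worth keeping in mind when changing both values: the order of the two setter calls can matter. setMaximumPoolSize throws IllegalArgumentException if the new maximum is below the current core size, and on newer JDKs (9 and later, as far as I know) setCorePoolSize also rejects a core size above the current maximum. The demo in this article ran on JDK 1.8, where the second check does not exist. Below is a small sketch of an order-aware helper; the class PoolResizeDemo and the method resize are hypothetical names, not part of the project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizeDemo {

    /** Applies a new core/max pair in an order that never leaves core > max in between. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("invalid sizes: core=" + newCore + ", max=" + newMax);
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing (or keeping) the maximum: raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking the maximum: lower the core size first, then the ceiling.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        resize(pool, 5, 10); // grow well past the old maximum
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints "5/10"
        resize(pool, 1, 2);  // shrink back
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints "1/2"
        pool.shutdown();
    }
}
```

For the request shown above (core 2, max 3, starting from 1 and 2) either order works, because the new core size does not exceed the old maximum.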
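2. Implementing a dynamic thread pool
To view the pool's runtime configuration and execution status more directly, create two database tables: a thread pool configuration table and a thread execution log table.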
CREATE TABLE `thread_pool_config` (
  `id` int(0) NOT NULL AUTO_INCREMENT,
  `thread_pool_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'thread pool name',
  `core_thread_size` int(0) NULL DEFAULT NULL COMMENT 'core pool size',
  `maximum_thread_size` int(0) NULL DEFAULT NULL COMMENT 'maximum pool size',
  `queue_capacity` int(0) NULL DEFAULT NULL COMMENT 'queue capacity',
  `create_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT 'last update time',
  `enable_flag` char(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '1' COMMENT 'whether the row is enabled',
  `pool_size` int(0) NULL DEFAULT NULL COMMENT 'current number of threads in the pool',
  `active_threads` int(0) NULL DEFAULT NULL COMMENT 'number of threads actively running tasks',
  `queue_tasks` int(0) NULL DEFAULT NULL COMMENT 'number of tasks waiting in the queue',
  `completed_tasks` int(0) NULL DEFAULT NULL COMMENT 'total number of completed tasks',
  PRIMARY KEY (`id`) USING BTREE
);
CREATE TABLE `thread_log` (
  `id` int(0) NOT NULL AUTO_INCREMENT,
  `thread_pool_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'thread pool name',
  `thread_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'thread name',
  `create_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  PRIMARY KEY (`id`) USING BTREE
);
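The class below creates default thread pools and dynamically modifies pool configuration. Every pool that has been created is stored in a Map, from which all created pools can later be retrieved. When creating a default pool, I use double-checked locking keyed on the pool name to make sure the same pool is never created twice.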
import lombok.Data;
import java.io.Serializable;
@Data
public class DynamicThreadPoolParams implements Serializable {
private String threadPoolName;
private int corePoolSize;
private int maximumPoolSize;
private int queueCapacity;
}
import gdut.entity.DynamicThreadPoolParams;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class DynamicThreadPoolExecutor {
public static final int DEFAULT_CORE_POOL_SIZE = 5;
public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
public static final int QUEUE_CAPACITY = 20;
// Key: thread pool name; value: the thread pool.
// ConcurrentHashMap keeps the unsynchronized reads below safe while another thread is inserting.
public static final Map<String, ThreadPoolExecutor> executorPool = new ConcurrentHashMap<>();
public static Map<String, ThreadPoolExecutor> getExecutorPool() {
return executorPool;
}
public static ThreadPoolExecutor dynamicThreadPoolExecutor(DynamicThreadPoolParams dynamicThreadPoolParams) {
ThreadPoolExecutor executorByName = executorPool.get(dynamicThreadPoolParams.getThreadPoolName());
if(null != executorByName) {
executorByName.setCorePoolSize(dynamicThreadPoolParams.getCorePoolSize());
executorByName.setMaximumPoolSize(dynamicThreadPoolParams.getMaximumPoolSize());
return executorByName;
}
synchronized (executorPool) {
// Second check: re-read the map inside the lock, because another thread may have created the pool in the meantime
executorByName = executorPool.get(dynamicThreadPoolParams.getThreadPoolName());
if(null != executorByName) {
executorByName.setCorePoolSize(dynamicThreadPoolParams.getCorePoolSize());
executorByName.setMaximumPoolSize(dynamicThreadPoolParams.getMaximumPoolSize());
return executorByName;
}
// Create the thread factory; "-%d" numbers the threads, matching defaultThreadPoolExecutor
ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNameFormat(dynamicThreadPoolParams.getThreadPoolName() + "-%d").get();
executorByName = new ThreadPoolExecutor(dynamicThreadPoolParams.getCorePoolSize(), dynamicThreadPoolParams.getMaximumPoolSize(), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(dynamicThreadPoolParams.getQueueCapacity()), threadFactory, new ThreadPoolExecutor.AbortPolicy());
executorPool.put(dynamicThreadPoolParams.getThreadPoolName(), executorByName);
}
return executorByName;
}
/**
* Default thread pool
* @param threadPoolName thread pool name
* @return the existing pool with that name, or a newly created pool with the default sizes
*/
public static ThreadPoolExecutor defaultThreadPoolExecutor(String threadPoolName) {
// Look in the Map first and return directly on a hit; on a miss, fall through to the double check
ThreadPoolExecutor executorByName = executorPool.get(threadPoolName);
if(null != executorByName) {
return executorByName;
}
synchronized (executorPool) {
// Second check: re-read inside the lock, since another thread may have created the pool while we waited
executorByName = executorPool.get(threadPoolName);
if(null == executorByName) {
// Create the thread factory
ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNameFormat(threadPoolName + "-%d").get();
executorByName = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY), threadFactory, new ThreadPoolExecutor.AbortPolicy());
executorPool.put(threadPoolName, executorByName);
}
}
return executorByName;
}
}
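As an alternative to hand-written double-checked locking, the "create once per name" behavior can also be sketched with ConcurrentHashMap.computeIfAbsent, which runs the creation function at most once per key. This is a different technique from the one used in the class above, shown only for comparison. The class name PoolRegistrySketch, the method getOrCreate, and the plain lambda thread factory (used here instead of the jodd ThreadFactoryBuilder so the example has no dependencies) are all assumptions made for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolRegistrySketch {

    // Key: pool name; value: the pool registered under that name.
    private static final Map<String, ThreadPoolExecutor> POOLS = new ConcurrentHashMap<>();

    /** Returns the pool registered under name, creating it with default sizes on first use. */
    static ThreadPoolExecutor getOrCreate(String name) {
        // computeIfAbsent is atomic per key: concurrent callers all receive the same instance.
        return POOLS.computeIfAbsent(name, PoolRegistrySketch::newDefaultPool);
    }

    private static ThreadPoolExecutor newDefaultPool(String name) {
        AtomicInteger sequence = new AtomicInteger();
        // Threads are named "<pool name>-<n>", mirroring the "-%d" name format used above.
        ThreadFactory factory = task -> new Thread(task, name + "-" + sequence.getAndIncrement());
        return new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20), factory, new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor first = getOrCreate("Test");
        ThreadPoolExecutor second = getOrCreate("Test");
        System.out.println(first == second); // prints "true"
    }
}
```

The trade-off is that the creation function runs while the map holds an internal lock for that key, so it should stay short and must not touch the same map again.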
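Each task writes a row to the thread execution log table when it runs on the pool. A 30-second sleep is added so that there is time to observe how many threads the pool has created relative to its core and maximum sizes, and how many tasks are waiting in the queue.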
import gdut.entity.Article;
import gdut.entity.ThreadLog;
import gdut.service.ArticleService;
import gdut.service.ThreadLogService;
import gdut.service.UserLogService;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.Callable;
@AllArgsConstructor
@NoArgsConstructor
@Slf4j
@Setter
public class TestRunnableImpl implements TestRunnable<String> {
private ArticleService articleService;
private UserLogService userLogService;
private ThreadLogService threadLogService;
private String threadPoolName;
@Override
public String call() {
log.info("test - {} - {}", Thread.currentThread().getName(), LocalDateTime.now().toString());
List<Article> articleList = articleService.list();
long count = userLogService.count();
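try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
// Restore the interrupt flag instead of swallowing the interruption
Thread.currentThread().interrupt();
log.warn("Interrupted during the 30-second observation sleep", e);
}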
saveThreadLog();
log.info("end - {} - {}", Thread.currentThread().getName(), LocalDateTime.now().toString());
return "success";
}
public void saveThreadLog() {
ThreadLog threadLog = new ThreadLog();
threadLog.setThreadName(Thread.currentThread().getName());
threadLog.setThreadPoolName(threadPoolName);
threadLogService.save(threadLog);
}
}
// The type parameter is named T; naming it String would shadow java.lang.String inside the interface
interface TestRunnable<T> extends Callable<T> {
}
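Key points about how the thread pool is used:
(1) To run tasks on a thread pool, you must specify the pool's name. If the pool container has no pool with that name, a new one is created; if one already exists, it is used directly.
(2) A scheduled task is needed that, every 10 s or so (the interval is up to you), writes the live runtime parameters of every pool in the container to the database.
(3) ThreadPoolExecutor does not expose the blocking queue's capacity directly, but it can be derived from two other values: queue capacity = number of queued elements + remaining capacity.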
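Points (2) and (3) can be sketched together without Spring or a database. The example below takes a snapshot of a pool's runtime values, including the derived queue capacity, and hands it to a callback on a fixed schedule. In the real service the callback would write to thread_pool_config; here it only prints. PoolMetricsSketch, Snapshot, and startReporter are hypothetical names, and using a ScheduledExecutorService instead of a framework scheduler is an assumption made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PoolMetricsSketch {

    /** Immutable copy of the values a refresh job would persist for one pool. */
    static final class Snapshot {
        final int corePoolSize;
        final int maximumPoolSize;
        final int queueCapacity;
        final int poolSize;
        final int activeThreads;
        final int queuedTasks;
        final long completedTasks;

        Snapshot(ThreadPoolExecutor pool) {
            BlockingQueue<Runnable> queue = pool.getQueue();
            this.queuedTasks = queue.size();
            // Capacity = queued elements + remaining capacity. The two reads are separate
            // calls, so on a busy pool the sum can be momentarily off.
            this.queueCapacity = this.queuedTasks + queue.remainingCapacity();
            this.corePoolSize = pool.getCorePoolSize();
            this.maximumPoolSize = pool.getMaximumPoolSize();
            this.poolSize = pool.getPoolSize();
            this.activeThreads = pool.getActiveCount();
            this.completedTasks = pool.getCompletedTaskCount();
        }
    }

    /** Calls sink with a fresh snapshot immediately and then every periodSeconds seconds. */
    static ScheduledExecutorService startReporter(ThreadPoolExecutor pool, long periodSeconds,
                                                  Consumer<Snapshot> sink) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> sink.accept(new Snapshot(pool)), 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20));
        ScheduledExecutorService reporter = startReporter(pool, 10, s ->
                System.out.println("capacity=" + s.queueCapacity + " queued=" + s.queuedTasks
                        + " active=" + s.activeThreads));
        Thread.sleep(200);      // give the first report time to run
        reporter.shutdownNow(); // stop reporting
        pool.shutdown();
    }
}
```

ThreadPoolExecutor documents getActiveCount and getCompletedTaskCount as approximate, so the stored numbers are best read as a rough picture of the pool rather than exact counters.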
import com.baomidou.mybatisplus.extension.service.IService;
import gdut.entity.DynamicRunnableParams;
import gdut.entity.DynamicThreadPoolParams;
import gdut.entity.ThreadPoolConfig;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.concurrent.ThreadPoolExecutor;
public interface ThreadPoolConfigService extends IService<ThreadPoolConfig> {
ThreadPoolExecutor addThreadPoolExecutor(String threadPoolName);
ThreadPoolExecutor updateThreadPoolExecutor(DynamicThreadPoolParams dynamicThreadPoolParams);
void refreshThreadPoolExecutor();
void executeRunnable(DynamicRunnableParams dynamicRunnableParams);
}
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import gdut.entity.DynamicRunnableParams;
import gdut.entity.DynamicThreadPoolParams;
import gdut.entity.ThreadPoolConfig;
import gdut.entity.UserLog;
import gdut.mapper.ThreadPoolConfigMapper;
import gdut.mapper.UserLogMapper;
import gdut.service.ArticleService;
import gdut.service.ThreadLogService;
import gdut.service.ThreadPoolConfigService;
import gdut.service.UserLogService;
import gdut.util.DynamicThreadPoolExecutor;
import gdut.util.TestRunnableImpl;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
@Service
@Slf4j
public class ThreadPoolConfigServiceImpl extends ServiceImpl<ThreadPoolConfigMapper, ThreadPoolConfig> implements ThreadPoolConfigService {
@Autowired
private ArticleService articleService;
@Autowired
private UserLogService userLogService;
@Autowired
private ThreadLogService threadLogService;
public ThreadPoolExecutor addThreadPoolExecutor(String threadPoolName) {
ThreadPoolExecutor threadPoolExecutor = DynamicThreadPoolExecutor.defaultThreadPoolExecutor(threadPoolName);
ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig();
threadPoolConfig.setThreadPoolName(threadPoolName);
threadPoolConfig.setCoreThreadSize(DynamicThreadPoolExecutor.DEFAULT_CORE_POOL_SIZE);
threadPoolConfig.setMaximumThreadSize(DynamicThreadPoolExecutor.DEFAULT_MAXIMUM_POOL_SIZE);
threadPoolConfig.setQueueCapacity(DynamicThreadPoolExecutor.QUEUE_CAPACITY);
threadPoolConfig.setPoolSize(threadPoolExecutor.getPoolSize());
threadPoolConfig.setActiveThreads(threadPoolExecutor.getActiveCount());
threadPoolConfig.setQueueTasks(threadPoolExecutor.getQueue().size());
threadPoolConfig.setCompletedTasks(threadPoolExecutor.getCompletedTaskCount());
this.save(threadPoolConfig);
return threadPoolExecutor;
}
public ThreadPoolExecutor updateThreadPoolExecutor(DynamicThreadPoolParams dynamicThreadPoolParams) {
ThreadPoolExecutor threadPoolExecutor = DynamicThreadPoolExecutor.dynamicThreadPoolExecutor(dynamicThreadPoolParams);
LambdaUpdateWrapper<ThreadPoolConfig> updateWrapper = new UpdateWrapper<ThreadPoolConfig>().lambda()
.eq(ThreadPoolConfig::getThreadPoolName, dynamicThreadPoolParams.getThreadPoolName())
.set(ThreadPoolConfig::getCoreThreadSize, dynamicThreadPoolParams.getCorePoolSize())
.set(ThreadPoolConfig::getMaximumThreadSize, dynamicThreadPoolParams.getMaximumPoolSize())
.set(ThreadPoolConfig::getPoolSize, threadPoolExecutor.getPoolSize())
.set(ThreadPoolConfig::getActiveThreads, threadPoolExecutor.getActiveCount())
.set(ThreadPoolConfig::getQueueTasks, threadPoolExecutor.getQueue().size())
.set(ThreadPoolConfig::getCompletedTasks, threadPoolExecutor.getCompletedTaskCount());
this.update(updateWrapper);
return threadPoolExecutor;
}
public void refreshThreadPoolExecutor() {
Map<String, ThreadPoolExecutor> executorPoolMap = DynamicThreadPoolExecutor.getExecutorPool();
if(null == executorPoolMap || executorPoolMap.size() == 0) {
return;
}
for(Map.Entry<String, ThreadPoolExecutor> entry : executorPoolMap.entrySet()) {
String threadPoolName = entry.getKey();
ThreadPoolExecutor threadPoolExecutor = entry.getValue();
LambdaQueryWrapper<ThreadPoolConfig> queryWrapper = new QueryWrapper<ThreadPoolConfig>().lambda()
.eq(ThreadPoolConfig::getThreadPoolName, threadPoolName);
ThreadPoolConfig newPoolConfig = new ThreadPoolConfig();
newPoolConfig.setThreadPoolName(threadPoolName);
newPoolConfig.setCoreThreadSize(threadPoolExecutor.getCorePoolSize());
newPoolConfig.setMaximumThreadSize(threadPoolExecutor.getMaximumPoolSize());
// Queue capacity = queued elements + remaining capacity
newPoolConfig.setQueueCapacity(threadPoolExecutor.getQueue().size() + threadPoolExecutor.getQueue().remainingCapacity());
newPoolConfig.setPoolSize(threadPoolExecutor.getPoolSize());
newPoolConfig.setActiveThreads(threadPoolExecutor.getActiveCount());
newPoolConfig.setQueueTasks(threadPoolExecutor.getQueue().size());
newPoolConfig.setCompletedTasks(threadPoolExecutor.getCompletedTaskCount());
ThreadPoolConfig threadPoolConfig = this.getOne(queryWrapper);
if(null == threadPoolConfig) {
this.save(newPoolConfig);
} else {
this.update(newPoolConfig, new UpdateWrapper<ThreadPoolConfig>().lambda()
.eq(ThreadPoolConfig::getThreadPoolName, threadPoolName));
}
}
List<String> threadPoolNameList = Lists.newArrayList(executorPoolMap.keySet());
this.remove(new QueryWrapper<ThreadPoolConfig>().notIn("thread_pool_name", threadPoolNameList));
}
@Override
public void executeRunnable(DynamicRunnableParams dynamicRunnableParams) {
ThreadPoolExecutor threadPoolExecutor = DynamicThreadPoolExecutor.defaultThreadPoolExecutor(dynamicRunnableParams.getThreadPoolName());
List<TestRunnableImpl> callableList = Lists.newArrayList();
for (int i = 0; i < dynamicRunnableParams.getExecuteCount(); i++) {
TestRunnableImpl testRunnable = new TestRunnableImpl();
testRunnable.setThreadLogService(threadLogService);
testRunnable.setArticleService(articleService);
testRunnable.setUserLogService(userLogService);
testRunnable.setThreadPoolName(dynamicRunnableParams.getThreadPoolName());
callableList.add(testRunnable);
}
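try {
threadPoolExecutor.invokeAll(callableList);
} catch (InterruptedException e) {
// Restore the interrupt flag so the caller can still see the interruption
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for tasks to finish", e);
}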
}
}
import gdut.entity.DynamicRunnableParams;
import gdut.entity.DynamicThreadPoolParams;
import gdut.entity.UpdateThreadParam;
import gdut.service.ArticleService;
import gdut.service.ThreadLogService;
import gdut.service.ThreadPoolConfigService;
import gdut.service.UserLogService;
import gdut.util.DynamicThreadPoolExecutor;
import gdut.util.TestRunnableImpl;
import gdut.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
@RestController
@RequestMapping("/dynamic")
@Slf4j
public class DynamicThreadPoolController {
@Autowired
ThreadPoolConfigService threadPoolConfigService;
@PostMapping("/executeRunnable")
public void executeRunnable(@RequestBody DynamicRunnableParams dynamicRunnableParams) {
threadPoolConfigService.executeRunnable(dynamicRunnableParams);
}
@PostMapping("/updateThreadPoolExecutor")
public String updateThreadPoolExecutor(@RequestBody DynamicThreadPoolParams dynamicThreadPoolParams) {
threadPoolConfigService.updateThreadPoolExecutor(dynamicThreadPoolParams);
return "success";
}
@PostMapping("/refreshThreadPoolExecutor")
public String refreshThreadPoolExecutor() {
// No request body is needed: the refresh walks every pool in the container
threadPoolConfigService.refreshThreadPoolExecutor();
return "success";
}
}
Create a thread pool and run tasks on it:
http://localhost:8089/dynamic/executeRunnable
{
"threadPoolName":"Test",
"executeCount":50
}
Pull the thread pool configuration and write it to the database:
http://localhost:8089/dynamic/refreshThreadPoolExecutor
Update the thread pool:
http://localhost:8089/dynamic/updateThreadPoolExecutor
{
"threadPoolName":"Test",
"corePoolSize":10,
"maximumPoolSize":30
}
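3. References
Hippo4J official documentation
8 Java thread pool best practices and pitfalls (8个Java线程池最佳实践和坑!)
How should thread pool parameters be set? Meituan's answer that left the interviewer stunned (如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。)
Java thread pool implementation principles and their practice in Meituan's business (Java线程池实现原理及其在美团业务中的实践)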