用vector::size() 获取当前容器元素数量不是线程安全的,所以采用atomic_int 来实现当前容器元素数量的改变能够保证线程安全
线程池成员变量的修改
添加变量记录当前线程数量、空闲线程数量,以及线程数的上限:
int threadSizeThreshHold_; // 线程数量上限(cached模式使用)
std::atomic_int curThreadSize_; // 记录当前线程池里面线程的总数量(cached模式使用)
std::atomic_int idleThreadSize_; // 记录空闲线程的数量(cached模式使用)
存储线程池的容器也要改成 unordered_map
,用ThreadId
能够查找对应的Thread
对象,这样我们后面超时回收线程时能够通过threadId
来删除对应的Thread
对象
std::unordered_map<int, std::unique_ptr<Thread>> threads_;
主线程submitTask函数修改
线程池线程扩充是在提交任务submitTask
时检测到空闲线程数 小于 任务数 且 当前总线程数 小于 线程数阈值,则可以创建新线程:
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
// 获取锁
std::unique_lock<std::mutex> lck(taskQueMtx_);
if (!notFull_.wait_for(lck, std::chrono::seconds(1), [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_ ;}))
{
std::cerr << "task queue is full, submit task fail" << std::endl;
return Result(sp, false);//
}
// 如果有空余,把任务放入任务队列中
taskQue_.emplace(sp);
taskSize_++; // ???
// 任务队列不空,唤醒notEmpty_的wait,即通知消费者消费
notEmpty_.notify_all();
// cached 任务处理比较紧急,场景:小而快的任务
// 同时需要根据任务数量和空闲线程数量,判断是否需要创建新的线程
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
std::cout << ">>> create new thread..." << std::endl;
// 创建新的线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
// unique_ptr的拷贝构造和赋值都被delete,所以要把ptr转为右值
threads_.emplace(threadId, std::move(ptr));
// 启动线程
threads_[threadId]->start();
curThreadSize_++;
idleThreadSize_++;
}
// 返回任务的Result对象
return Result(sp);
}
工作线程运行函数修改
cached
模式下,可能创建了很多线程, 但是空闲时间超过60s,应该把多余的线程回收掉(但是要保证线程数量>= initThreadSize_)即:
if 当前时间 - 上一次线程执行时间 > 60s then 回收线程
每秒种判断一次,需要区分超时返回 和 有任务待返回,使用 锁 + 双重判断
并且线程池关闭时,必须先让所有线程执行完手头任务才能结束线程,也就是while (taskQue_.size() == 0)
才会去判断线程池是否结束
// threadid参数是Thread::threadId_,是自定义的对象编号
// 线程池里所有任务消费任务
void ThreadPool::threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();
// 所有任务必须执行完成,线程池才可以回收所有线程资源
for (;;)
{
std::shared_ptr<Task> task;
{
// 获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid:" << std::this_thread::get_id() << "尝试获取任务..." << std::endl;
// 没有任务的处理
while (taskQue_.size() == 0)
{
// 线程池结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
exitCond_.notify_all();
return;
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
// 超时,这里是用来实现每1s检测一次当前线程空闲时间是否达到删除条件
if(std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds> (now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
// 回收当前现线程
// 记录线程数量相关变量的修改
// 将线程对象从线程列表容器中删除
// threadid => Thread对象 => erase
threads_.erase(threadid); // 注意不是删除std::this_thread::get_id()
curThreadSize_--;
idleThreadSize_--; // 这里线程肯定是空闲的,因为压根没执行任务
std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
return;
}
}
}
else // fixed模式
{
notEmpty_.wait(lock);
}
}
idleThreadSize_--; // 执行任务时,空闲线程-1
std::cout << "tid:" << std::this_thread::get_id() << "获取任务成功..." << std::endl;
// 从任务队列中取一个任务出来
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
// 若依然有剩余任务,继续通知其他线程执行任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
// 取出一个任务进行通知 生产者可以进行生产任务
notFull_.notify_all();
} // 只需要对taskQue_及相关临界资源操作才加锁,执行不需要加锁
// 当前线程负责执行这个任务
if (task != nullptr)
{
//task->run();
// 执行任务,把任务的返回值setVal方法给到Result
task->exec();
}
// 执行任务后空闲线程 + 1
idleThreadSize_++;
auto lastTime = std::chrono::high_resolution_clock().now();
}
}
线程池资源回收
线程池析构将运行状态设置为false
通知所有消费者(每个工作线程 Slave),让它们检测到运行状态为false可以退出了
注意:submitTask是主线程来调用的,用来分配任务的线程(Master)
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
// 等待线程池里所有线程返回,两种状态:阻塞 或 正在执行任务中
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all(); //
// 等待线程销毁
exitCond_.wait(lock, [&]()->bool { return threads_.size() == 0; });
}
线程的初始数量可以用库函数提供的硬件并发数(CPU核心数)来给出,即
void start(int initThreadSize = std::thread::hardware_concurrency());
测试
设置初始线程数为4,提交6个任务,空闲时间最大6s(超过则回收),模式为MODE_CACHED
,测试代码如下:
#include <iostream>
#include <chrono>
#include "threadpool.h"
using uLong = unsigned long long;
class MyTask : public Task
{
public:
MyTask(uLong /*int*/ begin, uLong /*int*/ end)
: begin_(begin)
, end_(end)
{}
Any run()
{
std::cout << "tid:" << std::this_thread::get_id() << " begin!" << std::endl;
//std::this_thread::sleep_for(std::chrono::seconds(5));
uLong sum = 0;
for (uLong i = begin_; i <= end_; i++)
{
sum += i;
}
std::cout << "tid:" << std::this_thread::get_id() << " end!" << std::endl;
return sum;
}
private:
uLong /*int*/ begin_;
uLong /*int*/ end_;
};
int main()
{
#if 1
ThreadPool pool;
// 设置为可伸缩模式
pool.setMode(PoolMode::MODE_CACHED);
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000000));
Result res2 = pool.submitTask(std::make_shared<MyTask>(1000000001, 2000000000));
Result res3 = pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
uLong sum1 = res1.get().cast_<uLong>();
uLong sum2 = res2.get().cast_<uLong>();
uLong sum3 = res3.get().cast_<uLong>();
// Master - Slave线程模型
std::cout << (sum1 + sum2 + sum3) << std::endl;
getchar();
#endif
return 0;
}
实验结果如下:
可以看出能够动态创建2个额外线程,并且超时能够回收线程,退出时能正常回收所有线程。