1.描述问题
在完成线程池核心功能功能时,没有遇到太大的问题(Any,Result,Semfore的设计),在做线程池资源回收时,遇到了死锁的问题
1、在ThreadPool的资源回收,等待线程池所有线程退出时,发生死锁问题,导致进程无法退出
死锁代码:
#include "threadpool.h"
#include <thread>
#include <iostream>
const int TASK_MAX_THRESHHOLD = INT32_MAX;
const int THREAD_MAX_THRESHHOLD = 100;
const int THREAD_MAX_IDLE_TIME = 60;//单位:秒
//线程池构造
ThreadPool::ThreadPool()
: initThreadSize_(0)
, taskSize_(0)
, idleThreadSize_(0)//刚开始时还没有线程
, curThreadSize_(0)
, taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD)
, threadSizeThreshHold_(THREAD_MAX_THRESHHOLD)
, poolMode_(PoolMode::MODE_FIXED)
, isPoolRunning_(false)
{}
//线程池析构
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
notEmpty_.notify_all();
//等待线程池里面所有的线程返回 有两种状态:阻塞 & 正在执行任务中
std::unique_lock<std::mutex> lock(taskQueMtx_);
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
//设置线程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{
if (checkRunningState())
return;
poolMode_ = mode;
}
// 设置task任务队列上限阈值
void ThreadPool::setTaskQueMaxThreshHold(int threshhold)
{
if (checkRunningState())
return;
taskQueMaxThreshHold_ = threshhold;
}
//设置线程池cached模式下线程阈值
void ThreadPool::setThreadSizeThreshHold(int threshhold)
{
if (checkRunningState())
return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeThreshHold_ = threshhold;
}
}
// 给线程池提交任务 用户调用该接口,传入任务对象,生产任务
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
//获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
//线程的通信 等待任务队列有空余
// 用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败,返回
//while (taskQue_.size() == taskQueMaxThreshHold_)
//{
// notFull_.wait(lock);
//}
/*
* wait:直到等待满足条件(第二个参数lamada)才返回
* wait_for:满足条件返回真,到了约定的时间段(5s)返回假
* wait_until:满足条件返回真,到了约定的时间点(下周一)返回假
*/
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool {return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))//等同于上面的语句,参数:需要释放的锁 函数对象(要能满足条件变量)
//任务队列中的任务数小于上限的阈值,否则就阻塞在这句
{
//表示notFull_等待1s,条件依然没有满足
std::cerr << "task queue is full,submit task fail." << std::endl;
//return task->getResult(); //Task Result 线程执行完task,task对象就被析构掉了
return Result(sp, false);//返回临时对象,应该自动匹配右值的资源转移,如果编译不通过,把C++标准调高一点
}
//如果有空余,把任务放入任务队列中
taskQue_.emplace(sp);
taskSize_++;
//因为新放了任务,任务队列肯定不空了,在notEmpty_上进行通知,赶快分配线程执行任务
notEmpty_.notify_all();
//cached模式 任务处理比较紧急 场景:小而快的任务 需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
std::cout << ">>> create new thread..." << std::this_thread::get_id() << " exit!" << std::endl;
//创建新的线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
//threads_.emplace_back(std::move(ptr));
//启动线程
threads_[threadId]->start();
//修改线程个数相关的变量
curThreadSize_++;
idleThreadSize_++;
}
//返回任务的Result对象
return Result(sp);
// return task->getResult();
}
//开启线程池
void ThreadPool::start(int initThreadSize)
{
//设置线程池的运行状态
isPoolRunning_=true;
//记录初始线程个数
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;
//创建线程对象
for (int i = 0; i < initThreadSize_; i++)
{
//创建thread线程对象的时候,把线程函数给到thread线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
//threads_.emplace_back(std::move(ptr));//unique_ptr将左值引用的拷贝构造和赋值都delete了,需要右值(进行资源转移)
}
//启动所有线程 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start(); //需要去执行一个线程函数
idleThreadSize_++;//记录初始空闲线程的数量
}
}
//定义线程函数 线程池的所有线程从任务队列里面消费任务
void ThreadPool::threadFunc(int threadid) //线程函数返回,相应的线程也就结束了
{
/*std::cout << "begin threadFunc tid:" << std::this_thread::get_id() << std::endl;
std::cout << "end threadFunc tid:" << std::this_thread::get_id() << std::endl;*/
auto lastTime = std::chrono::high_resolution_clock().now();
while (isPoolRunning_)
{
std::shared_ptr<Task> task;
{
//先获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid:" << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;
//cached模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程结束回收掉(超过initThreadSize_数量的线程要进行回收)
//当前时间-上一次线程执行的时间>60s
//每一秒中返回一次 怎么区分:超时返回?还是有任务待执行返回
while (taskQue_.size() == 0)
{
if (poolMode_ == PoolMode::MODE_CACHED)
{
//条件变量,超时返回了
if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_)
{
//开始回收当前线程
//记录线程数量的相关变量的值修改
//把线程对象从线程列表容器中删除 没有办法 threadFunc <=>thread对象
//threadid=>thread对象=》删除
threads_.erase(threadid);// 这个id不是std::this_thread::getid() 是自己生成的,我们自定义的
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid:" << std::this_thread::get_id() << "exit!" << std::endl;
return;
}
}
}
else
{
//等待notEmpty条件
notEmpty_.wait(lock);
}
//线程池结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);// 这个id不是std::this_thread::getid() 是自己生成的,我们自定义的
std::cout << "threadid:" << std::this_thread::get_id() << "exit!" << std::endl;
exitCond_.notify_all();
return;
}
}
idleThreadSize_--;//唤醒线程工作,空闲线程-1
std::cout << "tid:" << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
//从任务队列中取一个任务出来
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
//如果依然有剩余任务,继续通知其它的线程执任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
//取出一个任务,进行通知,通知可以继续提交生产任务
notFull_.notify_all();
}//就应该把锁释放掉
//当前线程负责执行这个任务
if (task != nullptr)
{
//task->run();//执行任务;把任务的返回值setVal方法给到Result
task->exec();
}
idleThreadSize_++;//线程执行完任务,空闲线程+1
lastTime = std::chrono::high_resolution_clock().now();//更新线程执行完任务的时间
}
threads_.erase(threadid);// 这个id不是std::this_thread::getid() 是自己生成的,我们自定义的
std::cout << "threadid:" << std::this_thread::get_id() << "exit!" << std::endl;
exitCond_.notify_all();
}
bool ThreadPool::checkRunningState() const
{
return isPoolRunning_;
}
/// 线程方法实现
int Thread::generateId_ = 0;
//线程构造
Thread::Thread(ThreadFunc func)
:func_(func)
,threadId_(generateId_++)
{}
//线程析构
Thread::~Thread(){}
//启动线程
void Thread::start()
{
//创建一个线程来执行一个线程函数
std::thread t(func_, threadId_);//C++11来说 线程对象t 和线程函数func_
t.detach();//设置分离线程,线程对象t出作用域会析构,但是线程函数不能结束否则程序会挂掉,所以要将线程分离出去,做到二者互不影响
//pthread_detach pthread_t设置成分离线程
//主线程要用pthread_join回收线程,防止孤儿线程的出现
} //获取线程id
int Thread::getId()const
{
return threadId_;
}
/// Task方法实现
Task::Task()
:result_(nullptr)
{}
void Task::exec()
{
result_->setVal(run());//这里发生多态调用
}
void Task::setResult(Result* res)
{
result_ = res;
}
/// Result方法的实现
Result::Result(std::shared_ptr<Task> task, bool isValid)
:isValid_(isValid)
,task_(task)
{
task_->setResult(this);
}
Any Result::get() // 用户调用的
{
if (!isValid_)
{
return "";
}
//task任务如果没有执行完,这里会阻塞用户的线程
sem_.wait();//用户调用get时,如果任务在线程池中,还没有被执行完,那么调用get方法的线程就会阻塞住
return std::move(any_);//右值引用
}
void Result::setVal(Any any)//谁调用的呢??
{
//存储task的返回值
this->any_ = std::move(any);
sem_.post();//已经获取的任务的返回值,增加信号量资源
}
我们的资源回收代码如下:
//线程池析构
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
notEmpty_.notify_all();
//等待线程池里面所有的线程返回 有两种状态:阻塞 & 正在执行任务中
std::unique_lock<std::mutex> lock(taskQueMtx_);
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
现在,有的线程没有被回收,线程队列中还有线程,所以就一直阻塞等待了。
线程池的那个线程为什么没有被回收掉?
(时而出现,时而不出现的问题)
我们通过在windows上调试:
我们通过在Linux上进行gdb调试
主要通过gdb attach到正在运行的进程,通过info threads,thread tid,bt等命令查看各个线程的调用堆栈信息,结合项目代码,定位到发生死锁的代码片段,分析死锁问题发生的原因
2.分析问题
原先针对上面的2种情况的处理方法如下:
第3种情况:
有的线程执行完任务,又进入while循环了
在这里有2种情况:
1、pool线程先获取到锁,线程池的线程获取不到锁,阻塞。
此时pool线程看wait条件,size>0,不满足条件,就进入等待wait状态了,并且把互斥锁mutex释放掉。
线程池的线程就获取到锁了,发现任务队列没有任务了,这个任务就在notEmpty条件变量上wait,但是此时pool线程没有办法再对这个条件变量notify了。
发生死锁了!!!
2、线程池里的线程先获取到锁,发生任务队列为空,在条件变量notEmpty上wait了,释放锁,然后pool线程抢到锁,只是看exitCond条件变量的wait条件,看size还是大于0,还是死锁了。
解决方法:pool线程获取到锁后再notify
//线程池析构
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
//等待线程池里面所有的线程返回 有两种状态:阻塞 & 正在执行任务中
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all();
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
我们在消费者线程进行锁+双重判断:
//定义线程函数 线程池的所有线程从任务队列里面消费任务
void ThreadPool::threadFunc(int threadid)//线程函数返回,相应的线程也就结束了
{
auto lastTime = std::chrono::high_resolution_clock().now();
//所有任务必须执行完成,线程池才可以回收所有线程资源
for (;;)
{
std::shared_ptr<Task> task;
{
//先获取锁,我们要注意控制锁的范围,取完任务,就释放锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid:" << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;
//cached模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程
//结束回收掉(超过initThreadSize_数量的线程要进行回收)
//当前时间 - 上一次线程执行的时间 > 60s
//每一秒中返回一次 怎么区分:超时返回?还是有任务待执行返回
//锁 + 双重判断
while (taskQue_.size() == 0)
{
//线程池要结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);//std::this_thread::getid()
std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
<< std::endl;
exitCond_.notify_all();
return;//线程函数结束,线程结束
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
//条件变量,超时返回了
if (std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)//任务数量大于空闲线程数量
{
//开始回收当前线程
//记录线程数量的相关变量的值修改
//把线程对象从线程列表容器中删除 没有办法 threadFunc《=》thread对象
//通过threadid => thread对象 => 删除
threads_.erase(threadid);//std::this_thread::getid()
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
<< std::endl;
return;
}
}
}
else
{
//等待notEmpty条件
notEmpty_.wait(lock);
}
//if (!isPoolRunning_)
//{
// threads_.erase(threadid);//std::this_thread::getid()
// std::cout << "threadid:" << std::this_thread::get_id() << " exit!"
// << std::endl;
// exitCond_.notify_all();
// return;//结束线程函数,就是结束当前线程了!
//}
}
idleThreadSize_--;
std::cout << "tid:" << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
//从任务队列种取一个任务出来
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
//如果依然有剩余任务,继续通知其它得线程执行任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
//取出一个任务,进行通知,通知可以继续提交生产任务
notFull_.notify_all();
} //就应该把锁释放掉
//当前线程负责执行这个任务
if (task != nullptr)
{
//task->run();//执行任务;把任务的返回值setVal方法给到Result,基类指针调用派生类对象的同名覆盖方法
task->exec();//用户还是使用run方法
}
idleThreadSize_++;
lastTime = std::chrono::high_resolution_clock().now();//更新线程执行完任务的时间
}
}