介绍
提交任务函数submitTask
中返回的Result
类型应该是用Result
类包装当前的task
,因为出函数之后task
即如下形式:return Result(task);
Result
和Task
都要互相持有对方的指针,Task
要将任务执行结果通过Result::setVal(run())
调用传给其对应Result
对象。
Result类的声明和实现如下
// threadpool.h
// 提交到线程池的task任务执行完成后的返回值类型
class Result
{
public:
Result(std::shared_ptr<Task> task, bool isValid = true);
~Result() = default;
//获取任务执行后的返回值
void setVal(Any any);
// get方法,用户调用这个方法获取task的返回值
Any get();
private:
Any any_; // 存储任务的返回值
Semaphore sem_; // 线程通信
std::shared_ptr<Task> task_; // 指向对应获取返回值的任务对象
std::atomic_bool isValid_; // 检查返回值是否有效
};
// threadpool.cpp
Result::Result(std::shared_ptr<Task> task, bool isValid)
: isValid_(isValid)
, task_(task)
{
task->setResult(this);
}
//获取任务执行后的返回值
void Result::setVal(Any any)
{
// 存储task的返回值
this->any_ = std::move(any); // Any 类只提供右值拷贝和移动赋值函数
sem_.post(); // 获取任务的返回值,增加信号量资源
}
// get方法,用户调用这个方法获取task的返回值
Any Result::get()
{
if (!isValid_)
{
return "";
}
sem_.wait(); // task任务如果没执行完,这里会阻塞用户线程
return std::move(any_); // Any不提供左值拷贝构造和拷贝赋值
}
Task类的声明和实现如下
// threadpool.h
class Task
{
public:
Task();
~Task() = default;
void exec();
void setResult(Result* res);
// 用户可自定义任务类型,从Task派生出来,重写run方法,实现自定义任务处理
virtual Any run() = 0;
private:
// 不要用智能指针,可能会导致和Result产生循环引用的问题
Result* result_; // Result对象生命周期 > Task对象生命周期
};
// threadpool.cpp
Task::Task()
: result_(nullptr)
{}
void Task::exec()
{
if (nullptr != result_)
{
result_->setVal(run()); // !!!
}
}
void Task::setResult(Result* res)
{
result_ = res;
}
Master-Slave线程模型
Master
线程用来分解任务,然后给各个Slave
分配任务;
等待各个Slave
线程执行完任务,返回结果;
Master
线程合并各个任务结果并输出
测试
使用3个线程来计算1+2+…+300000000 的和(1加到3亿)
测试代码如下:
#include <iostream>
#include <chrono>
#include "threadpool.h"
using uLong = unsigned long long;
class MyTask : public Task
{
public:
MyTask(uLong /*int*/ begin, uLong /*int*/ end)
: begin_(begin)
, end_(end)
{}
Any run()
{
std::cout << "tid:" << std::this_thread::get_id() << " begin!" << std::endl;
//std::this_thread::sleep_for(std::chrono::seconds(5));
uLong sum = 0;
for (uLong i = begin_; i <= end_; i++)
{
sum += i;
}
std::cout << "tid:" << std::this_thread::get_id() << " end!" << std::endl;
return sum;
}
private:
uLong /*int*/ begin_;
uLong /*int*/ end_;
};
int main()
{
ThreadPool pool;
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000000));
Result res2 = pool.submitTask(std::make_shared<MyTask>(1000000001, 2000000000));
Result res3 = pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
uLong sum1 = res1.get().cast_<uLong>();
uLong sum2 = res2.get().cast_<uLong>();
uLong sum3 = res3.get().cast_<uLong>();
// Master - Slave线程模型
std::cout << (sum1 + sum2 + sum3) << std::endl;
getchar();
return 0;
}
出现的问题
测试时最后一个获取任务的线程,没有执行完成
而且,似乎出现了死锁的现象,按任意键无反应
分析
根据卡住的位置,我简单的推测是第三个任务执行的问题,于是在提交第三个子任务处打上断点
到达断点后进入MyTask
构造函数,在初始化列表发现end_
成员被初始化为负数(如下图)
这是由于int类型的最大值在编译器中定义如下
#define INT_MAX 2147483647
小于3000000000,导致了溢出,从而转变成负数。
这样导致在如下的任务执行函数的循环中,程序永远无法出循环
解决办法
将MyTask
类中的成员类型改变为unsigned long long
class MyTask : public Task
{
public:
MyTask(uLong /*int*/ begin, uLong /*int*/ end)
: begin_(begin)
, end_(end)
{}
Any run()
{
std::cout << "tid:" << std::this_thread::get_id() << " begin!" << std::endl;
//std::this_thread::sleep_for(std::chrono::seconds(5));
uLong sum = 0;
for (uLong i = begin_; i <= end_; i++)
{
sum += i;
}
std::cout << "tid:" << std::this_thread::get_id() << " end!" << std::endl;
return sum;
}
private:
uLong /*int*/ begin_;
uLong /*int*/ end_;
};
程序运行后结果正确