实现
首先要有线程池这个结构体:
- 互斥锁
- 条件变量
- 消息队列
- 关闭标志位
struct Pool {
std::mutex mtx;
std::condition_variable cond;
std::queue<std::function<void()>> tasks;
bool isClosed;//用来退出无限循环
};
【首先是线程池的对象】其实就是维护一个共享指针,指向这个pool结构体对象。注意是共享指针,因为这里的多个线程都要指向这个结构体对象
【线程池的构造函数】
- 开n个线程来detch。
- 最重要的是先加锁,随后开个无限的true循环(锁要给共享变量使用,所以在外面定义):
- 如果有任务,那么就取出任务来,随后解锁进行任务的执行。执行完再上锁进行下一个循环。
- 如果没有任务,则查看标志位是否为false,如果是则解锁
- 不是上述两种情况,则条件变量等待
【析构函数】
- 加锁,将标志位关闭,随后通知所有条件变量。
【添加任务】
- 加锁,将任务添加至队列中,通知一个条件变量。
- 注意由于是模板函数,需要加入完美转发
代码:
class ThreadPool {
public:
//构造函数--explicit是为了防止隐形转换
explicit ThreadPool(size_t thread_num = 1):_pool(make_shared<Pool>()) {
assert(thread_num > 0);
for(size_t i = 0; i < thread_num; ++i) {
thread([pool = _pool](){
unique_lock<mutex>ul(pool->mx);
while(true) {
if(!pool->tasks.empty()) {
auto task = move(pool->tasks.front());
pool->tasks.pop_front();
ul.unlock();
task();
ul.lock();
}
else if(pool->is_close) break;
else pool->cv.wait(ul);
}
}).detach();
}
}
~ThreadPool() {
{
lock_guard<mutex> lg(_pool->mx);
_pool->is_close = true;
}
_pool->cv.notify_all();
}
//因为模板函数,所以注意「完美转发」问题
template<typename T>
void AddTask(T&& task){
{
lock_guard<mutex> lg(_pool->mx);
_pool->tasks.push_back(forward<T>(task));
}
_pool->cv.notify_one();
}
private:
struct Pool {
mutex mx;
condition_variable cv;
deque<function<void()>> tasks;
bool is_close;
};
shared_ptr<Pool> _pool;
};
测试
首先注意,要引入两个头文件:
unistd.h
这个是有linux下的sleepbits/stdc++.h
这个是有线程池、信号变量等
int main() {
ThreadPool p(4);
p.AddTask([](){sleep(1);cout << "hello,world" << endl;});
p.AddTask([](){sleep(1);cout << "hello,world" << endl;});
p.AddTask([](){sleep(1);cout << "hello,world" << endl;});
p.AddTask([](){sleep(1);cout << "hello,world" << endl;});
sleep(5);//睡眠5s
return 0;
}
//注意编译程序时,是需要--> g++ test.cc -pthread -o test
当线程池数量为1时,会顺序输出:
当线程池数量为2时,会一下子输出2个:
当线程池数量为4时,会一下子都输出: