C++实现线程池
- 一、前言
- 二、线程池的接口设计
- 2.1、类封装
- 2.2、线程池的初始化
- 2.3、线程池的启动
- 2.4、线程池的停止
- 2.5、线程的执行函数run()
- 2.6、任务的运行函数
- 2.7、等待所有线程结束
- 三、测试线程池
- 四、源码地址
- 总结
一、前言
C++实现的线程池,可能涉及以下知识点:
- decltype。
- packaged_task。
- make_shared。
- mutex。
- unique_lock。
- notify_one。
- future。
- queue。
- bind。
- thread。
等等。
二、线程池的接口设计
(1)封装一个线程池的类。
(2)线程池的初始化:设置线程的数量。
(3)启动线程池:创建线程等工作。
(4)执行任务的函数。
(5)停止线程池。
(6)等所有任务执行完成,退出执行函数。
2.1、类封装
线程池类,采用c++11来实现。
#ifndef _CPP_THREAD_POOL_H_
#define _CPP_THREAD_POOL_H_
#include <iostream>
#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <future>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;
void getNow(timeval *tv);
int64_t getNowMs();
#define TNOW getNow()
#define TNOWMS getNowMs()
class CPP_ThreadPool{
protected:
struct TaskFunc{
TaskFunc(uint64_t expireTime):_expireTime(expireTime){}
int64_t _expireTime=0;//超时的绝对时间
function<void()> _func;
};
typedef shared_ptr<TaskFunc> TaskFuncPtr;
/*
* @brief 获取任务 **
*@return TaskFuncPtr
*/
bool get(TaskFuncPtr& task);
/*
* @brief 线程池是否退出
*/
bool isTerminate()
{
return _bTerminate;
}
/*
* @brief 线程运行态
*/
void run();
public:
/*
* @brief 构造函数
*/
CPP_ThreadPool();
/*
* @brief 析构, 会停止所有线程
*/
virtual ~CPP_ThreadPool();
/*
* * @brief 初始化.
* * @param num 工作线程个数
*/
bool init(size_t num);
/*
* @brief 停止所有线程, 会等待所有线程结束
*/
void stop();
/*
* @brief 启动所有线程
*/
bool start();
/*
* @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
* @param millsecond 等待的时间(ms), -1:永远等待
* @return true, 所有工作都处理完毕
* false,超时退出
*/
bool waitForAllDone(int millsecond=-1);
/*
* @brief 获取线程个数.
* @return size_t 线程个数
*/
size_t getThreadNum()
{
unique_lock<mutex> lock(_mutex);
return _threads.size();
}
/*
* @brief 获取当前线程池的任务数
* @return size_t 线程池的任务数
*/
size_t getJobNum()
{
unique_lock<mutex> lock(_mutex);
return _tasks.size();
}
/*
* @brief 用线程池启用任务(F是function, Args是参数) **
* @param ParentFunctor
* @param tf
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*/
template <class F,class... Args>
auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>
{
return exec(0,f,args...);
}
/*
* unused.
*
* @brief 用线程池启用任务(F是function, Args是参数)
* @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃
* @param bind function
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*
* template <class F, class... Args>
* 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
* auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
* std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
* 返回值后置
*/
template<class F,class... Args>
auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>
{
//获取现在时间
int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;
// 定义返回值类型
using retType=decltype(f(args...));
// 封装任务
auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));
// 封装任务指针,设置过期时间
TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);
fPtr->_func=[task](){
(*task)();
};
unique_lock<mutex> lock(_mutex);
// 插入任务
_tasks.push(fPtr);
// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify
_condition.notify_one();
return task->get_future();
}
protected:
size_t _threadNum;//线程数量
bool _bTerminate;//判定是否终止线程池
mutex _mutex; //唯一锁
vector<thread*> _threads; //工作线程数组
queue<TaskFuncPtr> _tasks; //任务队列
condition_variable _condition;//条件变量
atomic<int> _atomic{0};//原子变量
};
#endif
使用示例:
CPP_ThreadPool tpool;
tpool.init(5); //初始化线程池线程数
//启动线程方式
tpool.start();
//将任务丢到线程池中*
tpool.exec(testFunction, 10); //参数和start相同
//等待线程池结束
tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)
//此时: 外部需要结束线程池是调用
tpool.stop();
注意:ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
int testInt(int i)
{
return i;
}
auto f = tpool.exec(testInt, 5);
cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5
class Test
{
public:
int test(int i);
};
Test t;
auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
//返回的future对象, 可以检查是否执行
cout << f.get() << endl;
2.2、线程池的初始化
主要是设置线程池中线程的数量,如果线程池已经存在则直接返回,防止重复初始化。
bool CPP_ThreadPool::init(size_t num)
{
unique_lock<mutex> lock(_mutex);
if(!_threads.empty())
return false;
_threadNum=num;
return true;
}
2.3、线程池的启动
根据设置的线程数量,创建线程并保存在一个数组中。如果线程池已经存在则直接返回,防止重复启动。
bool CPP_ThreadPool::start()
{
unique_lock<mutex> lock(_mutex);
if(!_threads.empty())
return false;
for(size_t i=0;i<_threadNum;i++)
{
_threads.push_back(new thread(&CPP_ThreadPool::run,this));
}
return true;
}
2.4、线程池的停止
设置线程退出条件,并通知所有线程。停止时,要等待所有线程都执行完任务,再销毁线程。
需要注意锁的粒度。
void CPP_ThreadPool::stop()
{
// 注意要有这个{},不然会死锁。
{
unique_lock<mutex> lock(_mutex);
_bTerminate=true;
_condition.notify_all();
}
size_t thdCount=_threads.size();
for(size_t i=0;i<thdCount;i++)
{
if(_threads[i]->joinable())
{
_threads[i]->join();
}
delete _threads[i];
_threads[i]=NULL;
}
unique_lock<mutex> lock(_mutex);
_threads.clear();
}
2.5、线程的执行函数run()
读取任务:判断任务是否存在,如果任务队列为空,则进入等待状态直到任务队列不为空或退出线程池(这里需要两次判断,因为可能存在虚假唤醒)。
执行任务:调用匿名函数。
检测所有任务都是否执行完毕:这里使用了原子变量来检测任务是否都执行完,原因在于任务队列为空不代表任务已经执行完(任务可能还在运行中、也可能是任务刚弹出还没运行),使用原子变量来计数就更严谨。
bool CPP_ThreadPool::get(TaskFuncPtr& task)
{
unique_lock<mutex> lock(_mutex);
if(_tasks.empty())//判断任务是否存在
{
_condition.wait(lock,[this]{
return _bTerminate || !_tasks.empty();//唤醒条件
});
}
if(_bTerminate)
return false;
if(!_tasks.empty())//判断任务是否存在
{
task=move(_tasks.front());// 使用移动语义
_tasks.pop();//弹出一个任务
return true;
}
return false;
}
// 执行任务的线程
void CPP_ThreadPool::run()
{
while(!isTerminate())
{
TaskFuncPtr task;
// 读取任务
bool ok=get(task);
if(ok)
{
++_atomic;
try
{
if(task->_expireTime!=0 && task->_expireTime < TNOWMS)
{
// 处理超时任务
}
else
task->_func();//执行任务
}
catch(...)
{}
--_atomic;
// 任务执行完毕,这里只是为了通知waitForAllDone
unique_lock<mutex> lock(_mutex);
if(_atomic==0 && _tasks.empty())
_condition.notify_all();
}
}
}
2.6、任务的运行函数
这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
返回任务的future对象, 可以通过这个对象来获取返回值。
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。
/*
* @brief 用线程池启用任务(F是function, Args是参数) **
* @param ParentFunctor
* @param tf
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*/
template <class F,class... Args>
auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>
{
return exec(0,f,args...);
}
/*
* unused.
*
* @brief 用线程池启用任务(F是function, Args是参数)
* @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃
* @param bind function
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*
* template <class F, class... Args>
* 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
* auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
* std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
* 返回值后置
*/
template<class F,class... Args>
auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>
{
//获取现在时间
int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;
// 定义返回值类型
using retType=decltype(f(args...));
// 封装任务
auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));
// 封装任务指针,设置过期时间
TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);
fPtr->_func=[task](){
(*task)();
};
unique_lock<mutex> lock(_mutex);
// 插入任务
_tasks.push(fPtr);
// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify
_condition.notify_one();
return task->get_future();
}
2.7、等待所有线程结束
bool CPP_ThreadPool::waitForAllDone(int millsecond)
{
unique_lock<mutex> lock(_mutex);
if(_tasks.empty())
return true;
if(millsecond<0)
{
_condition.wait(lock,[this]{ return _tasks.empty();});
return true;
}
else
{
return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});
}
}
三、测试线程池
#include <iostream>
#include "cppThreadPool.h"
using namespace std;
void func1(int a)
{
cout << "func1() a=" << a << endl;
}
void func2(int a, string b)
{
cout << "func2() a=" << a << ", b=" << b<< endl;
}
void func3()
{
cout<<"func3"<<endl;
}
void test01()
{
cout<<"test 01"<<endl;
CPP_ThreadPool threadpool;
threadpool.init(2);
threadpool.start();//启动线程池
// 执行任务
threadpool.exec(func1,10);
threadpool.exec(func2,20,"FLY.");
threadpool.exec(1000,func3);
threadpool.waitForAllDone();
threadpool.stop();
}
int func1_future(int a)
{
cout << "func1() a=" << a << endl;
return a;
}
string func2_future(int a, string b)
{
cout << "func2() a=" << a << ", b=" << b<< endl;
return b;
}
void test02()
{
cout<<"test 02"<<endl;
CPP_ThreadPool threadpool;
threadpool.init(2);
threadpool.start();//启动线程池
future<decltype(func1_future(0))> ret01=threadpool.exec(func1_future,10);
future<string> ret02=threadpool.exec(func2_future,20,"FLY.");
threadpool.waitForAllDone();
cout<<"ret01 = "<<ret01.get()<<endl;
cout<<"ret02 = "<<ret02.get()<<endl;
threadpool.stop();
}
class Test{
public:
int test(int a)
{
cout<<_name<<": a = "<<a<<endl;
return a+1;
}
void setname(string name)
{
_name=name;
}
string _name;
};
void test03()
{
cout<<"test 03"<<endl;
CPP_ThreadPool threadpool;
threadpool.init(2);
threadpool.start();//启动线程池
Test t1;
Test t2;
t1.setname("Test 1");
t2.setname("Test 2");
auto f1=threadpool.exec(bind(&Test::test,&t1,placeholders::_1),10);
auto f2=threadpool.exec(bind(&Test::test,&t2,placeholders::_1),20);
threadpool.waitForAllDone();
cout<<"f1 = "<<f1.get()<<endl;
cout<<"f2 = "<<f2.get()<<endl;
threadpool.stop();
}
int main(int argc,char **argv)
{
// 简单测试线程池
test01();
// 测试任务函数返回值
test02();
// 测试类对象函数的绑定
test03();
return 0;
}
执行结果:
test 01
func1() a=10
func2() a=20, b=FLY.
func3
test 02
func1() a=10
func2() a=20, b=FLY.
ret01 = 10
ret02 = FLY.
test 03
Test 1: a = 10
Test 2: a = 20
f1 = 11
f2 = 21
四、源码地址
源码已经上传github。
总结
线程池的核心:初始化、线程启动、执行函数、线程停止。