线程池
什么是池?
池化技术的核心就是"提前准备并重复利用资源". 减少资源创建和销毁的成本.
那么线程池就是提前准备好一些线程, 当有任务来临时, 就可以直接交给这些线程运行, 当线程完成这些任务后, 并不会被销毁, 而是继续等待任务. 那么这些线程在程序运行过程中只需要创建一次. 相比当任务来临时创建线程, 当线程完成任务后销毁线程这种方法. 省去了很多线程创建销毁的成本.
一次性创建多个线程, 那么就需要将这些线程集中管理起来, 将这先线程集中起来并进行管理, 这样的集合就时线程池.
线程池实现
可以看到在线程池中, 我们至少需要对线程和任务进行管理
线程: 这里我们使用 vector 进行管理
任务: 这里使用 queue 对任务进行管理
首先对于传递给线程的数据先进行一步封装
class ThreadData
{
public:
ThreadData(const std::string& name, void* arg)
:_name(name),
_arg(arg)
{}
std::string _name; // 本线程名称
void* _arg; // 本线程所需(参数)数据
};
然后对线程也进行封装
class Thread
{
public:
Thread(const int& num, const std::string& name, func_t func, void* arg)
:_num(num),
_name(name),
_func(func),
_data(name, arg)
{}
void start() // 只有调用 start 函数, 线程才会真正被创建
{
pthread_create(&_t, NULL, _func, (void*)&_data);
}
void join()
{
pthread_join(_t, NULL);
}
private:
int _num; // 第几号线程
pthread_t _t; // 线程的 pthread_t
std::string _name; // 线程名称
func_t _func; // 线程要执行的函数
ThreadData _data; // 线程所需要的参数(数据)
};
最后再是线程池 threadpool 代码.
#include<iostream>
#include<pthread.h>
#include<vector>
#include<queue>
#include<functional>
#include<unistd.h>
// 对于线程要执行的函数的重命名
typedef void*(*func_t)(void*);
class ThreadData
{
public:
ThreadData(const std::string& name, void* arg)
:_name(name),
_arg(arg)
{}
std::string _name; // 本线程名称
void* _arg; // 本线程所需数据
};
class Thread
{
public:
Thread(const int& num, const std::string& name, func_t func, void* arg)
:_num(num),
_name(name),
_func(func),
_data(name, arg)
{}
void start()
{
pthread_create(&_t, NULL, _func, (void*)&_data);
}
void join()
{
pthread_join(_t, NULL);
}
private:
int _num; // 第几号线程
pthread_t _t; // 线程的 pthread_t
std::string _name; // 线程名称
func_t _func; // 线程要执行的函数
ThreadData _data; // 线程所需要的参数(数据)
};
template<class T>
class threadpool
{
public:
threadpool(int num = 5)
{
for(int i = 0; i < num; ++i)
{
std::string name = "线程" + std::to_string(i);
_threads.push_back(new Thread(i, name, route, (void*)this));
}
}
void run() // 让线程池中的线程都被创建出来, 并运行起来
{
for(int i = 0; i < _threads.size(); ++i)
{
_threads[i]->start();
}
}
// 将数据放入线程池中
void push(const T& data)
{
pthread_mutex_lock(&_mtx);
_tasks.push(data);
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_cv);
}
// 析构函数
~threadpool()
{
for(int i = 0; i < _threads.size(); ++i)
{
_threads[i]->join();
}
}
private:
// 线程所要执行的函数
// pthread_create 函数中, 传递给线程的函数类型就是 void*(*)(void*)
// 如果这里不使用 static 静态函数, 而使用成员函数
// 成员函数中默认会添加一个参数(this指针), 这样的化成员函数的类型就和 pthread_create 函数要求的类型不匹配
static void* route(void* arg)
{
// 从 Thread 的 start 函数中可以看到, 我们传递给函数的参数类型是 ThreadData*
// 而 ThreadData 中的 _arg的实际类型, 通过 Thread 的构造函数和 run 函数可以看到
// 给 ThreadData 的第二个参数实际上就 this 指针, this == threadpool<T>*
// 所以 _arg 就是这个线程池对象的指针. 那么通过 _arg 也就能访问到
// 线程池中的 _tasks.
ThreadData* td = (ThreadData*)arg;
threadpool<T>* tp = (threadpool<T>*)td->_arg;
T task;
while(true)
{
// 获取数据
pthread_mutex_lock(&tp->_mtx);
while(tp->_tasks.empty() == true) // 如果没有数据了, 那么就进行等待
{
pthread_cond_wait(&tp->_cv, &tp->_mtx);
}
task = tp->_tasks.front();
tp->_tasks.pop();
pthread_mutex_unlock(&tp->_mtx);
// 处理数据
// 这里直接打印, 便于观察
std::cout << td->_name << ": "<< task << std::endl;
}
return NULL;
}
private:
std::vector<Thread*> _threads; // 管理所有的线程
std::queue<T> _tasks; // 管理所有的任务
pthread_mutex_t _mtx; // 线程间互斥的访问 _tasks
pthread_cond_t _cv; // 当 _tasks 中没有任务时, 让所有的线程进行等待, 直到被唤醒
};
int main()
{
threadpool<int> tp;
tp.run();
for(int i = 0; i < 10000; ++i)
{
tp.push(i);
}
sleep(5);
exit(0);
return 0;
}