线程池是一个可以巩固一些线程相关接口 && 加强理解的一个小项目。
注意:这里的线程池使用的线程并不是Linux原生接口,而是经过封装的,具体请看线程封装,为什么不使用原生接口?
因为原生接口一旦进行pthread_create直接就去执行目标函数了,我们就不能很好的控制线程;
而封装过后的接口可以进行start,在适当的时机去执行。
这里具体说一下封装后的线程,构造时需要传入名字与将要执行的可调用对象(函数指针,Lambda,仿函数),启动时start即可。
目录
- 目标图画展示 && 设计思想
- 设计框架
- main的大纲
- ThreadPool.hpp大纲
- 代码实现细节
- 全部代码
目标图画展示 && 设计思想
每当我们有任务时直接将任务push到线程池中即可,新线程会帮助我们处理任务。
那么该如何设计?
从图中可以看出线程池本质上就是一个生产者消费者模型。
而这个模型考虑到3个点足以
其一:一个用来存放数据的数据结构
其二:参与这个模型有生产者与消费者的两个角色
其三:注意生产者与生产者,消费者与消费者,生产者与消费者之间的关系即可。
设计框架
main的大纲
我们一般都是先从主函数进行一个框架的大概设计,并逐步进行填充
当然,以下的main函数肯定不是最终版本。
int main()
{
std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>();
tp->Init();
tp->Start();
while (true)
{
// 生产数据
tp->Equeue(/*push数据*/);
sleep(1);
}
tp->Stop();
return 0;
}
ThreadPool.hpp大纲
将T设置为模板,其中T为可调用对象,为我们的任务
下段代码中比较详细的介绍了各个部分需要做的事情
template <class T>
class ThreadPool
{
public:
void HandlerTask()
{
// 不断循环,直到_isrunning变为false && queue中没有任务后再退出
while (true)
{
// 在这段区域中进行对queue的读取,并进行处理数据
// 这里本质上就是属于消费者部分的临界区了
// 于是需要进行加锁,并且需要处理一下生产者与消费者的互斥关系
}
}
public:
ThreadPool()
{}
~ThreadPool()
{}
void Init()
{
// 创建一批线程(未启动),使用vector管理起来
// 注意创建时需要将 线程名与HandlerTask 都传入
}
void Start()
{
// 启动线程
}
void Equeue()
{
// 这里是生产者进行放入数据的区域
// 与消费者那里类似,同样需要注意对临界区的保护与处理 生产者与消费者的关系
}
void Stop()
{
// 将线程池运行标志位置为false,注意一些细节
}
private: // 可以根据自己的需要进行增删,我这里只是一个实例
std::vector<MyThread> _thds; // 管理线程的数据结构
int _cap; // 线程池容量
std::queue<T> _q; // 生产消费模型中的交易场所
bool _isrunning; // 线程池运行标志位
int _sleep_num; // 睡眠线程个数
pthread_mutex_t _mutex; // 锁
pthread_cond_t _cond; // 条件变量
};
代码实现细节
我们先给出一个任务hpp,没什么好说的,没有一点细节
#include <iostream>
class Task
{
public:
Task(int x, int y) : _x(x), _y(y)
{
}
void operator()()
{
_result = _x + _y;
}
void Debug()
{
std::cout << _x << " + " << _y << " = ?" << std::endl;
}
void Result()
{
std::cout << _x << " + " << _y << " = " << _result << std::endl;
}
private:
int _x;
int _y;
int _result;
};
线程池部分完整代码
template <class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
bool Empty()
{
return _q.empty();
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
public:
void HandlerTask(const std::string &name)
{
while (true)
{
Lock();
while (Empty() && _isrunning)
{
_sleep_num++;
Sleep();
_sleep_num--;
}
if (Empty() && !_isrunning)
{
Unlock();
break;
}
// 有任务
T t = _q.front();
_q.pop();
Unlock();
// 在自己独立的栈帧中并发执行任务。
t();
std::cout << name << " :";
t.Result();
}
}
public:
ThreadPool(int cap = defaultCap) : _cap(cap), _sleep_num(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void Init()
{
std::function<void(const std::string &)> f = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
for (int i = 0; i < _cap; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_thds.emplace_back(name, f);
}
}
void Start()
{
_isrunning = true;
for (auto &thd : _thds)
{
thd.Start();
}
}
void Equeue(const T &in)
{
Lock();
if (_isrunning)
{
_q.emplace(in);
if (_sleep_num > 0)
{
WakeUp();
}
}
Unlock();
}
void Stop()
{
Lock();
_isrunning = false;
WakeUpAll();
Unlock();
}
private:
std::vector<MyThread> _thds;
int _cap;
std::queue<T> _q;
int _sleep_num;
bool _isrunning;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
其中这里有很关键的几个点
一.、HandlerTask中关于生产者与消费者关系的维护
我们的新线程没有任务时直接去休眠即可,但是由于我们还要控制线程池(_isrunning),于是这里需要增加一些逻辑,只有queue中没有任务并且_isrunning为true时才能休眠;
同样退出时需要_isrunning为false并且没有任务才能退出
二、生产者处理数据时要在锁外进行处理,否则在锁内的话就失去了多线程意义了。
三、init中,构造vector时,我们需要HandlerTask传过去,但是由于隐含的this指针,所以利用std::bind就会很舒服
四:stop中,我们除了设为false,还要进行wakeUpAll,原因在于 可能当你的queue中没有任务并且_isrunning还为true都在条件变量下休眠了,这样的话如果stop没有wakeUpAll,那么新线程就会永远sleep。
全部代码
模拟线程的代码:
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdio>
#include <pthread.h>
#include <functional>
namespace cyc
{
class MyThread
{
public:
// typedef void (*func_t)();
using func_t = std::function<void(const std::string&)>;
MyThread(const std::string &name, func_t func) : _func(func), _isRunning(false), _name(name)
{
}
~MyThread()
{
}
void Excute()
{
_isRunning = true;
_func(_name);
_isRunning = false;
}
static void *routine(void *arg)
{
MyThread *self = static_cast<MyThread*>(arg);
self->Excute();
return nullptr;
}
void Start()
{
int n = ::pthread_create(&_tid, nullptr, routine, this);
if (n != 0)
{
perror("pthread_create fail");
exit(1);
}
std::cout << _name << " start..." << std::endl;
}
void Stop()
{
if (_isRunning)
{
pthread_cancel(_tid);
_isRunning = false;
}
}
void Join()
{
int n = pthread_join(_tid, nullptr);
if (n != 0)
{
perror("pthread_join fail");
exit(1);
}
std::cout << _name << " Join sucess..." << std::endl;
}
std::string GetStatus()
{
if (_isRunning)
return "Running";
else
return "sleeping";
}
private:
pthread_t _tid;
func_t _func;
std::string _name;
bool _isRunning;
};
}
任务代码:
#include <iostream>
class Task
{
public:
Task(int x, int y) : _x(x), _y(y)
{
}
void operator()()
{
_result = _x + _y;
}
void Debug()
{
std::cout << _x << " + " << _y << " = ?" << std::endl;
}
void Result()
{
std::cout << _x << " + " << _y << " = " << _result << std::endl;
}
private:
int _x;
int _y;
int _result;
};
线程池代码:
#pragma once
#include "MyThread.hpp"
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
using namespace cyc;
const int defaultCap = 5;
void test()
{
while (true)
{
std::cout << "hello world" << std::endl;
}
}
template <class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
bool Empty()
{
return _q.empty();
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
public:
void HandlerTask(const std::string &name)
{
while (true)
{
Lock();
while (Empty() && _isrunning)
{
_sleep_num++;
Sleep();
_sleep_num--;
}
if (Empty() && !_isrunning)
{
Unlock();
break;
}
// 有任务
T t = _q.front();
_q.pop();
Unlock();
// 在自己独立的栈帧中并发执行任务。
t();
std::cout << name << " :";
t.Result();
}
}
public:
ThreadPool(int cap = defaultCap) : _cap(cap), _sleep_num(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void Init()
{
std::function<void(const std::string &)> f = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
for (int i = 0; i < _cap; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_thds.emplace_back(name, f);
}
}
void Start()
{
_isrunning = true;
for (auto &thd : _thds)
{
thd.Start();
}
}
void Equeue(const T &in)
{
Lock();
if (_isrunning)
{
_q.emplace(in);
if (_sleep_num > 0)
{
WakeUp();
}
}
Unlock();
}
void Stop()
{
Lock();
_isrunning = false;
WakeUpAll();
Unlock();
}
private:
std::vector<MyThread> _thds;
int _cap;
std::queue<T> _q;
int _sleep_num;
bool _isrunning;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
main代码:
#include <unistd.h>
#include <memory>
#include "ThreadPool.hpp"
#include "Task.hpp"
int main()
{
ThreadPool<Task> *tp = new ThreadPool<Task>;
tp->Init();
tp->Start();
// sleep(1);
int count = 3;
while (count--)
{
// 生产数据
Task t(1, 2);
tp->Equeue(t);
sleep(1);
}
tp->Stop();
delete tp;
return 0;
}
有问随时找博主~