线程池基础概述
为什么要有线程池?假设没有使用线程池时,一个请求用一个子线程来处理。每来一个请求,都得创建子线程,子线程执行请求,关闭子线程。当请求量(并发)比较大的时候,频繁地创建和关闭子线程,也是有开销的。
因此提出线程池,提前开辟好N个子线程,当有任务过来的时候,先放到任务队列中,之后N个子线程从任务队列中获取任务并执行,这样能大大提高程序的执行效率。
其实当任务数大于线程池中子线程的数目的时候,就需要将任务放到缓冲区(队列)里面,所以本质上还是一个生产者消费者模型。
补充一个查看线程的状态的命令:
ps -elLf | grep xxx 1
面向对象的线程池封装
UML 类图设计
而 TaskQueue 类的类图设计是在之前的文章里就介绍过的,这里只添加了一点改动,将原来的 int 类型数据给改成了一个 Elem 类型,关于 TaskQueue 类可以参考我之前的文章:C++ 面向对象技术实战:实现基于 POSIX 线程标准的线程封装并解决生产者消费者问题;
代码实现
首先本文的代码内容全部建立在之前的解决生产者消费者问题的基础之上,因此代码会复用之前的,如果还没看过的话可以先去学习一下上一篇文章:C++ 面向对象技术实战:实现基于 POSIX 线程标准的线程封装并解决生产者消费者问题;
那么接下来我们就正式开始线程池的代码编写!
首先将之前的代码拷贝进当前目录:
现在我们先对 TaskQueue.h 进行修改:
#ifndef _TASKQUEUE_H__
#define _TASKQUEUE_H__
#include <queue>
#include "MutexLock.h"
#include "Condition.h"
#include "Task.h"
using std::queue;
//先重定义一个任务类型Task,泛化接收各种数据
using Elem = Task*;
class TaskQueue{
public:
TaskQueue(size_t _queSize);
~TaskQueue();
bool empty() const;
bool full() const;
void push(const Elem &value);
Elem pop();
//用来唤醒 ThreadPool 中阻塞线程使用的函数
//将所有等待在_notEmpty上的线程唤醒
void wakeup();
private:
size_t _queSize;
queue<Elem> _que;
MutexLock _mutex;
Condition _notEmpty;
Condition _notFull;
bool _flag;
};
#endif
因为引入了 Task 类型,因此我们现在补一下 Task.h :
#ifndef _TASK_H__
#define _TASK_H__
//对于 Task 类只需要有一个虚方法process即可
//因此 Task 实际上是个接口
class Task{
public:
//其子类当实现这个接口以提供给线程池要执行的任务
virtual void process() = 0;
//如果一个类里面含有虚函数,那么这个时候将它的
//析构函数也给设为虚函数是一种最佳实践
virtual ~Task(){
}
};
#endif
现在我们就可以去写一个 TaskQueue.cc 实现文件了:
#include "TaskQueue.h"
TaskQueue::TaskQueue(size_t queSize)
:_queSize(queSize)
,_que()
,_mutex()
,_notEmpty(_mutex)
,_notFull(_mutex)
,_flag(true)
{
}
TaskQueue::~TaskQueue(){
}
bool TaskQueue::empty() const{
return _que.size() == 0;
}
bool TaskQueue::full() const{
return _que.size() == _queSize;
}
void TaskQueue::push(const Elem &value){
MutexLockGuard autoLock(_mutex);
while(full()){
_notFull.wait();
}
_que.push(value);
_notEmpty.notify();
}
Elem TaskQueue::pop(){
MutexLockGuard autoLock(_mutex);
while(_flag && empty()){
_notEmpty.wait();
}
if(_flag){
Elem res = _que.front();
_que.pop();
_notFull.notify();
return res;
}
else{
return nullptr;
}
}
void TaskQueue::wakeup(){
_flag = false;
_notEmpty.notifyAll();
}
接下来就开始实现重头戏,也就是线程池这个类 ThreadPool.h :
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include "Thread.h"
#include "TaskQueue.h"
#include <vector>
#include <memory>
using std::vector;
using std::unique_ptr;
class ThreadPool{
//这样WorkThread才能访问私有的 threadFunc 方法
friend class WorkThread;
public:
ThreadPool(size_t threadNum,size_t queSize);
~ThreadPool();
void start(); //线程池启动,也就是初始化所有子线程
void stop(); //线程池关闭,也就是join掉所有子线程
void addTask(Task*); //往线程池中的任务队列里添加任务
Task* getTask(); //从任务队列中获取任务
private:
void threadFunc(); //子线程真正执行的操作
private:
size_t _threadNum; //线程池大小,也就是线程数量
size_t _queSize; //任务队列大小
vector<unique_ptr<Thread>> _threads; //用来存放线程的容器
TaskQueue _taskQue; //任务队列
bool _isExit; //线程池是否退出
};
#endif
然后是实现文件 ThreadPool.cc:
#include "ThreadPool.h"
#include "WorkThread.h"
#include <unistd.h>
ThreadPool::ThreadPool(size_t threadNum,size_t queSize)
: _threadNum(threadNum)
,_queSize(queSize)
,_taskQue(_queSize)
,_isExit(false)
{
//初始化 vector 的空间大小
_threads.reserve(_threadNum);
}
ThreadPool::~ThreadPool(){
if(!_isExit){
stop();
_isExit = true;
}
}
//线程池开始执行的时候,其实就是工作线程已经开启
void ThreadPool::start(){
//先将全部的线程创建并保存起来
for(size_t idx=0; idx<_threadNum; ++idx){
//创建线程
unique_ptr<Thread> up(new WorkThread(*this));
//存入线程容器中
//注意对于 unique_ptr而言是没有拷贝构造函数的
//因此要用move将其转为右值调用移动语义来完成push_back
_threads.push_back(std::move(up));
}
//然后再全部启动起来
for(auto& th : _threads){
th->start();//创建工作线程的id,让工作线程开始运行
}
}
//线程池关闭,也就是join掉所有子线程
//子线程也就是工作线程WorkThread就是从Thread继承过来的
//所以每个工作线程执行join方法即可
void ThreadPool::stop(){
//确保_taskQue任务队列中的任务都被做完了才开始关闭线程池
while(!_taskQue.empty()){
sleep(1);
}
//表示线程池将要退出了
_isExit = true;
//唤醒一下可能正在沉睡的线程以进行join
_taskQue.wakeup();
//进行线程回收
for(auto& th : _threads){
th->join();
}
}
//往线程池中的任务队列里添加任务
//等同于生产者消费者问题中的生产者角色
void ThreadPool::addTask(Task* ptask){
if(ptask){
_taskQue.push(ptask);
}
}
//从任务队列中获取任务
Task* ThreadPool::getTask(){
return _taskQue.pop();
}
//在线程池中封装的任务,这个任务的实际执行者就是WorkThread
//这个threadFunc方法也就是子线程真正所要执行的操作
void ThreadPool::threadFunc(){
//只要线程池还没有退出,那么就应该一直获取并执行任务
while(!_isExit){
//先获取任务
//但如果getTask()执行的过快的话,会来不及将_isExit设置为 true
//会一直卡在getTask这里,也就是卡在Condition的wait上面,
//那这样的话就没法正常退出线程池了
//因此我们要让getTask()执行的慢一点,让线程池的stop也有可能先执行,
//让stop函数将_isExit 设置为 true,这样就能够顺利退出线程池
Task* ptask = getTask();
//再执行任务
if(ptask){
ptask->process();
}
}
}
在编写 ThreadPool.cc 的过程中我们用到了 WorkThread 类,因此我们现在去将 WorkThread 类实现一下,依然是先实现头文件 WorkThread.h:
#ifndef __WORKTHREAD_H__
#define __WORKTHREAD_H__
#include "Thread.h"
#include "ThreadPool.h"
class WorkThread: public Thread{
public:
WorkThread(ThreadPool& threadPool): _threadPool(threadPool){
}
void run() override{
_threadPool.threadFunc();
}
~WorkThread(){
}
private:
ThreadPool& _threadPool;
};
#endif
现在只剩 MyTask 文件还没实现了,不过为了方便起见,我们可以将这个类直接写在 TestThreadPool.cc 测试文件中即可,因为毕竟测试也要用到这个嘛,来写一下 TestThreadPool.cc 测试文件:
#include "Task.h"
#include "ThreadPool.h"
#include <stdlib.h>
#include <unistd.h>
#include <iostream>
using namespace std;
class MyTask: public Task{
private:
void process() override{
::srand(clock());
int number = ::rand()%100;
cout << "MyTask number = " << number << endl;
//sleep(1);
}
};
int main(){
//创建一个线程池,线程数量为5,任务队列大小为10
ThreadPool threadPool(5, 10);
//创建任务
unique_ptr<Task> task(new MyTask());
//启动线程池,五个子线程创建并运行
threadPool.start();
//向线程池中投放任务,放20个任务
int cnt = 20;
while(cnt-- > 0){
//因为task是智能指针,是个对象,而addTask函数的参数是Task*
//因此我们可以使用unique_ptr的get函数获得其所托管内存资源的原始裸指针
threadPool.addTask(task.get());
cout << "cnt = " << cnt << endl;
//sleep(1);
}
//关闭线程池,五个子线程进行退出操作
threadPool.stop();
return 0;
}
运行结果如下:
总结
最后的线程池项目目录结构如下:
最后如果有需要源码的朋友,可以直接访问我的仓库获取源码:基于面向对象的方式实现的线程池;
以上就是本文的全部内容。