1. 线程池的了解
预先申请线程。线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这避免了在处理短时间任务时创建与销毁线程的代价。
2.线程池框架
2.1 线程的封装
首先我们需要写线程的构造函数,他的编号是什么,回调函数是什么,给回调函数的参数是什么?
然后是调用pthread_create接口 让线程运行起来,以及调用pthread_join(),等待线程。还额外提供了一个可以直接返回线程名字的接口。
这里由于传入的num是要编写入线程名字的,不在传入的args参数里面,我们进行第二次封装
封装Thread.hpp
2.2 线程池基本结构 ThreadPool.hpp
- 在线程池的构造函数中,我们需要先创建出可以构造线程的空间,并把他们放在一起(vector里)便于管理。
- 我们还需要明确线程的具体功能是什么,哪个线程去哪个回调函数。(这里为了演示,全部调用同一函数)
- 由于线程池本身就是预先申请的线程,所以一个线程池对象如果没有结束生命周期,里面的线程也不会停止。所以我们把pthread_join接口放在线程池的析构函数中。
- 派发任务时,所有的线程都需要从task_queue_里拿任务,在使用时需要保证线程安全,所以还需要封装一把锁。
- 还需要条件变量来唤醒程序
2.2.1 锁的封装
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
// std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
// std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
说明:
先在程序内定义一把锁,初始化完成后,在需要加锁的地方用初始化后的锁作为lockGuard的构造函数的参数,创建一个 lockGuard对象,对象在创建时会调用构造函数,就会上锁。出了作用域后,自动调用析构函数以解锁。
2.2.2 线程池的具体代码
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <unistd.h>
#include "Thread.hpp"
#include "lockGuard.hpp"
const int g_thread_num=3;
template<class T>
class ThreadPool
{
public:
ThreadPool(int thread_num=g_thread_num)
:num_(thread_num)
{
//创造线程的空间 构造线程
for(int i=1;i<=num_;i++)
{
//每个线程的编号 回调函数 输出型参数
threads_.push_back(new Thread(i,routine,nullptr)); //nullptr后续会改
}
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&cond,nullptr);
}
//回调函数 相当于消费者
static void* routine(void* args) //这里单独拎出来说
{
ThreadData* td=(ThreadData*)(args);
}
//创造线程 pthread_create
void run()
{
for(auto& iter:threads_)
{
iter->start();
std::cout<<iter->name()<<" 启动成功"<<std::endl;
}
}
//相当于生产者
void pushTask(const T& task)
{
lockGuard lockguard(&lock)
task_queue_.push(task);
pthread_cond_signal(&cond);
}
~ThreadPool()
{
for(auto& iter:threads_)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread*> threads_;
int num_;
std::queue<T> task_queue_;
pthread_mutex_t lock;//保护临界区(任务队列)的一把锁
pthread_cond_t cond;
};
2.2.3 static void* routine
由于回调函数是类似消费者的角色,但问题是他是一个静态成员函数,无法访问类内的属性,他要如何 task_queue_.pop(); ?
解决方法,我们在线程给回调函数的参数栏中传入this指针。
即上面的threads_.push_back(new Thread(i,routine,/*nullptr*/ this ));
说明:
由于lock task_queue_都是成员属性,静态成员函数中无法访问,所以我们需要在线程池内多添加几个接口以便于用tp指针帮助我们完成。
为了使用lockGuard的构造和析构函数以上锁和解锁,我们弄了一个作用域。
static void* routine(void* args)
{
ThreadData* td=(ThreadData*)(args);
ThreadPool<T>*tp=(ThreadPool<T>)td->args_;
while(true)
{
T task;
{
lockGuard lockguard(tp->getMutex()/*&lock*/);
while(tp->isEmpty()/*task_queue_.empty()*/) tp->waitCond();
// 读取任务
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
}
task(td->name_);
}
}
class ThreadPool
{
//...
public:
pthread_mutex_t *getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
//...
};
3.调用模块
3.1 任务模块
#pragma once
#include <iostream>
#include <string>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
void operator ()(const std::string &name)
{
std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
// logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
// name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
}
public:
int x_;
int y_;
// int type;
func_t func_;
};
3.2 主函数模块
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
int main()
{
srand((unsigned long)time(nullptr)^getpid());
ThreadPool<Task>*tp=new ThreadPool<Task>();
tp->run();
while(true)
{
int x=rand()%100+1;
usleep(7234);
int y=rand()%30+1;
Task t(x,y,[](int x,int y)->int
{
return x+y;
});
std::cout<<"制作任务完成: "<<x <<" + "<<y<<"=?"<<std::endl;
//推送任务
tp->pushTask(t);
sleep(1);
}
return 0;
}