一.介绍
1.1概念
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。
线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets
等的数量。
1.2应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。短时间内产生大量线程可能使内存到达极限,出现错误。
二.线程池的实现
设计一个线程池的类,用队列来实现。指定线程的数量,并提供pop与push接口。
外部线程可通过push接口将任务放入队列中,线程池中多个线程去执行pop的任务。
代码示例:
基本框架:
#define pthnums 3
template<class T>
class threadpool
{
public:
threadpool(int nums=pthnums)
{
pthread_num=nums;
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&cond_,nullptr);
is_start=true;
}
~threadpool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
static void* Routine(void* argv)//类内函数有this指针,将其设置为静态,argv接受this
{
pthread_detach(pthread_self());
threadpool<T> *tp = static_cast<threadpool<T>*>(argv);
while(true)
{
cout<<"pthread["<<pthread_self()<<"]running"<<endl;
sleep(1);
tp->lockQueue();
while(tp->isempty())
{
tp->waitTask();
}
T t=tp->pop();//拿到任务
tp->unlockQueue();
int one, two;
t.get(&one, &two);
cout<< "新线程 "<<pthread_self()<<" 完成计算任务: " << one << "+" << two << "=" << t.run() << "\n";
}
void start()//创建pthread_num个线程
{
assert(is_start);
for(int i=0;i<pthread_num;i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Routine,this);
}
is_start=false;
}
}
private:
//封装的接口
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
void waitTask()
{
pthread_cond_wait(&cond_,&mutex_);
}
void SignalTask()
{
pthread_cond_signal(&cond_);
}
bool isempty()
{
return _q.empty();
}
bool isFull()
{
return _q.size()==capacity;
}
private:
queue<T> _q;
pthread_mutex_t mutex_;//互斥锁
pthread_cond_t cond_;//信号量
int pthread_num;//创建线程数量
bool is_start;
};
pop,push与执行任务的接口:
void push(const T x)
{
lockQueue();
_q.push(x);
unlockQueue();
SignalTask();
}
T pop()
{
T x=_q.front();
_q.pop();
return x;
}
将其改为单例模式:
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T>&) = delete;
增加一静态成员变量,用来创建线程池,并初始为空。
static ThreadPool<T> *instance
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
提供一个访问单例对象的函数
static ThreadPool<T> *getInstance()
{
if (nullptr == instance) //过滤重复的判断
{
if (nullptr == instance)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
任务函数:完成两数相加
class Task
{
public:
Task(int a = 10, int b = 9)
: a_(a), b_(b)
{
}
int run()
{
return a_ + b_;
}
void getTask(int *a, int *b)
{
*a = a_;
*b = b_;
}
private:
int a_;
int b_;
};
完整代码:
#include "Task.hpp"
#define pthnums 3
#include<unistd.h>
#include<queue>
#include<assert.h>
#include<pthread.h>
#include<iostream>
using namespace std;
template <class T>
class threadpool
{
public:
threadpool(const threadpool<T> &) = delete;
void operator=(const threadpool<T> &) = delete;
static threadpool<T> *getInstance()
{
if (nullptr == instance) //过滤重复的判断
{
if (nullptr == instance)
{
instance = new threadpool<T>;
}
}
return instance;
}
threadpool(int nums = pthnums)
{
pthread_num = nums;
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
is_start = true;
}
~threadpool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
static void *Routine(void *argv) //类内函数有this指针,将其设置为静态,argv接受this
{
pthread_detach(pthread_self());
threadpool<T> *tp = static_cast<threadpool<T> *>(argv);
while (true)
{
cout << "pthread[" << pthread_self() << "]running" << endl;
tp->lockQueue();
while (tp->isempty())
{
tp->waitTask();
}
T t = tp->pop(); //拿到任务
tp->unlockQueue();
int one, two;
t.get(&one, &two);
cout << "新线程 " << pthread_self() << " 完成计算任务: " << one << "+" << two << "=" << t.run() << "\n";
}
}
void start() //创建pthread_num个线程
{
assert(is_start);
for (int i = 0; i < pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid, nullptr, Routine, this);
}
is_start = false;
}
void push(const T x)
{
lockQueue();
_q.push(x);
unlockQueue();
SignalTask();
}
T pop()
{
T x = _q.front();
_q.pop();
return x;
}
private:
//封装的接口
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
void waitTask()
{
pthread_cond_wait(&cond_, &mutex_);
}
void SignalTask()
{
pthread_cond_signal(&cond_);
}
bool isempty()
{
return _q.empty();
}
private:
queue<T> _q;
pthread_mutex_t mutex_; //互斥锁
pthread_cond_t cond_; //信号量
int pthread_num; //创建线程数量
bool is_start;
static threadpool<T> *instance;
};
template <class T>
threadpool<T> *threadpool<T>::instance = nullptr;
进行测试:
#include "threadpool.hpp"
#include <time.h>
int main()
{
srand(time(nullptr));
threadpool<Task> *p = threadpool<Task>::getInstance();
int a;
int b;
p->start();
while (true)
{
a = rand() % 100;
b = rand() % 50;
Task t(a, b);
p->push(t);
//sleep(1);
}
return 0;
}
结果: