目录
1.线程池的概念
2.线程池的实现
3.基于单例模式的线程池
(1).单例模式的概念
(2).基于单例模式的线程池
1.线程池的概念
池化技术本质上都是为了提高效率。线程池也是同理,提前准备好一些线程,用来随时处理任务(处理完任务又放回池子),就称为线程池。为了提高效率。避免了在处理短时间任务时由于创建与销毁线程影响任务处理效率
如果没有线程池,比如用户向服务器发起请求的时候,服务器收到请求需要先创建线程(需要耗时),然后再给用户提供服务,这对用户来说就是不必要的时间消耗。因此,可以提前准备好一些线程,服务器收到用户请求的时候马上就可以从线程池拿出一个线程来给用户提供服务。
2.线程池的实现
基于任务队列实现线程池,在线程池内部有一个任务队列,线程生产任务,将任务放到任务队列中,然后线程池内部的线程竞争式地去任务队列里面拿任务来执行。
(1).大致实现思路
1.线程池内部有NUM个线程,还有一个任务队列,这个队列是一个临界资源。
2.在init里面,创建NUM个线程,并且让他们执行Routine函数,这里的tid不需要被保存,最后一直存在线程池中,不需要被主线程wait
3.Routine方法内部首先执行detach方法将新线程和主线程分离,这样线程运行完毕后,会自动释放pcb。在当前方法中,线程不断while尝试从任务队列里面获取任务。
先加锁,while判断如果任务队列为空,就在条件变量下等待。如果任务队列不为空,就从队列中取出任务t,然后解锁。(因为这个队列是一个临界资源(之前除了这个队列还有其他资源也是临界资源),访问该队列需要加锁)
解锁以后,获取到任务,通过任务对象执行任务即可。
4.注意,这个Routine方法必须是static静态的,在类中要让线程执行类内成员方法,是不可行的;必须让线程执行静态方法。原因:只要是类内的成员方法,都含有一个隐含的参数this(加上void* arg是两个),而传入的Routine函数的参数只能接收一个参数(语法上规定),就是后面的void* arg,所以是不能够传入带两个参数的函数的。(静态函数就没有this指针,最终只会有一个函数)
5.PushTask函数,是给main函数调用的,所以需要加锁(可能有多个线程往任务队列里面加任务),往任务队列里面加入任务以后,唤醒其他线程。
6.PopTask函数,是在Routine方法内部给线程池内的线程调用的,对整个队列判空,获取pop获取任务都已经加过锁了,所以不需要加锁,直接从输出型参数输出任务就行。
7.这个线程池和我前两篇文章讲述的基于BlockQueue实现的多消费者多生产者模型很像。(这里任务队列没有设置上限,同时在类里面加上了创建线程并执行线程的逻辑)
(2).代码
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <unistd.h>
#include <pthread.h>
namespace ns_threadpool
{
const int NUM = 5;
template <class T>
class ThreadPool
{
private:
int _thread_num;
std::queue<T> _task_queue; //该成员是一个临界资源
pthread_mutex_t _mtx;
pthread_cond_t _cond; //条件变量,当队列为空时,线程不再继续从队列中取数据,在该条件变量下等嗲
public:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
bool IsEmpey()
{
return _task_queue.empty();
}
public:
ThreadPool(int num = NUM) : _thread_num(num)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
// 在类中要让线程执行类内成员方法,是不可行的
// 必须让线程执行静态方法
static void *Rountine(void *args)
{
pthread_detach(pthread_self()); //分离线程,线程运行完会自动释放资源
ThreadPool<T> *tp = (ThreadPool<T> *)args;
while (true)
{
tp->Lock();
while (tp->IsEmpey())
{
tp->Wait(); //队列空,线程在条件变量下等待
}
//该任务队列中一定有任务了
T t;
tp->PopTask(&t); //这里的t是输出型参数
tp->Unlock();
t.run();
}
}
void InitThreadPool()
{
pthread_t tid; //里的tid不需要被保存,线程最后一直存在线程池中,不需要被主线程wait
for (int i = 0; i < _thread_num; i++)
{
pthread_create(&tid, nullptr, Rountine, (void *)this);
//这里将线程池对象this指针传入,这样Routine内部就可以调用线程池对象里面的加锁解锁函数,以及对队列进行操作
}
}
void PushTask(const T &in)
{
Lock();
_task_queue.push(in);
Unlock();
Wakeup();
}
void PopTask(T *out)
{
*out = _task_queue.front();
_task_queue.pop();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
};
} // namespace ns_threadpool
(3).测试
这里使用的Task任务和上一篇的Task一样,给出两个操作数和一个操作符,进行计算。我们创建一个线程池对象,然后调用InitThreadPool方法初始化,接着就往队列中添加Task即可,这里模拟1s随机生成一个任务,添加到线程池中的任务队列中。
int main()
{
ThreadPool<Task> *tp = new ThreadPool<Task>(5);
tp->InitThreadPool();
srand((long long)time(nullptr));
while(true)
{
sleep(1); //模拟1s产生一个Task
Task t(rand()%100+1, rand()%50+1, "+-*/%"[rand()%5]);
tp->PushTask(t);
}
return 0;
}
测试结果:
3.基于单例模式的线程池
(1).单例模式的概念
某些类,只需要有一个对象(实例),就称之为单例。单例模式有很多种,这里我采用了线程安全的懒汉模式,懒汉方式最核心的思想是 "延时加载"(用到对象的时候才需要加载). 从而能够优化服务器的启动速度
(2).基于单例模式的线程池
在前面实现的线程池的基础上做修改,改成单例模式的线程池
1.首先单例模式的话,线程池对象只能有一个,所以在类里面定义一个ThreadPool类型的对象指针ins,而且必须是static类型的,该类只有一个实例。
2.将构造函数设置为私有,并且将拷贝构造函数和赋值运算符重载函数禁用(利用c++11特性,加上=delete)
3.基于线程安全版的懒汉模式实现单例模式,定义一个public的GetInstance方法,用于给用户获取当前类的单例对象,用户调用该方法的时候,外层判空校验当前ins对象是否已经被实例化,如果已经实例化,则直接返回;如果没有实例化,加锁,然后内层判空,如果为空,new创建对象,调用InitThreadPool初始化线程池。
4.最后不要忘记在类外对ins初始化(debug了半天,血与泪的教训)
如果没有初始化,会有如下错误
代码:
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <unistd.h>
#include <pthread.h>
namespace ns_threadpool
{
const int NUM = 5;
template <class T>
class ThreadPool
{
private:
int _thread_num;
std::queue<T> _task_queue; // 该成员是一个临界资源
pthread_mutex_t _mtx;
pthread_cond_t _cond; // 条件变量,当队列为空时,线程不再继续从队列中取数据,在该条件变量下等嗲
static ThreadPool<T> *ins; // 单例模式,一个类只能有一个实例,设置为static
private:
// 构造函数是必须要的,单例模式下,将构造函数设置为私有,不允许外部用户调用
ThreadPool(int num = NUM) : _thread_num(num)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> *tp) = delete; // 禁用构造函数
ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete; // 禁用拷贝构造函数
public:
static ThreadPool<T> *GetInstance()
{
static pthread_mutex_t mtx_single = PTHREAD_MUTEX_INITIALIZER;
if (ins == nullptr) // 如果不空,就没必要去竞争锁,双重校验锁,提高性能,外层校验时不用加锁,因为本身就是为了防止去竞争锁,而且后面具体要进行修改的时候会加锁
{
pthread_mutex_lock(&mtx_single);
if (ins == nullptr) // 此处判断必须加锁,因为ins本身就是互斥资源,访问的时候就需要加锁,而且外层循环是没有加锁的,走到这里可能ins已经被其他线程实例化了,所以这里必须再加锁判断一次
{
ins = new ThreadPool<T>();
ins->InitThreadPool();
std::cout << "加载线程池对象完成" << std::endl;
}
pthread_mutex_unlock(&mtx_single);
}
return ins;
}
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
bool IsEmpey()
{
return _task_queue.empty();
}
public:
// 在类中要让线程执行类内成员方法,是不可行的
// 必须让线程执行静态方法
static void *Rountine(void *args)
{
pthread_detach(pthread_self()); // 分离线程,线程运行完会自动释放资源
ThreadPool<T> *tp = (ThreadPool<T> *)args;
while (true)
{
tp->Lock();
while (tp->IsEmpey())
{
tp->Wait(); // 队列空,线程在条件变量下等待
}
// 该任务队列中一定有任务了
T t;
tp->PopTask(&t); // 这里的t是输出型参数
tp->Unlock();
t.run();
}
}
void InitThreadPool()
{
pthread_t tid; // 里的tid不需要被保存,线程最后一直存在线程池中,不需要被主线程wait
for (int i = 0; i < _thread_num; i++)
{
pthread_create(&tid, nullptr, Rountine, (void *)this);
// 这里将线程池对象this指针传入,这样Routine内部就可以调用线程池对象里面的加锁解锁函数,以及对队列进行操作
}
}
void PushTask(const T &in)
{
Lock();
_task_queue.push(in);
Unlock();
Wakeup();
}
void PopTask(T *out)
{
*out = _task_queue.front();
_task_queue.pop();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
};
template <class T>
ThreadPool<T> *ThreadPool<T>::ins = nullptr; //注意不要忘记了在类外对静态成员ins初始化
} // namespace ns_threadpool