🐱作者:一只大喵咪1201
🐱专栏:《Linux学习》
🔥格言:你只管努力,剩下的交给时间!
线程池 | 单例模式
- 一、 线程池
- 1.1 Thread.hpp
- 1.2 ThreadPool.hpp
- 1.3 main.cpp
- 1.4 RAII方式加锁
- 二、 单例模式
- 2.1 饿汉模式
- 2.2 懒汉模式
- 三、 总结
多线程部分的知识讲解到此就告一段落了,现在创建一个线程池来检验一下我们的学习成果。
一、 线程池
- 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。
- 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核的充分利用,还能防止过分调度。
- 可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
每个线程在创建的时候会进行一系列系统调用,所以线程创建是有系统开销的。如果每需要一个线程再去创建,就会导致系统的性能下降等问题。
所以线程池就是维护着一些已经创建但是处于阻塞等待状态的线程,当有任务需要处理时,线程被唤醒并且执行对应的任务。
此时就避免了新建线程的系统开销,并且提高了响应效率。
1.1 Thread.hpp
这是一个小组件,在前面学习线程的时候本喵讲解过,现在将其拿过来并进行一定的改造:
#include <string>
#include <functional>
#include <cassert>
#include <pthread.h>
const int name_size = 1024;
class Thread
{
typedef std::function<void*(void*)> func_t;
private:
static void* start_routine(void* args)
{
Thread* _this = static_cast<Thread*>(args);
return _this->callback();
}
public:
//构造函数
Thread()
{
//构建线程名字
char namebuffer[name_size];
snprintf(namebuffer,sizeof namebuffer,"thread-%d",_threadNum++);
_name = namebuffer;
}
//启动线程
void start(func_t func, void* args = nullptr)
{
_func = func;
_args = args;
//创建线线程
int n = pthread_create(&_tid,nullptr,start_routine,this);
assert(n==0);
(void)n;
}
//回调函数
void* callback()
{
//调用新线程函数
return _func(_args);
}
//获取线程名字
std::string threadname()
{
return _name;
}
//线程等待
void join()
{
int n = pthread_join(_tid,nullptr);
assert(n==0);
(void)n;
}
private:
std::string _name;//线程名字
pthread_t _tid;//线程tid
void* _args;//传给线程函数的参数
func_t _func;//线程函数
static int _threadNum;//线程编号
};
int Thread::_threadNum = 1;//定义初始值是1
成员变量包括线程名字_name
,线程_tid
,给线程函数传递的参数_args
,以及线程要执行的函数_func
,还有线程编号_threadNum
。
_func
使用了包装器,将返回值为void*
,参数类型为void*
的函数包装,并且重命名。- 在构造函数中不创建线程,仅仅是形成线程的名字,并且赋给
_name
,线程编号使用的是_threadNum
,这是一个static变量,必须在类外进行定义初始化,每创建一个线程就将该值加一。 - 成员函数
start()
有两个形参func
和args
,在调用该成员函数的时候需要将新线程执行的函数以及参数传给start()
,线程是在该接口中创建并开始执行的。
创建新线程:
- 在使用
pthread_create
创建新线程时,传给新线程的执行函数是start_routine
,每创建一个线程都会去执行这个函数。 - 如果
start_routine
是一个普通成员函数,那么它就会隐藏存在第一个参数this指针
,它的形参就成了(Thread* const this, void* args)
,而创建新线程的时候传递的函数必须只能有一个参数void* args
。 - 所以使用static修饰成员函数
start_routine
,此时就没有了this指针
,创建的新新线程就可以调用它了。 - 由于创建新线程的时候是在类内创建的,所以将
start_routine
设置成私有。
回调:
- 创建一个回调成员函数
callback()
供start_routine
去调用,在回调函数内部,再去调用传参时传入的真正要执行的函数_func
。 start_routine
是静态成员函数,是没有this指针的,所以是无法直接调用普通成员函数和普通成员变量的,所以在创建新线程时,给start_routine
传的形参void* args
就是当前线程对象的this指针
。- 在
start_routine
中,通过this指针来调用回调函数callback
,再在回调函数中调用_func
。
综上所诉,在调用start(func_t func, void* args)
后新线程执行的函数就是传入的形参——函数指针。
经过测试,我们封装的创建新线程的类是没有问题的。
1.2 ThreadPool.hpp
在这个类中,将实现多个线程的创建和维护,和一个基于阻塞队列的生产者消费者模型。其中生产者就是生成任务的线程,而消费者就是所维护的好几个线程,阻塞队列和所有消费者共同组成线程池。
#include <vector>
#include <queue>
#include <string>
#include <mutex>
#include "Thread.hpp"
const int threadNum = 10;
//前置声明
template <class T>
class ThreadPool;
//线程属性
template <class T>
class ThreadData
{
public:
ThreadPool<T>* threadpool;//线程池this指针
std::string _threadname;//线程名字
//构造函数
ThreadData(ThreadPool<T>* tp, std::string name)
:threadpool(tp)
,_threadname(name)
{}
};
//线程池
template <class T>
class ThreadPool
{
private:
static void* handerTask(void* args)
{
ThreadData<T>* tpd = static_cast<ThreadData<T>*>(args);
while(1)
{
tpd->threadpool->lockQueue();//加锁
while(tpd->threadpool->isQueueEmpty())
{
//任务队列为空,进行等待
tpd->threadpool->threadWait();
}
T t = tpd->threadpool->pop();//获取任务到线程独立的栈结构中
tpd->threadpool->unlockQueue();//解锁
std::cout<<tpd->_threadname<<",接受了任务:"<<t.toTaskString()<<",并处理完成:"<< t() <<std::endl;//处理任务
}
delete tpd;
return nullptr;
}
public:
//静态成员函数访问非静态成员接口
bool isQueueEmpty() {return _task_queue.empty();}//判断任务队列是否为空
void lockQueue() {pthread_mutex_lock(&_mutex);}//给任务队列加锁
void unlockQueue() {pthread_mutex_unlock(&_mutex);}//给任务队列解锁
void threadWait() {pthread_cond_wait(&_cond,&_mutex);}//将线程放入条件变量的等待队列中
//获取任务
T pop()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
public:
ThreadPool(const int& num = threadNum)
:_num(num)
{
pthread_mutex_init(&_mutex,nullptr);//初始化互斥锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
//创建一批线程
for(size_t i = 0; i < _num; ++i)
{
_threads.push_back(new Thread());
}
}
//所有线程启动
void run()
{
for(const auto& t : _threads)
{
//线程属性初始化
ThreadData<T>* tpd = new ThreadData<T>(this,t->threadname());
t->start(handerTask,tpd);
std::cout<<t->threadname()<<" start..."<<std::endl;//显式已经启动的线程
}
}
//推送任务
void push(T& in)
{
_mtx.lock();//加锁
_task_queue.push(in);
pthread_cond_signal(&_cond);
_mtx.unlock();//解锁
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);//摧毁互斥锁
pthread_cond_destroy(&_cond);//摧毁条件变量
//释放所有线程
for(const auto& t : _threads)
{
delete t;
}
}
private:
int _num;//维护的线程数量
std::vector<Thread*> _threads;//多个线程
std::queue<T> _task_queue;//任务队列
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量
std::mutex _mtx;//生成任务时的互斥锁
};
成员变量包含线程池的线程数量_num
,管理线程的数据结构_threads
,这是一个vector容器
,存放任务的任务队列_task_queue
,保证多线程互斥访问任务队列的互斥锁_mutex
,以及让多线程同步的条件变量_cond
,生成任务时使用的互斥锁使用的是C++11线程库提供的std::mutex
。
创建线程池:
- 线程池构造函数的形参使用缺失值
num
,该值是确定线程池中维护的线程数量,用户也可以自己在构造的时候指定num
。 - 在构造函数中,将要使用到的互斥锁
_mutex
条件变量_cond
进行初始化,并且创建指定数量的Thread
对象,将其地址放入到vector
容器中进行管理。此时仅有Thread
对象,新线程还没有被创建。 - 在线程池的析构函数中将互斥锁和条件变量销毁,以及
vector
中的Thread
对象也全部释放掉,因为是new出来的,需要主动归还资源。
创建一批线程:
- 提供一个接口
run()
,通过该接口真正创建对应数量的新线程,并且开始执行,每成功创建一个且开始执行后,打印该线程开始运行的信息threadname() start run...
。 run()
函数内调用的是前面Thread
的start()
方法,只需要让vector
容器中的所有Thread
对象调用该方法,所有的线程就会启动。- 所有线程在启动时执行的都是
handerTask()
函数,同Thread
中一样,需要将该函数的this
指针去掉,所以这是一个static成员函数,没有this指针。 - 创建一个
ThreadData
结构体,用来存放线程属性,包括线程池的this指针ThreadPool<T>* threadpool
和当前启动线程的名字_threadname
。 - 在创建线程时需要将线程池的
this
指针和当前线程的名字当作形参传给handerTask
静态函数。
从任务队列中取任务:
- 在
handerTask
中,线程池中的所有线程从任务队列_task_queue
中取任务去执行,并且要按照一定顺序去访问,所以多线程之间是同步和互斥的关系。 - 在访问任务队列的时候先加锁,如果任务队列为空则挂起等待,如果不为空则取走任务并处理任务。
- 因为
handerTask
是一个静态成员函数,所以该函数无法直接访问非静态成员,必须通过this指针。 - 又因为执行
handerTask
是Thread
类对象在执行,所以handerTask
中不能直接访问ThreadPool
中的私有成员。所以提供了公有的接口供handerTask
来访问私有成员,进行加锁,解锁,条件判断,以及取任务等操作。 - 从任务队列中获取任务后,应该在解锁之后进行任务处理。
- 当线程从任务队列中获取到任务以后,本质是将任务队列中的任务获取到自己独立的栈结构中,所以此时对于任务的处理所有线程是相互独立的。
- 如果处理任务放在解锁之前,那么所有线程只能先加锁,再获取任务并处理,再解锁,就成了串行的了。
- 线程处理完任务后,将在堆区存放当前线程属性的
ThreadData
对象释放掉。
推送任务:
- 任务队列中的任务是由主线程或者是生产者推送进来的,如果是多线程推送任务,同样会存在线程安全问题,所以推送任务也是互斥的,这里使用的是C++11线程库中的互斥锁
std::mutex
。 - 在推送任务到任务队列前加锁,推送完成后唤醒在条件变量
_cond
下等待的一个线程,再进行解锁。
1.3 main.cpp
在main
函数中,要做的就是创建线程池,将所有线程启动,然后推送相应的任务到线程池中。
Task.hpp贴图:
这个模板类在前面已经出现很多次了,本喵就不再详细讲解了,主要的功能就是构建任务,获取到任务的线程通过调用该类中的仿函数来执行相应的逻辑。
main.cpp:
int main()
{
srand((unsigned int)time(nullptr)^getpid()^0x11223344);//产生随机数种子
std::unique_ptr<ThreadPool<CalTask>> tp(new ThreadPool<CalTask>);//智能指针管理线程池
tp->run();//启动所有线程
int x,y;
char op;
//每隔1秒向线程池中推送一个任务
while(1)
{
//生成任务
x = rand()%10 + 1;
y = rand()%10 + 1;
op = oper[rand()%oper.size()];
CalTask t(x,y,op,myath);
//推送任务
tp->push(t);
std::cout<<"主线程推送任务推送任务:"<<t.toTaskString()<<std::endl;
sleep(1);
}
return 0;
}
main
函数中,向线程池中推送的是计算任务,两个操作数以及进行的运算操作都是随机生成的,然后构建CalTask
对象,并推送到线程池中。推送完成后打印推送的任务。
- 在线程池中维护着3个线程,这三个线程在任务队列中没有任务的时候,均处于阻塞等待状态,是被挂起的。
- 主线程每推送一个任务到线程池,就会有一个线程被唤醒取任务队列中获取并处理任务。
- 3个线程按照一定的顺序从任务队列中获取任务并处理。
- 根据上面代码可以看到,线程池是根本不知道推送到任务队列中的任务是什么。所维护的线程同样也不知道。
- 具体是什么任务是由任务的推送方决定的,线程池只负责从任务队列中获取并处理任务。
1.4 RAII方式加锁
LockGuard.hpp:
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t* lock_p = nullptr):_lock_p(lock_p)
{}
void lock()
{
pthread_mutex_lock(_lock_p);//加锁
}
void unlock()
{
pthread_mutex_unlock(_lock_p);//解锁
}
private:
pthread_mutex_t* _lock_p;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* lock_p):_mutex(lock_p)
{
_mutex.lock();//构造函数内加锁
}
~LockGuard()
{
_mutex.unlock();//析构函数内解锁
}
private:
Mutex _mutex;
};
创建一个LockGuard
类模仿C++11中的lock_guard
,在构造函数中加锁,析构函数中解锁,将锁的生命周期和对象的生命周期绑定在一起。
- 创建一个
Mutex
类,该类用pthread_mutex_t*
类型的锁初始化,并且包括加锁和解锁两个接口。 - 在
LockGuard
中包括Mutex
这个类,在构造函数中的初始化列表中,使用外部传进来的pthread_mutex_t*
类型的锁定义Mutex
对象,在构造函数中进行加锁。 - 在析构函数中进行解锁。
获取任务:
- 在
handerTask
中,使用LockGuard
对任务队列进行RAII方式的加锁。 - 同样需要一个公共接口供
handerTask
获取线程池中的私有变量互斥锁_mtuex
,以此来构造LockGuard
对象。 - 加一个代码块,来控制
LockGuard
对象的生命周期的起始和结束。
推送任务:
- 推送任务的接口中,使用的是C++11线程库提供的
std::lock_guard
进行RAII方式的加锁和解锁。 - 开始执行
push
函数的时候加锁,执行完毕后解锁。
使用RAII的加锁方式后,程序的运行结果和之前一样,没有发生改变。
二、 单例模式
在C++11中的特殊类设计中,本喵详细讲解过单例模式的原理以及设计,有兴趣的小伙伴可以去看看传送门,这里本喵就不再介绍了。
- 我们在使用
malloc
以及new
等函数时,系统并不会立刻给我们在物理内存中开辟相应的空间,只是将虚拟地址空间中start
和end
指针的地址范围扩大。- 当第一次使用当开辟的动态空间时,会发生缺页中断,操作系统在页表中建立相应的映射关系,并且在物理内存中开辟对应的空间。
这是一种典型的延时加载模式,就是单例模式中的懒汉模式一样。试想,如果在使用malloc
的时候就开辟真实的物理空间,如果有10个100个进程开辟空间,但是确不使用,此时就会浪费物理空间中的内存,甚至导致因为内存不足而无法调度其他线程。
上面仅是一个背景知识的补充,下面本喵来将前面实现的线程池改成单例模式。
2.1 饿汉模式
饿汉模式就是在执行main
函数之间,将单例创建出来:
- 在线程池的私有成员变量中加一个它本身的静态成员变量
static ThreadPool<T> _singleton
,该成员在静态区,只能有一个,所以它就是单例。 - 类的静态成员必须在类外进行定义初始化,所以在类外定义创建单例对象
_singleton
。
- 将拷贝构造函数私有化,只有定义静态单例对象
_singleton
的时候可以调用,其他位置无法调用,也就无法创建对象。 - 为了防止单例对象被拷贝,将拷贝构造函数以及赋值运算符重载函数都使用
delete
禁掉。
提供一个获取单例对象的公共接口GetInstance()
,该对象是一个静态成员函数。
- 单例对象是一个私有的静态成员变量,所以在类外是无法直接访问的,除了通过接口就无法拿到这个单例对象去使用。
- 如果
GetInstance
不是静态成员函数,是一个普通的成员函数,那么调用它时必须传this
指针。但是此时相当于不存在单例对象,也就无法调用GetInstance
。 - 而静态的
GetInstance
在调用时不用传this
指针,以为它只属于类而不属于对象,而且静态成员函数可以直接访问类中的静态成员。所以通过GetInstance
就可以直接获取到单例对象_singleton
去使用。
main.cpp
包含了Thread.hpp
头文件,所以在预处理后,main()
函数的前面就有定义创建单例对象的语句。
- 在
main
函数中,使用单例的线程池对象都得通过静态成员函数GetInstance
去获取,然后再执行和之前一样的操作。
运行结果和之前一样,本喵就不贴图了。
2.2 懒汉模式
懒汉模式讲究的就是一个延时加载,既然操作系统在很多方面都采用这种方式,说明这种方式非常重要,同样这里将线程池再改造成懒汉模式的单例对象:
增加静态成员变量,线程池本身对象的指针_singleton
,增加一把静态的锁_singlock
,用来维护单例对象的线程安全,如上图中红色框中所示。
静态成员变量必须在类外进行定义初始化:
- 单例对象指针变量的定义初始化:
ThreadPool<T>*
中的ThreadPool<T>
虽然还没有实例化,但是并不妨碍给ThreadPool<T>*
这个指针赋值为空,就像void*
虽然不知道void
是什么类型,但是却可以给这个指针赋值。
- 静态互斥锁的初始化:
单例对象只有一个,所以也只需要一个互斥锁来维护线程安全, 所以同样放在静态区上。
std::mutex
表示互斥锁是标准库中的互斥锁类型,ThreadPool<T>
表示是在先线程池这个作用域中。
- 在第一次使用单例对象的时候再在堆区
new
一个单例对象出来。 - 为了维护单例对象的线程安全,所以在判断单例对象是否存在的时候,需要加锁。
- 为了提高效率,单例对象被创建后就不再申请锁去判断,采样双检查加锁的方式。
其他内容,像构造函数,拷贝构造以及赋值运算符重载等处理和饿汉模式一样。
此时在main
函数的红色框中第一次使用单例对象,所以在这里创建单例对象,在绿色框中以及之后使用单例对象的时候,仅仅是获取单例对象。
从运行结果上看,和之前的一样。
三、 总结
这篇文章中并没有新的内容,将前面学习的和线程有关的内容进行了一个应用。至此,Liux系统部分的学习就暂时告一段落,接下来就要开启网络的学习了。