目录
POSIX信号量
信号量的原理
信号量的概念
信号量函数接口
信号量与互斥锁
二元信号量
二元信号量模拟实现互斥功能
基于环形队列的生产消费模型
空间资源的数据资源
申请和释放资源
两个原则
Linux线程池
线程池的概念
线程池的应用场景
线程池的实现
线程池成员变量
构造和析构函数
Init函数、start函数
线程池处理任务相关函数
线程池优化版本
本文涉及到的详细代码链接如下:
Linux_beginner: 这是一个用来保存linux学习过程中的代码的仓库 (gitee.com)
POSIX信号量
信号量的原理
- 我们将可能会被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
- 我们将可能会被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
- 但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
信号量的概念
信号量本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。
每一个线程,在访问对应的资源的时候,先申请信号量。申请成功,则表示该线程允许使用资源;申请失败,则目前无法使用资源。
信号量的工作机制类似于看电影买票,是一种资源的预定机制。
信号量的PV操作:
- P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
- V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。
PV操作必须是原子操作
多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
申请信号量失败被挂起等待
当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
注意: 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但是POSIX信号量可以用于线程间同步。
信号量函数接口
初始化信号量
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:信号量的初始值(计数器的初始值)。
返回值说明:
- 初始化信号量成功返回0,失败返回-1。
销毁信号量
销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:
int sem_destroy(sem_t *sem);
参数说明:
- sem:需要销毁的信号量。
返回值说明:
- 销毁信号量成功返回0,失败返回-1。
等待信号量(申请信号量)
等待信号量的函数叫做sem_wait,该函数的函数原型如下:
int sem_wait(sem_t *sem);
参数说明:
- sem:需要等待的信号量。
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
发布信号量(释放信号量)
发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
参数说明:
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
信号量与互斥锁
二元信号量
信号量是多线程同步用的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再去完成其他的动作。
互斥量是多线程互斥用的,比如说一个线程占用了某一个资源,那么别的线程无法访问,直到这个线程离开,其他的线程才能够使用这个资源。
比如对全局变量的访问,有时要加锁,操作完了,在解锁。尽管两个概念有点类似,但是他们的侧重点不一样,信号量不一定是锁定某一个资源,而是流程上的概念,比如:有A,B两个线程,B线程要等A线程完成某一任务以后再进行自己下面的步骤,这个任务并不一定是锁定某一资源,还可以是进行一些计算或者数据处理之类。而线程互斥量则是“锁住某一资源”的概念,在锁定期间内,其他线程无法对被保护的数据进行操作。不难看出,mutex是semaphore的一种特殊情况(n=1时)。也就是说,完全可以用后者替代前者。但是,因为mutex较为简单,且效率高,所以在必须保证资源独占的情况下,还是采用这种设计。
互斥量值只能为0/1,信号量值可以为非负整数。但是如果我们定义信号量的时候,将其定义成二元信号量时,此时信号量的功能就和互斥锁差不多了。
二元信号量模拟实现互斥功能
信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。
信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。
例如,下面我们实现一个多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。
我们在主线程当中创建四个新线程,让这四个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数,其中我们用全局变量tickets记录当前剩余的票数,此时tickets是会被多个执行流同时访问的临界资源,在下面的代码中我们并没有对tickets进行任何保护操作。
下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
//构造一个sem类利用RAII机制方便使用信号量
class Sem{
public:
//RAII
Sem(int num)
{
sem_init(&_sem, 0, num);
}
~Sem()
{
sem_destroy(&_sem);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
Sem sem(1); //二元信号量
int tickets = 2000;
//信号量实现抢票系统
void* TicketGrabbing(void* arg)
{
std::string name = (char*)arg;
while (true){
sem.P();
if (tickets > 0){
usleep(1000);
std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;
sem.V();
}
else{
sem.V();
break;
}
}
std::cout << name << " quit..." << std::endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
运行结果如下:
基于环形队列的生产消费模型
空间资源的数据资源
与传统的环形队列不同,基于信号量实现的环形队列的生产消费模型,不必在意head和tail,对于如何判空和判满,需要关注的是环形队列中的空间资源和数据资源。
生产者关注的是空间资源,消费者关注的是数据资源
对于生产者和消费者来说,它们关注的资源是不同的:
- 生产者关注的是环形队列当中是否有空间(blank),只要有空间生产者就可以进行生产。
- 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。
blank_sem和data_sem的初始值设置
现在我们用信号量来描述环形队列当中的空间资源(_space_sem)和数据资源(_data_sem),在我们初始信号量时给它们设置的初始值是不同的:
- _space_sem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
- _data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
申请和释放资源
生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次生产数据前都需要先申请_space_sem:
- 如果_space_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
- 如果_space_sem的值为0,则信号量申请失败,此时生产者需要在blank_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。
当生产者生产完数据后,应该释放_data_sem:
- 虽然生产者在进行生产前是对_space_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作。
- 生产者在生产数据前申请到的是space位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是space位置而应该是data位置。
- 当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此我们应该对data_sem进行V操作。
消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要先申请data_sem:
- 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
- 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。
当消费者消费完数据后,应该释放_space_sem:
- 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对space_sem进行V操作。
- 消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作space位置。
- 当消费者消费完数据后,意味着环形队列当中多了一个space位置,因此我们应该对_space_sem进行V操作。
两个原则
在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守如下两个规则。
第一个规则:生产者和消费者不能对同一个位置进行访问。
生产者和消费者在访问环形队列时:
- 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
- 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。
第二个规则:无论是生产者还是消费者,都不应该覆盖对方
- 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
- 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
下面是环形队列的具体实现代码:
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
static const int N = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
public:
RingQueue(int num = N)
:_cap(num)
,_ring(num)
{
//POSIX信号量初始化
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,num);
_c_step = _p_step = 0;
//锁初始化
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void push(const T& in)
{
P(_space_sem);
Lock(_p_mutex);
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem);
}
void pop(T *out)
{
P(_data_sem);
Lock(_c_mutex);
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ring;
int _cap; // 环形队列容器大小
sem_t _data_sem; // 只有消费者关心
sem_t _space_sem; // 只有生产者关心
int _c_step; // 消费位置
int _p_step; // 生产位置
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
Linux线程池
线程池的概念
线程池是一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
线程池常见的应用场景如下:
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
相关解释:
- 像Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
- 对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。
线程池的实现
下面我们实现一个简单的线程池V1版本,线程池中提供了一个任务队列,以及若干个线程(多线程)。 这里实现的V1版本直接使用的系统的线程库,后面为了代码的简洁,还会再实现其他的版本。
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。
线程池成员变量
为了实现一个基本的线程池,我们至少要定义以下几个成员变量。
template <class T>
class ThreadPool
{
public:
private:
std::vector<pthread_t> _threads;
int _num;
std::queue<T> _tasks;
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
- _threads:用一个vector存储线程池的多个线程
- _num:表示线程池中可以调用的线程最大数量
- _tasks:用一个queue存储线程池要处理的任务,将待处理任务直接push到队列中,即可以使用线程池进行处理。
- _lock:互斥锁
- _cond:条件变量
为什么线程池中需要有互斥锁和条件变量?
线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。
注意:
- 当某线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
- pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal函数唤醒正在等待的一个线程即可。
- 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区当中。
- 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
构造和析构函数
定义好线程池的成员变量,以及对线程池需要用到的互斥锁和条件变量进行初始化,方便后续的工作正常进行。
ThreadPool(int num = N)
:_num(num)
,_threads(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
Init函数、start函数
Init函数作用是初始化线程池中的各个线程 start函数的作用是启动线程池中的各个线程
但是由于调用pthread库中的pthread_create函数会直接启动线程,所以V1版本的线程池,Init函数和start函数只需要实现一个即可。
void init()
{
// TODO
}
void start()
{
for (int i = 0; i < _num; i++)
{
pthread_create(&_threads[i], nullptr, threadRoutine, this); // ?
}
}
线程池处理任务相关函数
上面完成了线程池的大体框架,接下来就要实现线程池要如何处理任务了。
线程池主要逻辑:主线程负责不断向任务队列当中Push任务就行了,此后线程池当中的线程会从任务队列当中获取到这些任务并进行处理。
首先为了方便代码的编写,先写好互斥锁和条件变量的加锁解锁函数。
void lockQueue()
{
pthread_mutex_lock(&_lock);
}
void unlockQueue()
{
pthread_mutex_unlock(&_lock);
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
然后就是线程池中任务队列的相关函数
void pushTask(const T &t)
{
lockQueue();
_tasks.push(t);
threadWakeup();
unlockQueue();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
bool isEmpty()
{
return _tasks.empty();
}
最后就是线程池运行任务队列中的具体任务的函数
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
tp->lockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
T t = tp->popTask(); // 从公共区域拿到私有区域
tp->unlockQueue();
// for test
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,不应该在临界区中处理
}
为什么线程池中的线程执行例程需要设置为静态方法?
使用pthread_create函数创建线程时,需要为创建的线程传入一个Routine(执行例程),该Routine只有一个参数类型为void*的参数,以及返回类型为void*的返回值。
而此时Routine作为类的成员函数,该函数的第一个参数是隐藏的this指针,因此这里的Routine函数,虽然看起来只有一个参数,而实际上它有两个参数,此时直接将该Routine函数作为创建线程时的执行例程是不行的,无法通过编译。
静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此我们需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void*的参数。
但是在静态成员函数内部无法调用非静态成员函数,而我们需要在Routine函数当中调用该类的某些非静态成员函数,比如Pop。因此我们需要在创建线程时,向Routine函数传入的当前对象的this指针,此时我们就能够通过该this指针在Routine函数内部调用非静态成员函数了。
主函数中调用线程池
#include"ThreadPool_V1.hpp"
#include"Task.hpp"
#include<memory>
int main()
{
ThreadPool<Task> tp;
tp.start();
while(true)
{
int x, y;
char op;
std::cout << "please Enter x> ";
std::cin >> x;
std::cout << "please Enter y> ";
std::cin >> y;
std::cout << "please Enter op(+-*/%)> ";
std::cin >> op;
Task t(x, y, op);
tp.pushTask(t);
}
return 0;
}
运行代码后一瞬间就有六个线程,其中一个是主线程,另外五个是线程池内处理任务的线程。
并且我们会发现这五个线程在处理时会呈现出一定的顺序性,因为主线程是每秒Push一个任务,这五个线程只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性。
注意: 此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。
线程池优化版本
为了使线程池的模拟实现的代码更加简单易读,我们可以先封装好一个thread库和lock库。
Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>
class Thread
{
public:
typedef enum
{
NEW = 0,
RUNNING,
EXITED
} ThreadStatus;
typedef void (*func_t)(void *);
public:
Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args)
{
char name[128];
snprintf(name, sizeof(name), "thread-%d", num);
_name = name;
}
int status() { return _status; }
std::string threadname() { return _name; }
pthread_t threadid()
{
if (_status == RUNNING)
return _tid;
else
{
return 0;
}
}
// 是不是类的成员函数,而类的成员函数,具有默认参数this,需要static
// 但是会有新的问题:static成员函数,无法直接访问类属性和其他成员函数
static void *runHelper(void *args)
{
Thread *ts = (Thread*)args; // 就拿到了当前对象
// _func(_args);
(*ts)();
return nullptr;
}
void operator ()() //仿函数
{
if(_func != nullptr) _func(_args);
}
void run()
{
int n = pthread_create(&_tid, nullptr, runHelper, this);
if(n != 0) exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid, nullptr);
if( n != 0)
{
std::cerr << "main thread join thread " << _name << " error" << std::endl;
return;
}
_status = EXITED;
}
~Thread()
{
}
private:
pthread_t _tid;
std::string _name;
func_t _func; // 线程未来要执行的回调
void *_args;
ThreadStatus _status;
};
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex // 自己不维护锁,有外部传入
{
public:
Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
{}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{}
private:
pthread_mutex_t *_pmutex;
};
class LockGuard // 自己不维护锁,有外部传入
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
线程池V2版本,主要通过自己封装的Thread库优化了线程池的Init函数、start函数,并且使用自己封装的lockGuard库更方便处理任务时的加锁解锁工作,代码更加简洁易读。
#pragma once
#include"Thread.hpp"
#include"lockGuard.hpp"
#include<iostream>
#include<string>
#include<vector>
#include<queue>
const static int N = 5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N)
:_num(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
for (auto &t : _threads)
{
t.join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
pthread_mutex_t* getlock()
{
return &_lock;
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
void pushTask(const T &t)
{
LockGuard lockgrard(&_lock);
_tasks.push(t);
threadWakeup();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
bool isEmpty()
{
return _tasks.empty();
}
static void threadRoutine(void *args)
{
// pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
T t;
{
LockGuard lockguard(tp->getlock());
while (tp->isEmpty())
{
tp->threadWait();
}
t = tp->popTask(); // 从公共区域拿到私有区域
}
// for test
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,不应该在临界区中处理
}
}
void init()
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(Thread(i,threadRoutine,this));
}
}
void start()
{
for(auto &t : _threads)
{
t.run();
}
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _tasks;
pthread_mutex_t _lock;
pthread_cond_t _cond;
};