一、线程池原理
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
线程池是一种多线程处理形式,处理过程中将任务添加到任务队列,然后在创建线程后自动启动这些任务。
- 线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
- 如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙。
- 如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。
- 超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
二、线程池组成
- 任务队列(存储需要处理的任务,由工作的线程来处理这些任务)
- 通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
- 已处理的任务会被从任务队列中删除
- 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
- 工作的线程(任务队列任务的消费者,worker) ,N个
- 线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理
- 工作的线程相当于是任务队列的消费者角色,
- 如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
- 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作
- 管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候,可以适当的创建一些新的工作线程
- 当任务过少的时候,可以适当的销毁一些工作的线程
三、线程池C实现
3.1 cmake
cmake_minimum_required(VERSION 2.8.12)
project(Thread_Pool CXX C)
message(STATUS "CMake version: " ${CMAKE_VERSION})
message(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
message(STATUS "CMake system processor: " ${CMAKE_SYSTEM_PROCESSOR})
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
include_directories(${CMAKE_SOURCE_DIR}/include)
file(GLOB SRC_FILES
"${PROJECT_SOURCE_DIR}/src/*.c"
"${PROJECT_SOURCE_DIR}/src/*.cpp"
)
find_package(Threads REQUIRED)
add_executable(${CMAKE_PROJECT_NAME} ${SRC_FILES})
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(${CMAKE_PROJECT_NAME} Threads::Threads)
target_link_libraries(${CMAKE_PROJECT_NAME} pthread)
3.2 threadPool.h
1. 创建任务结构体(task类型)
- 任务的函数指针
- 任务函数的参数
2. 创建线程池结构体(ThreadPoll类型)
- 任务队列(任务结构体(Task)的指针(Task *taskQ),本质是一个环形队列)
- 任务队列的的容量,任务队列头(存放的是任务队列数组的下标),任务队列尾(同上)
- master线程的ID,worker线程的ID(指针,数组的头)
- 最小线程数,最大线程数、繁忙线程数、存活线程数、销毁线程数
- 锁:线程池的锁、繁忙线程数锁
- 条件变量:队列是不是满、队列是不是空
- 销毁线程池标志
3. 相关API函数的声明
- 线程池创建初始化
- 销毁线程池
- 线程池添加任务
- 获取工作线程数、存活线程数
- 单个线程退出
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include<stdio.h>
#include<stdio.h>
#include<pthread.h>
//任务结构体
struct Task{
void(*function)(void* arg); //任务的函数指针
void* arg; // 任务函数的参数(void指针类型)
};
typedef struct Task Task;
//线程池结构体
struct ThreadPool
{
Task* taskQ; /* 任务队列 */
int queueCapacity; //容量
int queueSize; //当前任务个数
int queueFront; //队头 -> 取数据
int queueRear; //队尾 -> 放数据
pthread_t managerID; //管理者线程ID(只有1个!)
pthread_t *threadIDs; //工作的线程ID(有许多个!)
int minNum; //最小线程数量
int maxNum; //最大线程数量
int busyNum; //繁忙的线程数量(当前正在执行工作函数的线程)
int liveNum; //存活的线程数量(当前存在的工作线程数,liveNum = busyNum + 没有任务阻塞休眠的线程)
int exitNum; //要销毁的线程数,准备要删除的线程数量
pthread_mutex_t mutexPool; //锁整个线程池 pthread_mutex_t <==> std::mutex(C++11)
pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_mutex_t <==> std::mutex(C++11)
pthread_cond_t notFull; //任务队列是不是满了 pthread_cond_t <==> std::condition_varibale类(C++11)
pthread_cond_t notEmpty; //任务队列是不是空了
int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};
typedef struct ThreadPool ThreadPool;
//创建线程池并且初始化
ThreadPool* threadPoolCreate(int min,int max,int queueSize);
//销毁线程池
int threadPoolDestroy(ThreadPool* pool);
//给线程池添加任务
void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg);
//获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
//获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//注意:工作的线程一定是活着的!但是,活着的线程不一定在工作!!!
//工作的线程(消费者线程)的任务函数
void* worker(void* arg);
//管理者线程任务函数
void* manager(void* arg);// (主要用于创建和销毁线程池的!)
//单个线程退出
void threadExit(ThreadPool* pool);
#endif
3.3 threadPool.c
1. threadPoolCreate,创建线程池
- 输入:工作线程最小数、最大数 , 任务队列大小 ;返回值:线程池指针
- 1 申请线程池内存,申请max*woker线程的内存,
- 2 申请pool、busy锁;申请enmpty、full条件变量
- 3 申请Task * queueSize任务队列内存
- 4 创建一个master线程、min个woker线程
- 5 如果申请过程中失败,释放申请内存
2. threadPoolDestroy,销毁线程池
- 输入:线程池指针;返回值:0
- 1 销毁线程池标志shutdown置1,
- 2 回收master线程
- 3 empty条件变量唤醒worker线程
- woker线程没有任务的时候,empty条件变量将woker阻塞
- worker线程因为empty条件变量阻塞有两种唤醒情况:
- 1、添加新任务,唤醒empty阻塞线程,布置新任务;
- 2、线程池销毁,唤醒empty阻塞线程,然后判断销毁标志shutdown == 1,woker线程自杀(使用threadExit)
- 4 释放申请的内存
- 5 释放锁和条件变量
3. threadPoolAdd,添加任务
- 输入:线程池指针,任务函数指针,任务函数参数指针;返回值:空
- 1 如果线程池任务队列数等于最大容量,使用full条件变量阻塞master线程
- 2 任务队列添加任务(添加任务函数和函数参数)
- 3 队尾后移(对队列容量取余,实现环形队列),队列大小++
- 4 empty条件变量唤醒woker线程,之前因为任务队列空而empty条件休眠的woker线程
4. threadPoolBusyNum,获取忙线程个数
- 输入:线程池指针;返回值:忙线程数
5. threadPoolAliveNum,获取活线程个数
- 输入:线程池指针;返回值:活线程数
6. worker,工作线程函数
- 输入:线程池指针;返回值:空
- 1 判断任务队列是否为空,为空则empty条件变量阻塞线程
- 2 判断销毁线程数是否大于0,如果大于该worker线程自杀(threadExit函数)
- 3 判断销毁线程池标志shutdown是否为1,如果是该worker线程自杀(threadExit函数)
- 4 创建新任务task,从任务队列头中取任务,然后放入新创建的task。
- 5 移动队列头,queueSize–,busyNum++
- 6 唤醒full条件变量阻塞的生产者线程,之前add函数中任务队列满了,full条件变量阻塞生产者线程,现在释放
- 7 执行真正的任务,task.function(task.arg);
- 8 任务结束busyNum–
7. manager,管理线程函数
- 输入:线程池指针;返回值:空
- 1 检测线程池销毁标志shutdown,没有销毁一直检测,每隔一段时间管理一下
- 2 取出任务队列尺寸,活线程数,忙线程数
- 3 如果 活线程数 < 最大线程数 && 活线程数 < 队列尺寸,说明忙不过来,添加新woker线程
- 4 如果 忙线程*2 < 活线程数 && 最小线程数 < 活线程数 ,说明woker过多,设置销毁线程数,然后使用empty条件变量唤醒woker线程,唤醒的worker线程自杀
8. threadExit,线程退出函数
- 输入:线程池指针;返回值:空
- 1 获取当前pid
- 2 在任务队列里对比pid
- 3 找到自己的pid,改为0,给以后备用
- 4 线程自杀,pthread_exit(NULL);
#include"../include/thread_pool.h"
#include<string.h>//用memset函数
#include<unistd.h>//用sleep函数
#include<pthread.h>//使用pthread_self() 打印当前线程的id的函数
#include<stdlib.h>//使用malloc操作
const int ADDORDESTROYNUMBER = 2;//每次添加/销毁的线程的number个数
//创建线程池并且初始化
//形参表分别为:最小线程个数,最大线程个数以及队列大小!
ThreadPool* threadPoolCreate(int min,int max,int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do{
if(pool == NULL){//分配内存失败 return 空
printf("malloc threadpool fail ...\n");
break;
// return NULL;
}
//succeed to create a thread pool!
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);//工作线程中,分别Max个空间的heap区数组空间
if(pool->threadIDs == NULL){
printf("malloc threadIDs fail ...\n");//分配内存失败 return 空
break;
// return NULL;
}
//初始化工作的线程IDs们
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);//memset为cstring中的函数,用来赋值!
//把线程id数组中的元素全都赋值为0
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min;//初始化时以线程的个数最小值来创建活着的线程的个数!
pool->exitNum = 0;//一开始初始化肯定没有销毁线程的,这个数量要根据程序运行中的状态来decide!
if(pthread_mutex_init(&pool->mutexPool,NULL) != 0||
pthread_mutex_init(&pool->mutexBusy,NULL) != 0||
pthread_cond_init(&pool->notEmpty,NULL) != 0||
pthread_cond_init(&pool->notFull,NULL) != 0)
{
//此时创建失败
printf("mutex or condition inti fail ...\n");
break;
// return 0;
}
//此时创建锁mutex和条件变量cond成功!
//然后创建任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
//开辟一块arr内存存放任务队列,arr所存储的任务最大值为容量那么大
pool->queueCapacity = queueSize;
pool->queueSize = 0;//当前任务数为0个
pool->queueFront = 0;//因为没有任务,所有头部执行0index
pool->queueRear = 0;//因为没有任务,所有尾部执行0index
pool->shutdown = 0;//初始化时肯定不能销毁线程池,所有标记为0(自己规定的)
//创建线程
pthread_create(&pool->managerID,NULL,manager,pool);//创建管理者这一个线程
for(int i=0;i<min;i++){//一开始就创建min个线程(这是线程池中的最小线程数!)
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
}
//如果能成功执行到这里,那么就表示成功执行了线程池!
//此时直接返回线程池即可!
return pool;
}while(0);//只会执行一次!只要有开辟不成功的case,马上break出while循环!
//下面再进行资源释放的工作!
if(pool && pool->threadIDs) free(pool->threadIDs);//线程池存在的case下,开辟了线程IDs的空间时,就释放它!
if(pool && pool->taskQ) free(pool->taskQ);//线程池存在的case下,开辟了taskQ的空间时,就释放它!
if(pool) free(pool);//释放线程池
return NULL;
}
//销毁线程池(当线程池被销毁时,线程池中的所有成员都必须被销毁!)
int threadPoolDestroy(ThreadPool* pool){
if(pool == NULL) return -1;// 表示此时线程池已空,不需要销毁了
// 线程池不空(没被销毁时)
pool->shutdown = 1;//关闭线程池!
// 阻塞回收管理者线程
pthread_join(pool->managerID,NULL);
// 唤醒阻塞的(活着的)消费者线程
// 唤醒后他们会自动退出,为什么退出呢?(因为我们写了让他们退出的条件判断代码!)
for(int i = 0;i < pool->liveNum; i++){
pthread_cond_signal(&pool->notEmpty);
}
// 释放申请的堆区内存
if(pool->taskQ){
free(pool->taskQ);
pool->taskQ = NULL;
}
if(pool->threadIDs){
free(pool->threadIDs);
pool->threadIDs = NULL;
}
//再释放互斥量锁还有条件类锁
pthread_mutex_destroy(&pool->mutexBusy);
pthread_mutex_destroy(&pool->mutexPool);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;//return 0 就表示的是成功Destory了线程池并返回了!
}
//给线程池的任务队列中添加任务
void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg){
//由于你给该线程池的任务队列中添加任务时,很有可能此时你正在对任务进行读or写的操作
//那么因此这里就必须要用线程池的锁mutex_pool来锁住(防止这份共享代码因为OS的调度切换搞乱了!)
pthread_mutex_lock(&pool->mutexPool);
while(pool->queueSize == pool->queueCapacity && !pool->shutdown){
// 此时线程池的任务队列数 = 其最大容量了 并且 该线程池还没有被销毁 时
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull,&pool->mutexPool);
}
// 此时被堵塞的生产者线程 被唤醒了(注意:此时该线程还是拿到了mutex互斥锁的状态的!)
// 先判断线程池是否已经被销毁了!
// 线程池被销毁
if(pool->shutdown){
pthread_mutex_unlock(&pool->mutexPool);// 解锁
return;// 并 退出程序
}
// 线程池没被销毁时
// 给线程池中的任务队列 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
// 让队尾index (循环)后移!
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;// 队列任务+1
//生产者生产出新的任务,唤醒消费者,消费任务
pthread_cond_signal(&pool->notEmpty);//pool->notEmpty用来worker中!
//通知pool->notEmpty这个condition_variable条件变量的对象 返回true 让他唤醒了去工作干活了!
pthread_mutex_unlock(&pool->mutexPool);
}
// 获取线程池中工作(忙)的线程的个数
int threadPoolBusyNum(ThreadPool* pool){
// 注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据
// 应该加上锁!
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool){
// 注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据
// 应该加上锁!且这里因为没有给aliveNum 定义对应的互斥量 so 直接用线程池的锁即可
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum ;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum ;
}
// 工作的线程(消费者线程)的任务函数
void* worker(void* arg){
//首先把传入进来的参数类型转换为线程池类型
ThreadPool* pool = (ThreadPool*)arg;
//接下来就算不断地读取任务队列中的任务(每个任务就是一个函数)
while(1){
pthread_mutex_lock(&pool->mutexPool);//加锁
// 当前任务队列是否为空
while(pool->queueSize == 0 && !pool->shutdown)
{
// 在当前队列中任务为空 并且 线程池没有被销毁时 就阻塞在这个工作线程中
pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);//只有当pool->notEmpty的返回值为true时,就继续往下执行!
// 当该哦工作的线程被唤醒时,判断一下该线程是否需要被销毁
if(pool->exitNum > 0)
{
pool->exitNum--;//每次判断好要让该线程自杀后,必须让exitNum--才行!
// 当然,当条件变量wait被唤醒后,肯定是拿到了互斥锁才继续往下面执行的
// 此时就必须要把pool->mutexPool这把锁头给unlock解锁一下,再退出该程序,否则你没解锁就退出的话该程序就直接死锁了!
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);//解锁
// pthread_exit(NULL);//退出该线程!
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if(pool->shutdown){// 若线程池被销毁了,可以直接退出线程!
pthread_mutex_unlock(&pool->mutexPool);//解锁
// pthread_exit(NULL);//退出线程
threadExit(pool);
}
// 若线程池没有被销毁了,可以直接做任务了(消费)
// 从任务队列中取出一个任务(真正 do things!)
struct Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头节点 使之do循环移动 构成一个循环队列 , 取余数的目的的为了形成环形队列,到最后一个进行取余变成0,其他不变
pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
pool->queueSize--;//取出1个任务了,所有--
//解锁,消费者消费成功,唤醒阻塞的生产者生产。
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
// do 真正的工作(在function中)
printf("thread %ld start working...\n",pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;//工作时++
pthread_mutex_unlock(&pool->mutexBusy);
// do things 真正工作的函数进行执行
task.function(task.arg);// <==> (*task.function)(task.arg);
// 同时,因为此时这个线程在忙,所有busyNum++且用对应的锁来锁住,防止这个线程的小任务还没真正开始干活呢,
// OS就因为调度切换到别的线程去了
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n",pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;//工作完成后--
pthread_mutex_unlock(&pool->mutexBusy);
pthread_mutex_unlock(&pool->mutexPool);//解锁
}
return NULL;
}
//管理者线程任务函数(主要用于创建和销毁线程池的!)
void* manager(void* arg){
ThreadPool* pool = (ThreadPool*)arg;
while(!pool->shutdown){
// 只要线程池没有销毁 就继续管理它
// 每隔3s钟 管理一下
sleep(3);//linux 下要包含头文件unistd.h才能使用!
// 取出线程池钟的任务数量以及当前线程的数量
// (由于你读取的过程钟有可能别的线程正在写数据),为了防止这种case你就必须要上锁才行!
pthread_mutex_lock(&pool->mutexPool);//加锁
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);//解锁
// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);//加锁
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);//解锁
// 添加线程(此时可以根据你项目的需要来制定相应的添加线程个数的规则)
// 比如:当存活的线程数 < max && 当前存活的线程数 < 当前任务队列钟的任务数量 时 就添加线程!(此时表明当前的线程数已经忙不过来了)
if(liveNum < pool->maxNum && liveNum < queueSize){
pthread_mutex_lock(&pool->mutexPool);//加锁
int cnt = 0;
for(int i=0;i< pool->maxNum && cnt < ADDORDESTROYNUMBER
&& pool->liveNum < pool->maxNum;i++)
{
if(pool->threadIDs[i] == 0)//为0就 表明此时该线程ID可用!(之前没被使用,线程我可以用了!)
{
// 创建线程元素并放进去数组中!
pthread_create(&pool->threadIDs[i],NULL,worker,pool);
cnt++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);//解锁
}
// 销毁线程(此时可以根据你项目的需要来制定相应的销毁线程个数的规则)
// 比如:当 忙的线程数*2 < 存活的线程数 && 最小线程数 < 存活的线程数 时 就销毁线程
if(busyNum * 2 < liveNum && pool->minNum < liveNum){
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = ADDORDESTROYNUMBER;//要销毁的线程数字定义为ADDORDESTROYNUMBER(2)
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for(int i=0;i<ADDORDESTROYNUMBER;i++){
//唤醒线程,唤醒的是存活且不忙的线程
pthread_cond_signal(&pool->notEmpty);// ==> condition_variable中的.notify_one()
}
}
}
return NULL;
}
//单个线程退出
void threadExit(ThreadPool* pool){
//先获取当前线程的线程id
pthread_t threadId = pthread_self();// <==> std::this_thread::get_id();
for(int i=0;i<pool->maxNum;i++)
{
if(pool->threadIDs[i] == threadId)
{
pool->threadIDs[i] = 0;//重新置为0,表示该线程ID可在后续被使用了!
printf("threadExit() called,%ld existing...\n",threadId);
break;
}
}
pthread_exit(NULL);
}
四、线程池CPP实现
4.1 任务队列类的声明
// 定义任务结构体
using callback = void(*)(void*);
struct Task
{
Task()
{
function = nullptr;
arg = nullptr;
}
Task(callback f, void* arg)
{
function = f;
this->arg = arg;
}
callback function;
void* arg;
};
// 任务队列
class TaskQueue
{
public:
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task& task);
void addTask(callback func, void* arg);
// 取出一个任务
Task takeTask();
// 获取当前队列中任务个数
inline int taskNumber()
{
return m_queue.size();
}
private:
pthread_mutex_t m_mutex; // 互斥锁
std::queue<Task> m_queue; // 任务队列
};
4.2 任务队列类的定义
TaskQueue::TaskQueue()
{
pthread_mutex_init(&m_mutex, NULL);
}
TaskQueue::~TaskQueue()
{
pthread_mutex_destroy(&m_mutex);
}
void TaskQueue::addTask(Task& task)
{
pthread_mutex_lock(&m_mutex);
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
}
void TaskQueue::addTask(callback func, void* arg)
{
pthread_mutex_lock(&m_mutex);
Task task;
task.function = func;
task.arg = arg;
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
}
Task TaskQueue::takeTask()
{
Task t;
pthread_mutex_lock(&m_mutex);
if (m_queue.size() > 0)
{
t = m_queue.front();
m_queue.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
}
4.3 线程池类的声明
class ThreadPool
{
public:
ThreadPool(int min, int max);
~ThreadPool();
// 添加任务
void addTask(Task task);
// 获取忙线程的个数
int getBusyNumber();
// 获取活着的线程个数
int getAliveNumber();
private:
// 工作的线程的任务函数
static void* worker(void* arg);
// 管理者线程的任务函数
static void* manager(void* arg);
void threadExit();
private:
pthread_mutex_t m_lock;
pthread_cond_t m_notEmpty;
pthread_t* m_threadIDs;
pthread_t m_managerID;
TaskQueue* m_taskQ;
int m_minNum;
int m_maxNum;
int m_busyNum;
int m_aliveNum;
int m_exitNum;
bool m_shutdown = false;
};
4.4 线程池类的定义
ThreadPool::ThreadPool(int minNum, int maxNum)
{
// 实例化任务队列
m_taskQ = new TaskQueue;
do {
// 初始化线程池
m_minNum = minNum;
m_maxNum = maxNum;
m_busyNum = 0;
m_aliveNum = minNum;
// 根据线程的最大上限给线程数组分配内存
m_threadIDs = new pthread_t[maxNum];
if (m_threadIDs == nullptr)
{
cout << "malloc thread_t[] 失败...." << endl;;
break;
}
// 初始化
memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);
// 初始化互斥锁,条件变量
if (pthread_mutex_init(&m_lock, NULL) != 0 ||
pthread_cond_init(&m_notEmpty, NULL) != 0)
{
cout << "init mutex or condition fail..." << endl;
break;
}
/// 创建线程 //
// 根据最小线程个数, 创建线程
for (int i = 0; i < minNum; ++i)
{
pthread_create(&m_threadIDs[i], NULL, worker, this);
cout << "创建子线程, ID: " << to_string(m_threadIDs[i]) << endl;
}
// 创建管理者线程, 1个
pthread_create(&m_managerID, NULL, manager, this);
} while (0);
}
ThreadPool::~ThreadPool()
{
m_shutdown = 1;
// 销毁管理者线程
pthread_join(m_managerID, NULL);
// 唤醒所有消费者线程
for (int i = 0; i < m_aliveNum; ++i)
{
pthread_cond_signal(&m_notEmpty);
}
if (m_taskQ) delete m_taskQ;
if (m_threadIDs) delete[]m_threadIDs;
pthread_mutex_destroy(&m_lock);
pthread_cond_destroy(&m_notEmpty);
}
void ThreadPool::addTask(Task task)
{
if (m_shutdown)
{
return;
}
// 添加任务,不需要加锁,任务队列中有锁
m_taskQ->addTask(task);
// 唤醒工作的线程
pthread_cond_signal(&m_notEmpty);
}
int ThreadPool::getAliveNumber()
{
int threadNum = 0;
pthread_mutex_lock(&m_lock);
threadNum = m_aliveNum;
pthread_mutex_unlock(&m_lock);
return threadNum;
}
int ThreadPool::getBusyNumber()
{
int busyNum = 0;
pthread_mutex_lock(&m_lock);
busyNum = m_busyNum;
pthread_mutex_unlock(&m_lock);
return busyNum;
}
// 工作线程任务函数
void* ThreadPool::worker(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 一直不停的工作
while (true)
{
// 访问任务队列(共享资源)加锁
pthread_mutex_lock(&pool->m_lock);
// 判断任务队列是否为空, 如果为空工作线程阻塞
while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown)
{
cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;
// 阻塞线程
pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);
// 解除阻塞之后, 判断是否要销毁线程
if (pool->m_exitNum > 0)
{
pool->m_exitNum--;
if (pool->m_aliveNum > pool->m_minNum)
{
pool->m_aliveNum--;
pthread_mutex_unlock(&pool->m_lock);
pool->threadExit();
}
}
}
// 判断线程池是否被关闭了
if (pool->m_shutdown)
{
pthread_mutex_unlock(&pool->m_lock);
pool->threadExit();
}
// 从任务队列中取出一个任务
Task task = pool->m_taskQ->takeTask();
// 工作的线程+1
pool->m_busyNum++;
// 线程池解锁
pthread_mutex_unlock(&pool->m_lock);
// 执行任务
cout << "thread " << to_string(pthread_self()) << " start working..." << endl;
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
// 任务处理结束
cout << "thread " << to_string(pthread_self()) << " end working...";
pthread_mutex_lock(&pool->m_lock);
pool->m_busyNum--;
pthread_mutex_unlock(&pool->m_lock);
}
return nullptr;
}
// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 如果线程池没有关闭, 就一直检测
while (!pool->m_shutdown)
{
// 每隔5s检测一次
sleep(5);
// 取出线程池中的任务数和线程数量
// 取出工作的线程池数量
pthread_mutex_lock(&pool->m_lock);
int queueSize = pool->m_taskQ->taskNumber();
int liveNum = pool->m_aliveNum;
int busyNum = pool->m_busyNum;
pthread_mutex_unlock(&pool->m_lock);
// 创建线程
const int NUMBER = 2;
// 当前任务个数>存活的线程数 && 存活的线程数<最大线程个数
if (queueSize > liveNum && liveNum < pool->m_maxNum)
{
// 线程池加锁
pthread_mutex_lock(&pool->m_lock);
int num = 0;
for (int i = 0; i < pool->m_maxNum && num < NUMBER
&& pool->m_aliveNum < pool->m_maxNum; ++i)
{
if (pool->m_threadIDs[i] == 0)
{
pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);
num++;
pool->m_aliveNum++;
}
}
pthread_mutex_unlock(&pool->m_lock);
}
// 销毁多余的线程
// 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量
if (busyNum * 2 < liveNum && liveNum > pool->m_minNum)
{
pthread_mutex_lock(&pool->m_lock);
pool->m_exitNum = NUMBER;
pthread_mutex_unlock(&pool->m_lock);
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->m_notEmpty);
}
}
}
return nullptr;
}
// 线程退出
void ThreadPool::threadExit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < m_maxNum; ++i)
{
if (m_threadIDs[i] == tid)
{
cout << "threadExit() function: thread "
<< to_string(pthread_self()) << " exiting..." << endl;
m_threadIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}
五、线程池实现难点
4.1 empty条件变量
判断任务队列是否满了
- 阻塞休眠
- 当worker函数消费者,消耗任务队列为空,empty条件变量让该woker线程阻塞休眠
- 释放唤醒
- 当add函数添加新任务到队列中,empty条件变量唤醒阻塞woker线程,执行新任务
- 当threadPoolDestroy函数销毁线程池,shutdown标志为1,empty条件变量唤醒阻塞woker线程。woker线程醒来后发现shutdown标志为1,进行自杀
- 当manager函数判断当前woker线程过多,设置销毁线程数exitNum,empty条件变量唤醒阻塞woker线程。woker线程醒来后发现s销毁线程数exitNum 大于0,进行自杀
4.2 full条件变量
判断任务队列是否空了
- 阻塞休眠
- 当add函数发现任务队列满了,无法添加新任务时,full条件变量让该生产者线程(调用add函数的线程)阻塞休眠
- 释放唤醒
- 当worker函数从任务队列中取出新任务,任务队列有空间的时候,full条件变量唤醒阻塞生产者线程,创建新任务