背景
最近在走读诊断项目代码时,发现其用到了线程池技术,感觉耳目一新。以前基本只是听过线程池,但是并没有实际应用。对它有一丝的好奇,于是趁这个机会深入了解一下线程池的实现原理。
线程池的优点
线程池出现的背景,其实对应CPU性能优化——“瑞士军刀“文章中提到的短时应用。
即短时间内通过创建线程处理大量请求,但是请求业务的执行时间过短,会造成一些缺陷。
- 浪费系统资源。比如我们创建一个线程,再销毁一个线程耗时10ms,但是业务的执行时间只有10ms。这就导致系统有效利用率较低。
- 系统不稳定。如果短时间内,来了大量的请求,每一个请求都通过创建线程的方式执行。可能存在瞬时负载很高,请求响应降低,从而导致系统不稳定。
于是我们可以通过线程池技术,减少线程创建和消耗的耗时,提高系统的资源利用;控制线程并行数量,确保系统的稳定性;
线程池实现
线程池的核心包括以下内容:
- 线程池任务节点结构。
- 线程池控制器。
- 线程池的控制流程。
线程池任务节点结构
线程池任务结点用来保存用户投递过来的的任务,并放入线程池中的线程来执行,任务结构如下:
struct worker_t {
void * (* process)(void * arg); /*回调函数*/
int paratype; /*函数类型(预留)*/
void * arg; /*回调函数参数*/
struct worker_t * next; /*链接下一个任务节点*/
};
线程池控制器
线程池控制器用来对线程池进行控制管理,描述当前线程池的最基本信息,包括任务的投递,线程池状态的更新与查询,线程池的销毁等,其结构如下:
/*线程控制器*/
struct CThread_pool_t {
pthread_mutex_t queue_lock; /*互斥锁*/
pthread_cond_t queue_ready; /*条件变量*/
worker_t * queue_head; /*任务节点链表 保存所有投递的任务*/
int shutdown; /*线程池销毁标志 1-销毁*/
pthread_t * threadid; /*线程ID*/
int max_thread_num; /*线程池可容纳最大线程数*/
int current_pthread_num; /*当前线程池存放的线程*/
int current_pthread_task_num; /*当前已经执行任务和已分配任务的线程数目和*/
int current_wait_queue_num; /*当前等待队列的的任务数目*/
int free_pthread_num; /*线程池允许最大的空闲线程数/*/
/**
* function: ThreadPoolAddWorkUnlimit
* description: 向线程池投递任务
* input param: pthis 线程池指针
* process 回调函数
* arg 回调函数参数
* return Valr: 0 成功
* -1 失败
*/
int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
/**
* function: ThreadPoolAddWorkLimit
* description: 向线程池投递任务,无空闲线程则阻塞
* input param: pthis 线程池指针
* process 回调函数
* arg 回调函数参数
* return Val: 0 成功
* -1 失败
*/
int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
/**
* function: ThreadPoolGetThreadMaxNum
* description: 获取线程池可容纳的最大线程数
* input param: pthis 线程池指针
*/
int (* GetThreadMaxNum)(void * pthis);
/**
* function: ThreadPoolGetCurrentThreadNum
* description: 获取线程池存放的线程数
* input param: pthis 线程池指针
* return Val: 线程池存放的线程数
*/
int (* GetCurrentThreadNum)(void * pthis);
/**
* function: ThreadPoolGetCurrentTaskThreadNum
* description: 获取当前正在执行任务和已经分配任务的线程数目和
* input param: pthis 线程池指针
* return Val: 当前正在执行任务和已经分配任务的线程数目和
*/
int (* GetCurrentTaskThreadNum)(void * pthis);
/**
* function: ThreadPoolGetCurrentWaitTaskNum
* description: 获取线程池等待队列任务数
* input param: pthis 线程池指针
* return Val: 等待队列任务数
*/
int (* GetCurrentWaitTaskNum)(void * pthis);
/**
* function: ThreadPoolDestroy
* description: 销毁线程池
* input param: pthis 线程池指针
* return Val: 0 成功
* -1 失败
*/
int (* Destroy)(void * pthis);
};
线程池的控制流程
线程池的控制流程可以分为三个步骤:
- 线程池创建。即创建
max_num
个线程ThreadPoolRoutine
,即空闲线程:
/**
* function: ThreadPoolConstruct
* description: 构建线程池
* input param: max_num 线程池可容纳的最大线程数
* free_num 线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
* return Val: 线程池指针
*/
CThread_pool_t *
ThreadPoolConstruct(int max_num, int free_num)
{
int i = 0;
CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
if(NULL == pool)
return NULL;
memset(pool, 0, sizeof(CThread_pool_t));
/*初始化互斥锁*/
pthread_mutex_init(&(pool->queue_lock), NULL);
/*初始化条件变量*/
pthread_cond_init(&(pool->queue_ready), NULL);
pool->queue_head = NULL;
pool->max_thread_num = max_num; // 线程池可容纳的最大线程数
pool->current_wait_queue_num = 0;
pool->current_pthread_task_num = 0;
pool->shutdown = 0;
pool->current_pthread_num = 0;
pool->free_pthread_num = free_num; // 线程池允许存在最大空闲线程
pool->threadid = NULL;
pool->threadid = (pthread_t *)malloc(max_num*sizeof(pthread_t));
/*该函数指针赋值*/
pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit;
pool->AddWorkLimit = ThreadPoolAddWorkLimit;
pool->Destroy = ThreadPoolDestroy;
pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum;
pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum;
pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum;
pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum;
for(i=0; i<max_num; i++) {
pool->current_pthread_num++; // 当前池中的线程数
/*创建线程*/
pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
usleep(1000);
}
return pool;
}
- 投递任务。即将任务生产者,将任务节点投入线程池中。实现如下:
/**
* function: ThreadPoolAddWorkLimit
* description: 向线程池投递任务,无空闲线程则阻塞
* input param: pthis 线程池指针
* process 回调函数
* arg 回调函数参数
* return Val: 0 成功
* -1 失败
*/
int
ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
{
// int FreeThreadNum = 0;
// int CurrentPthreadNum = 0;
CThread_pool_t * pool = (CThread_pool_t *)pthis;
/*为添加的任务队列节点分配内存*/
worker_t * newworker = (worker_t *)malloc(sizeof(worker_t));
if(NULL == newworker)
return -1;
newworker->process = process; // 回调函数,在线程ThreadPoolRoutine()中执行
newworker->arg = arg; // 回调函数参数
newworker->next = NULL;
pthread_mutex_lock(&(pool->queue_lock));
/*插入新任务队列节点*/
worker_t * member = pool->queue_head; // 指向任务队列链表整体
if(member != NULL) {
while(member->next != NULL) // 队列中有节点
member = member->next; // member指针往后移动
member->next = newworker; // 插入到队列链表尾部
} else
pool->queue_head = newworker; // 插入到队列链表头
assert(pool->queue_head != NULL);
pool->current_wait_queue_num++; // 等待队列加1
/*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/
int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
/*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/
if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) { //-> 条件为真进行新线程创建
int CurrentPthreadNum = pool->current_pthread_num;
/*新增线程*/
pool->threadid = (pthread_t *)realloc(pool->threadid,
(CurrentPthreadNum+1) * sizeof(pthread_t));
pthread_create(&(pool->threadid[CurrentPthreadNum]),
NULL, ThreadPoolRoutine, (void *)pool);
/*当前线程池中线程总数加1*/
pool->current_pthread_num++;
/*分配任务线程数加1*/
pool->current_pthread_task_num++;
pthread_mutex_unlock(&(pool->queue_lock));
/*发送信号给一个处与条件阻塞等待状态的线程*/
pthread_cond_signal(&(pool->queue_ready));
return 0;
}
pool->current_pthread_task_num++;
pthread_mutex_unlock(&(pool->queue_lock));
/*发送信号给一个处与条件阻塞等待状态的线程*/
pthread_cond_signal(&(pool->queue_ready));
// usleep(10); //看情况
return 0;
}
- 线程执行。即每一个线程的执行逻辑。实现如下:
/**
* function: ThreadPoolRoutine
* description: 线程池中执行的线程
* input param: arg 线程池指针
*/
void *
ThreadPoolRoutine(void * arg)
{
CThread_pool_t * pool = (CThread_pool_t *)arg;
while(1) {
/*上锁,pthread_cond_wait()调用会解锁*/
pthread_mutex_lock(&(pool->queue_lock));
/*队列没有等待任务*/
while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
/*条件锁阻塞等待条件信号*/
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
}
if(pool->shutdown) {
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL); // 释放线程
}
assert(pool->current_wait_queue_num != 0);
assert(pool->queue_head != NULL);
pool->current_wait_queue_num--; // 等待任务减1,准备执行任务
worker_t * worker = pool->queue_head; // 去等待队列任务节点头
pool->queue_head = worker->next; // 链表后移
pthread_mutex_unlock(&(pool->queue_lock));
(* (worker->process))(worker->arg); // 执行回调函数
pthread_mutex_lock(&(pool->queue_lock));
pool->current_pthread_task_num--; // 函数执行结束
free(worker); // 释放任务结点
worker = NULL;
if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
pthread_mutex_unlock(&(pool->queue_lock));
break; // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统
}
pthread_mutex_unlock(&(pool->queue_lock));
}
pool->current_pthread_num--; // 当前线程数减1
pthread_exit(NULL); // 释放线程
return (void *)NULL;
}
这个就是用来执行任务的线程,在初始化创建线程时所有线程都全部阻塞在pthread_cond_wait()处,此时的线程就为空闲线程,也就是线程被挂起,当收到信号并取得互斥锁时,表明任务投递过来
则获取等待队列里的任务结点并执行回调函数;函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态。
若我的内容对您有所帮助,还请关注我的公众号。不定期分享干活,剖析案例,也可以一起讨论分享。
我的宗旨:
踩完您工作中的所有坑并分享给您,让你的工作无bug,人生尽是坦途
总结
实际上,我觉得在诊断项目中,线程池技术是非必要的。因此它不会涉及到大量的请求,以及每一个请求处理,一般都会比较耗时。
参考:https://www.cnblogs.com/zhaoosheLBJ/p/9337291.html