1.前言
多进程/线程并发服务器、多路I/O转接服务器的简单实现-CSDN博客
原先的多线程并发服务器,有多少个客户端连接服务器就有多少个线程,CPU需要在多个线程之间来回切换处理客户端的请求,系统消耗比较大(每次创建和消耗线程在操作系统内部消耗大)
而select、epoll的多路IO转接服务器,只需要一个进程就可以了,交给了内核维护,系统开销比较小
多进程也是,只不过多个进程的使得可使用的文件描述符变多,能同时处理的客户端数量变得更多,当然资源消耗也大
为了解决多线程每次线程创建好处理完客户端的数据后就销毁线程导致消耗过大,就可以采用线程池的方法,一次性创建一堆线程:
1. 预先创建阻塞于accept多线程,使用互斥锁上锁保护accept(server和clien通信之前就创建好的)
2. 预先创建多线程,由主线程调用accept
client1连接完后,pthread_cond_wati让线程1释放锁阻塞等 待任务队列中任务的到来,client1有数据传给server了,也就是到达server的任务拿到锁了将任务写进了任务队列当中,server用pthread_cond_signal唤醒正在阻塞等待的线程1去处理client的数据,处理完后重新上锁,将线程1还回给线程池而不是去销毁掉,避免掉了开销
线程池解决server创建线程处理数据时服务器的开销
select等则是解决多个client如何去和server建立连接的请求(accept的阻塞性质)
对于线程池应该有动态的机制,当客户量访问太多时,也就是正在忙活的线程数量pthread_busy_num过多,快接近目前线程池存储的线程总数量pthread_live_num时,需要对其进行扩容;当客户量变少时,对线程的数量pthread_live_num进行减少; -- 因此需要专门有一个管理线程来管理线程池
2.案例讲解代码
#define _CRT_SECURE_NO_WARNINGS 1
#include <stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<assert.h>
#include<stdio.h>
#include<string.h>
#include<signal.h>
#include<errno.h>
#include"threadpool.h"
#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN WAIT_TASK_NUM 10 /*如果queve size > MIN WAIT TASK NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0
/* A */
typedef struct {
void* (*function)(void*); /*函数指针,回调函数 */
void* arg; /*上面函数的参数 */
}threadpool_task_t; /*各子线程任务结构体 */
/*描述符线程池相关信息*/
struct threadpool_t {
pthread_mutex_t lock; //用于锁住本结构体
pthread_mutex_t thread_counter; //记录忙状态线程个数de琐-- busy_thr_num
pthread_cond_t queue_not_full; //当任务队列满时,添加任务的线程阻塞,等待此条件变量
pthread_cond_t queue_not_empty; //任务队列里不为空时,通知等待任务的线程
pthread_t* threads; //存放线程池中每个线程的tid。数组
pthread_t adjust_tid; //存管理线程tid
threadpool_task_t* task_queue; //任务队列(数组首地址)
int min_thr_num; //线程池最小线程数
int max_thr_num; //线程池最大线程数
int live_thr_num; //当前存活线程个数
int busy_thr_num; //忙状态线程个数
int wait_exit_thr_num; //要销毁的线程个数
int queue_front; //task_queue队头下标
int queue_rear; //task queue队尾下标
int queue_size; //task_queue队中实际任务数
int queue_max_size; //task queue队列可容纳任务数上限
int shutdown; //标志位,线程池使用状态,true或false
};
void* adjust_thread(void* threadpool);
int is_thread_alive(pthread t tid);
int threadpool_free(threadpool t* pool);
/* C */
//threadpool create(3,100,100); 线程池的创建
threadpool_t* threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
int i;
threadpool _t* pool = NULL; /*线程池 结构体 */
do {
if ((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL) {
printf("malloc threadpool fail");
break; /*跳出do while*/
}
pool->min_thr_num = min_thr_num; //最小线程数
pool->max_thr_num = max_thr_num; //最大线程数
pool->busy_thr_num = 0; //忙活的线程
pool->live_thr_num = min_thr_num; /*活着的线程数 初值=最小线程数 */
pool->wait_exit_thr_num = 0; //要销毁的线程数
pool->queue_size = 0; /*有0个产品 */
pool->queue_max_size = queue_max_size; /*最大任务队列数 */
pool->queue_front = 0; //任务队头
pool->queue_rear = 0; //任务队尾
pool->shutdown = false; /*不关闭线程池 */
/*根据最大线程上限数给工作线程数组开辟空间,并清零*/
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * max_thr_num);
if (pool->threads == NULL) {
printf("malloc threads fail");
break;
}
memset(pool->threads, 0, sizeof(pthread t) * max_thr_num);
/*给 任务队列 开辟空间 */
pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t) * queue_max_size);
if (pool->task_queue == NULL) {
printf("malloc task_queue fail");
break;
}
/*初始化互斥琐、条件变量:两个锁和两个条件变量 */
/*
pthread_mutex_t lock; //用于锁住本结构体
pthread_mutex_t thread_counter; //记录忙状态线程个数de琐-- busy_thr_num
pthread_cond_t queue_not_full; //当任务队列满时,添加任务的线程阻塞,等待此条件变量
pthread_cond_t queue_not_empty; //任务队列里不为空时,通知等待任务的线程
*/
if (pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init the lock or cond fail");
break;
}
/*启动 min_thr_num个work thread */
//在线程池pool->threads中开启最小个数线程
for (i = 0; i < min_thr_num; i++) {
//int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool); /*pool指向当前线程池*/
//每次创建线程就将该线程池pool传给子线程(回调函数的参数),使子线程对线程池pool的数据进行更新
//比如创建子线程后将pid存储到pool->thread[i]中 -- 回调函数的工作
printf("start thread 0x%x...\n", (unsianed int)pool->threadsr[i]);
}
pthread_create(&(pool->adjust_tid), NULLL, adjust_thread, (void*)pool); //创建管理者线程
return pool;
} while (0);
threadpool_free(pool);
return NULL;
}
/* D */
//线程池中各个工作线程 -- 线程创建的回调函数
void* threadpool thread(void* threadpool) {
threadpool_t* pool = (threadpool t*)threadpool;
threadpool_task_t task; //各个子线程的任务结构体:函数指针--回调函数;回调函数的参数arg
while (true) {
/*Lock must be taken to wait on conditional variable */
/*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
pthread_mutex_lock(&(pool->lock));
/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上,若有任务,跳过该while*/
while ((pool->queue_size == 0) && (!pool->shutdown)) {
printf("thread @x%x is waiting\n", (unsigned int)pthread_self());
pthread _cond_wait(&(pool->queue_not_empty), &(pool->lock));
// --- 看分析G 下面是销毁线程池时会调用到 146-166 --- //
/*清除指定数目的空闲线程,如果要结束的线程个数大于日,结束线程 -- 用于搜身的*/
if (pool->wait_exit_thr_num > 0) {
pool->wait_exit_thr_num--;
/*如果线程池里线程个数大于最小值时可以结束当前线程*/
if (pool->live_thr_num > pool->min_thr_num) {
printf("thread 0x%x is exiting\n", (unsigned int)pthread self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
if (pool->srutdown) {
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pthread_detach(pthread_self());
pthread_exit(NULL); /*线程自行结束 */
}
/*从任务队列里获取任务,是一个出队操作*/
task.function = pool->task queue[pool->queue front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue _max_size;/*出队,模拟环形队列 */
pool->queue size--;
/*通知可以有新的任务添加进来*/
pthread_cond_broadcast(&(pool->queue_not_full));
/*任务取出后,立即将 线程池琐 释放*/
pthread_mutex_unlock(&(pool->lock));
/*执行任务*/
printf("thread 0x%x start working\n",(unsigned int)pthread_self());
pthread_mutex_lock(&(p0ol->thread_counter); /*忙状态线程数变量琐*/
pool->busy_thr_num++; /*忙状态线程数+1*/
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); /*执行回调函数任务*/
//task.function(task.arg); /*执行回调函数任务*/
/*任务结束处理*/
printf("thread 0x%x end working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread counter));
pool->busy thr num--; /*处理掉一个任务,忙状态数线程数-1*/
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
/* E 管理线程 */
void* adjust_thread(void* threadpool) {
int i;
threadpool_t* pool = (threadpool_t*)threadpool;
while (!pool->shutdown) {
sleep(DEFAULT_TIME); //定时对线程池管理
pthread_mutex_lock(8(pool->lock));
int queue_size = pool->queue_size; //关注 任务数
int live_thr_nun = pool->live_thr_num; //存活 线程数
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread counter));
int busy_thr_num = pool->busy_thr_num; //忙着的线程数
pthread_mutex_unlock(&(pool->thread_counter));
/*创建新线程 算法 : 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
if (queue size >= MIN WAIT TASK NUM && live thr num < pool->max thr _num) {
pthread mutex lock(&(pool->lock));
int add = 0;
/*一次增加 DEFAULT THREAD 个线程*/
for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
&& pool->live_thr_num < pool->max_thr_num; i++) {
if (pool->threads[i] = 0 || is_thread_alive(pool->threads[i])) {
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
add++;
pool->live_thr_num++;
}
pthread_mutex_unlock(&(pool->lock));
}
/*销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
/*一次销毁DEFAULT_THREAD个线程,随機10個即可 */
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /*要销毁的线程数 设置为10 */
pthread mutex_unlock(&(pool->lock));
for (i = 0; i < DEFAULT THREAD VARY; i++) {
/*通知处在空闲状态的线程,他们会自行终止*/
pthread cond signal(&(pool->queue_not_enpty));
}
}
}
return NULL
}
/* F */
//线程池中的线程,模拟处理业务
void* process(void* arg) {
printf("thread 0x%x working on taks %d/n", (usigned int)pthread_self(), (int)arg);
sleep(1);
printf("task %d is end \n", (int)arg);
return NULL;
}
/*向线程池中 添加一个任务 */
//threadpool_add(thp,process,(void*)&num[i]); /*向线程池中添加任务 process:小写---->大写*/
int threadpool_add(threadpool_t * pool, void* (*function)(void* arg), void* arg) {
pthread mutex_lock(&(pool->lock));
/*==为真,队列已经满,调wait阻塞 */
while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_full), & (pool->lock));
}
//为false,线程池不关闭,跳过
if (pool->shutdown) {
pthread_cond_broadcast(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
/*清空 工作线程 调用的回调函数 的参数arg */
if (pool->task_queue[pool->queue_rear].arg != NULL) {
pool->task_queue[pool->queue_rear].arg = NULL;
}
/*添加任务到任务队列里*/
pool->task_queue[pool->queue rear].function = function;
pool->task_queue[pool->queue rear].arg = arg;
pool->queue rear = (pool->queue rear + 1) % pool->queue_max_size; /*队尾指针移动,模拟环形*/
pool->queue_size++;
/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
ptread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
/* G */
//销毁线程池
int threadpool_destroy(threadpool_t * pool)
{
int i;
if (pool == NULL) {
return -1;
}
pool->shutdown = true;
/*先销毁管理线程 */
pthread_join(pool->adjust_tid, NULL);
for (i = 0; i < pool->adjust_tid, NULL) {
/*通知所有的空闲线程*/
//这个销毁函数是在任务都被处理完了才会被调用,所以线程池里的线程都空闲,阻塞在wait等待被唤醒
pthread_cond_broadcast(&(pool->queue_not_empty));
//任务队列没任务,这时候还去唤醒正在等待任务的线程,欺骗它有任务将其唤醒,使其销毁 -- 在 分析D 的while(任务数 == 0)
//的pthread_cond_wait -- 被唤醒
}
for (i = 0; i < pool->live_thr_num; i++) {
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool);
return 0;
}
//对锁和条件变量的销毁,释放线程池的空间等
int threadpool free(threadpool_t * pool) {
if (POOL == NULL) {
return -1;
}
if (pool->task_queue) {
free(pool->task_queue);
}
if (pool->threads) {
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}
/* B */
int main(void) {
/*threadpool_t *threadpool_create(int min_thr _num, int max_ thr_num, int queue_max_size);*/
threadpool_t* thp = threadpool_create(3, 100, 100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/
printf("pool inited");
//int *num =(int *)malloc(sizeof(int)*20);
int num[20],i;
for (i = 0; i < 20; i++) {
num[i] = i;
printf("add task %d\n", i);
/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)*/
threadpool_add(thp,process, (void*)&num[i]); /* 向线程池中添加任务*/
}
sleep(10); /*等子线程完成任务*/
threadpool_destroy(thp);
return 0;
}
3.分析
A.线程池结构体pool
1.真正的线程池是pool->threads,是个存储各个线程pid的数组,pool其余的成员都是对线程池的限制完成
2.关于线程任务结构体:
-
typedef struct { void *(*function)(void *); /*函数指针,回调函数 */ void *arg; /*上面函数的参数 */ }threadpool_task_t; /*各子线程任务结构体 */ //pool中任务队列threadpool_task_t *task_queue的队员
- function是函数指针,指向模拟处理任务业务的回调函数process,arg是要处理内容,是function的参数
3.任务队列threadpool_task_t *task_queue:
pool中有任务队列头尾的下标 -- queue_front和queue_rear,需要注意的是采用的是环形队列的方法,下标rear的元素是空的
有新任务到来,该下标处的元素置空再将新任务赋值上去比较妥当:
--- 分析 F pthread_add任务添加函数
4.pool->shutdown,为false表示线程池不关闭
B.main
1.创建线程
分析C:pool的初始化; -- threadpool_create函数
分析D:线程池最少线程的创建;-- C中调用pthread_create创建线程,调用回调函数threadpool_thread(D)
分析E:管理线程的创建 -- 一样
2.模拟客户端产生线程,将其写进pool的任务队列task_queue当中 -- threadpool_add函数
也需要加锁,防止在添加进任务、对pool的queue_size现有任务数等进行更新时,被线程取走了任务,导致记录任务数和现有任务数不同 -- 分析F
3.销毁线程池 -- G
C.pool的初始化
创建线程池初始化好相关属性:最小线程数、最大线程数、开阔一篇地址(数组)作为线程池 -- 存储子线程pid的数组、管理线程、锁和条件变量的初始化、任务队列的开辟、创建好最小线程 -- 处理任务的回调函数、最大队列数等
do while(0),并不是说只需要一次,只要有一个步骤出现问题,就会break跳出循环,这就进入下一轮循环,知道do while中各个步骤都没问题没有break就会跳出do while
1.pthread_create创建线程,调用线程的回调函数threadpool_thread -- 分析D
- 刚创建线程池,任务队列也为空,线程池无法工作,释放锁阻塞等待 -- D
2.继续执行主线程的,创建管理线程 -- pool_adjust_tid:--分析E
pthread_create(&(pool->adjust_tid),NULLL,adjust_thread,(void *)pool); //创建管理者线程
调用回调函数adjust_thread -- 分析E
负责对线程池进行增加或者减少线程
刚创建没任务,线程也不需要扩展或者减少,让其一直在那循环监控就绪
初始化完成
D.创建子线程回调函数
void *threadpool_thread(void *threadpool) -- 工作线程
C中pool的初始化创建子线程时会调用,参数传的是pool;以及管理线程感知到线程池数量不够时也会调用来创建线程
1.将pool结构体锁住
2.没有任务(queue_not_empty不满足,队列为空):解锁 -- pthread_cond_wait -- 阻塞等待条件满足 -- 任务到来,满足条件变量queue_not_empty,队列不为空 -- pool加锁,处理任务
3.如果是 C 刚初始化后的,队列还没有任务,就一直阻塞,先回到 C 主线程继续创建管理线程
4.如果初始化完成的pool,B F 中模拟客户端产生了任务添加进pool中的任务队列threadpool_task_t *task_queue,queue_size != 0,对任务进行处理
5.定义一个threadpool_task_t task存放从任务队列中取出来的任务,队列有空间了,就可以通知正阻塞在任务队列不为满的条件变量上的任务可以进入队列;执行任务 --> 给busy_thr_num忙线程数加锁,并且该数++ --> 表示正式工作的
6.调用执行任务函数task.function -- process,传参传task.arg
7.任务处理完成,对忙状态线程数加锁--后解锁
之所以要设置忙状态线程数,是因为管理线程对线程池进行扩容或是缩减需要有个参照;而加锁是为了多个线程恰巧同时处理完任务,对忙状态线程数进行--时,本来该-3,结果还没写进内存又被更改覆盖掉了,总共变成-1
//线程池中各个工作线程 -- 线程创建的回调函数
void *threadpool thread(void *threadpool){
threadpool_t *pool=(threadpool t *)threadpool;
threadpool_task_t task; //各个子线程的任务结构体:函数指针--回调函数;回调函数的参数arg
while(true){
/*Lock must be taken to wait on conditional variable */
/*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
pthread_mutex_lock(&(pool->lock));
/* 3 初始化--- queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上,将锁释放,等待任务到来;若有任务,跳过该while*/
while((pool->queue_size==0)&&(!pool->shutdown)){
printf("thread @x%x is waiting\n",(unsigned int)pthread_self());
pthread _cond_wait(&(pool->queue_not_empty),&(pool->lock));
//初始化完成后, F 的任务添加函数add被调用,任务添加,线程被唤醒,queue_size != 0,跳出循环
/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
//这个if语句就是用于销毁扩容的线程的,管理者线程就是欺骗这些空闲的线程将其唤醒,使其指向下面的代码,实现线程池搜身
if(pool->wait_exit_thr_num >0){
pool->wait_exit_thr_num--;
/*如果线程池里线程个数大于最小值时可以结束当前线程*/
if(pool->live_thr_num > pool->min_thr_num){
printf("thread 0x%x is exiting\n",(unsigned int)pthread self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
if(pool->srutdown){
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting\n",(unsigned int)pthread_self());
pthread_detach(pthread_self());
pthread_exit(NULL); /*线程自行结束 */
}
/*从任务队列里获取任务,是一个出队操作*/
task.function = pool->task queue[pool->queue front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front =(pool->queue_front + 1)% pool->queue _max_size;/*出队,模拟环形队列 */
pool->queue size--;
/*通知可以有新的任务添加进来*/
pthread_cond_broadcast(&(pool->queue_not_full));
/*任务取出后,立即将 线程池琐 释放*/
pthread_mutex_unlock(&(pool->lock));
/*执行任务*/
printf("thread 0x%x start working\n",(unsigned int)pthread_self());
pthread_mutex_lock(&(p0ol->thread_counter); /*忙状态线程数变量琐*/
pool->busy_thr_num++; /*忙状态线程数+1*/
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); /*执行回调函数任务*/
//task.function(task.arg); /*执行回调函数任务*/
/*任务结束处理*/
printf("thread 0x%x end working\n",(unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread counter));
pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
E.管理线程回调函数adjust_thread
在C中,对线程池的创建初始化
1.C中调用pthread_create,创建管理线程,调用其回调函数adjust_thread
2.对pool结构体加锁,查看目前的任务数和线程数(不加锁的话,比如在查看live_thr_num时,刚好有线程加入对live_thr_num做修改,同时进行出现问题),查看完后解锁对忙着的线程pool->busy_thr_num加锁,查看忙着的线程数,解锁(忙着的线程数专门有一把自己的锁,对busy查看修改时,不影响live_thr_num和任务数,其它线程可在这时候对pool加锁查看live等属性)
3.定时对线程数量进行监控
4.线程数量不够,调用pthread_create,传入 D 创建子线程的回调函数threadpool_thread,为线程池pool->threads添加线程
5.空闲线程数量过多,对其进行销毁
6.每10秒循环一次进行监控,该管理线程没有阻塞
void *adjust_thread(void *threadpool){
int i;
threadpool_t *pool =(threadpool_t*)threadpool;
while(!pool->shutdown){
sleep(DEFAULT_TIME); //定时对线程池管理
pthread_mutex_lock(8(pool->lock));
int queue_size = pool->queue_size; //关注 任务数
int live_thr_nun= pool->live_thr_num; //存活 线程数
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread counter));
int busy_thr_num = pool->busy_thr_num; //忙着的线程数
pthread_mutex_unlock(&(pool->thread_counter));
/*创建新线程 算法 : 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
if(queue size >= MIN_WAIT_TASK_NUM 8& live thr num < pool->max_thr_num){
pthread mutex lock(&(pool->lock));
int add = 0;
/*一次增加 DEFAULT THREAD 个线程*/
for(i=0;i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
&& pool->live_thr_num< pool->max_thr_num;i++){
if(pool->threads[i]= 0 ||is_thread_alive(pool->threads[i])){
pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void*)pool);
add++;
pool->live_thr_num++;
}
pthread_mutex_unlock(&(pool->lock));
}
/*销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
if((busy_thr_num*2)<live_thr_num && live_thr_num> pool->min_thr_num){
/*一次销毁DEFAULT_THREAD个线程,随機10個即可 */
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /*要销毁的线程数 设置为10 */
pthread mutex_unlock(&(pool->lock));
for(i=0;i<DEFAULT_THREAD_VARY; i++){
/*通知处在空闲状态的线程,他们会自行终止*/
pthread_cond_signal(&(pool->queue_not_empty));
//欺骗空闲的线程有任务了,将其唤醒
}
}
}
return NULL
}
D中的回调函数部分代码:
F.模拟添加任务 -- main 3
typedef struct {
void *(*function)(void *); /*函数指针,回调函数 */
void *arg; /*上面函数的参数 */
}threadpool_task_t; /*各子线程任务结构体 */
//pool中任务队列threadpool_task_t *task_queue的队员
main函数中调用该函数
/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)*/
threadpool_add(thp,process,(void*)&num[i]); /* 向线程池中添加任务*/
参1是pool
参2是处理任务的回调函数
参3存的是任务,是回调函数process的参数 -- 要处理的任务
参2和参3在add函数中会存进threadpool_task_t,然后存进pool->task_queue任务队列
1.函数process是模拟处理任务业务,每个线程通过调用该函数去处理任务,这里功能省略了
void *process(void *arg){
printf("thread 0x%x working on taks %d/n",(usigned int)pthread_self(),(int) arg);
sleep(1);
printf("task %d is end \n",(int) arg);
return NULL;
}
2.B main函数调用threadpool_add函数
3.对pool加锁,检查任务pool->queue_size是否满,满则调pthread_cond_wait阻塞,释放锁等待线程处理任务,一直while不断检查,知道有线程处理了任务,wait重新给pool上锁,继续执行代码
4.有空的空间,添加任务到任务队列
环形指针,rear处元素置空存放新来的任务
/*向线程池中 添加一个任务 */
//threadpool_add(thp,process,(void*)&num[i]); /*向线程池中添加任务 process:小写---->大写*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg){
pthread mutex_lock(&(pool->lock));
/*==为真,队列已经满,调wait阻塞 */
while((pool->queue_size ==pool->queue_max_size)&&(!pool->shutdown)){
pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));
}
//为false,线程池不关闭,跳过
if(pool->shutdown){
pthread_cond_broadcast(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
/*清空 工作线程 调用的回调函数 的参数arg */
if(pool->task_queue[pool->queue_rear].arg != NULL){
pool->task_queue[pool->queue_rear].arg = NULL;
}
/*添加任务到任务队列里*/
pool->task_queue[pool->queue rear].function = function; //process -- 处理任务的回调函数赋值给任务队列
pool->task_queue[pool->queue rear].arg = arg; // 要处理的任务arg传给赋值任务队列
pool->queue_rear =(pool->queue_rear + 1)% pool->queue_max_size; /*队尾指针移动,模拟环形*/
pool->queue_size++; //向任务队列中添加了一个任务,记录队列现有任务数++
/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
ptread_cond_signal(&(pool->queue_not_empty)); //唤醒阻塞在队列为空的想要处理任务的线程 -- 初始化时会唤醒到
pthread_mutex_unlock(&(pool->lock)); //添加任务完毕,释放锁,等某个线程抢到锁处理任务
return 0;
}
G.销毁线程池
1.threadpool_destroy(threadpool_t *pool)
//销毁线程池
int threadpool_destroy(threadpool_t *pool)
{
int i;
if(pool == NULL){
return -1;
}
pool->shutdown = true;
/*先销毁管理线程 */
pthread_join(pool->adjust_tid,NULL);
for(i = 0;i< pool->live_thr_num;i++){
/*通知所有的空闲线程*/
//这个销毁函数是在任务都被处理完了才会被调用,所以线程池里的线程都空闲,阻塞在wait等待被唤醒
pthread_cond_broadcast(&(pool->queue_not_empty));
//任务队列没任务,这时候还去唤醒正在等待任务的线程,欺骗它有任务将其唤醒,使其销毁 -- 在 分析D 的while(任务数 == 0)
//的pthread_cond_wait -- 被唤醒
}
for(i = 0;i < pool->live_thr_num; i++){
pthread_join(pool->threads[i],NULL);
}
threadpool_free(pool);
return 0;
}
13行 - 19行:
2.int threadpool_free(threadpool_t *pool):
//对锁和条件变量的销毁,释放线程池的空间等
int threadpool free(threadpool_t *pool){
if(POOL == NULL){
return -1;
}
if(pool->task_queue){
free(pool->task_queue);
}
if(pool->threads){
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}