线程池并发服务器
- 概念
- 线程池和任务队列
- 任务队列
- 线程池
- 操作线程池的函数
- 初始化线程池
- 销毁线程池
- 向线程池添加任务
- 任务的回调函数
- 测试
概念
线程池是一个抽象概念,可以简单的认为若干线程在一起运行,线程不退出,等待有任务处理。
为什么要有线程池?
- 以网络编程服务器端为例,作为服务器端支持高并发,可以有多个客户端连接,发出请求,对于多个请求我们每次都去建立线程,这样线程会创建很多,而且线程执行完销毁也会有很大的系统开销,使用上效率很低。
- 创建线程并非多多益善,所以我们的思路是提前创建好若干个线程,不退出,等待任务的产生,去接收任务处理后等待下一个任务。
线程池和任务队列
线程池如何实现?需要思考2个问题?
1.假设线程池创建了,线程们如何去协调接收任务并且处理?
2.线程池上的线程如何能够执行不同的请求任务?
上述问题1就很像生产者和消费者模型,客户端对应生产者,服务器端这边的线程池对应消费者,需要借助互斥锁和条件变量来搞定。
问题2解决思路就是利用回调机制,我们同样可以借助结构体的方式,对任务进行封装,比如任务的数据和任务处理回调都封装在结构体上,这样线程池的工作线程拿到任务的同时,也知道该如何执行了。
任务队列
任务包含两个元素:回调函数及其参数,任务的各类属性数据
这里使用编号来标识任务的属性,因此,任务的结构体定义如下:
typedef _Task
{
unsigned int tasknum; //任务编号
void (*task_func)(void *arg); //回调函数
void *arg; //回调函数的参数
}Task;
线程池
线程池的核心部分包括:
- 任务数组(队列)
- 线程数组
配套的辅助部分为:
- 为了互斥访问加互斥锁和条件变量
- 任务数组的属性:最大任务数、当前任务数、任务出队和入队的下标
- 线程数组的属性:线程池内线程的个数、当前线程是否开启(标志是否要销毁线程)
typedef _ThreadPoll
{
Task *tasks; //任务队列
unsigned int max_job_num; //最大任务个数
unsigned int job_num; //实际任务个数
unsigned int job_push; //入队位置
unsigned int job_pop; //出队位置
pthread_t *threads; //线程池内线程数组
unsigned int thr_num; //线程池内线程个数
int shutdown; //是否关闭线程池
pthread_mutex_t pool_lock; //线程池的锁
pthread_cond_t empty_task; //任务队列为空的条件变量
pthread_cond_t not_empty_task; //任务队列不为空的条件变量
}ThreadPool;
操作线程池的函数
初始化线程池
void create_threadpool(int thrnum,int maxtasknum);
//创建线程池–thrnum 代表线程个数,maxtasknum 最大任务个数
步骤:
分配任务队列和线程队列的数组空间
循环创建线程
初始化线程池的其它属性变量
//创建线程池并初始化
void create_threadpool(int thrnum, int maxtasknum, ThreadPool *thrPool)
{
printf("initializing the ThreadPool\n");
//分配线程数组和任务数组
thrPool->threads = (pthread_t*)malloc(sizeof(pthread_t)*thrnum);
thrPool->tasks = (Task*)malloc(sizeof(Task)*maxtasknum);
//初始化ThreadPool 的属性
thrPool->max_job_num = maxtasknum;
thrPool->job_num = 0;
thrPool->job_pop = 0;
thrPool->job_push = 0;
thrPool->thr_num = thrnum;
thrPool->shutdown = 0; //是否销毁线程池,1代表销毁
pthread_mutex_init(&thrPool->pool_lock, NULL);
pthread_cond_init(&thrPool->empty_task, NULL);
pthread_cond_init(&thrPool->not_empty_task, NULL);
//创建线程, 设置默认为detach方式运行
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(int i=0; i<thrnum; i++)
{
pthread_create(&thrPool->threads[i], &attr, (void*)thrRun, (void*)thrPool);
}
printf("ThreadPool created\n");
}
创建线程时需要给出入口函数thrRun,该函数的核心功能是从任务队列取任务执行,执行结束再取任务,循环上述步骤。
注意:
- 从任务队列取任务前要加互斥锁,取出任务后立即从任务队列中删除,并释放锁
- 从任务队列将任务拷贝到线程空间,在线程销毁时要释放
- 线程循环执行,知道shutdown被置为1,即线程的生命周期持续到线程池被销毁
void thrRun(void *arg)
{
ThreadPool *thrPool = (ThreadPool*)arg;
unsigned int taskpose = 0;
Task *task = (Task*)malloc(sizeof(Task));
//将任务取出,并立即从任务队列删除,并让出互斥锁
while(1)
{
//加锁,获取任务
pthread_mutex_lock(&thrPool->pool_lock);
//任务队列为空并且不是要摧毁线程池,线程阻塞,等待任务到来
while(thrPool->job_num <=0 && !thrPool->shutdown)
{
//阻塞成功就会释放pool_lock, 被唤醒后再加锁
pthread_cond_wait(&thrPool->not_empty_task, &thrPool->pool_lock);
}
if(thrPool->job_num)
{
//取任务,修改线程池的任务队列和属性
taskpose = (thrPool->job_pop++) % thrPool->max_job_num;
memcpy(task, &thrPool->tasks[taskpose], sizeof(Task));
task->arg = task;
thrPool->job_num--;
thrPool->job_pop = thrPool->job_pop % thrPool->max_job_num;
//任务队列有空位置,通知任务生产者
pthread_cond_signal(&thrPool->empty_task);
}
//线程池处于销毁状态
if(thrPool->shutdown)
{
//退出线程, 并销毁线程使用的task的空间
pthread_mutex_unlock(&thrPool->pool_lock);
free(task);
pthread_exit(NULL);
}
//任务获取成功,释放锁,执行回调函数
pthread_mutex_unlock(&thrPool->pool_lock);
task->task_func(task->arg);
}
}
销毁线程池
核心步骤:设置shutdown,释放内存
注意:
- 处于等待任务的阻塞队列需要先将其唤醒(诱杀)
- 子线程申请的资源在shutdown为1时,会释放
- 主线程需要等待子线程全部结束(设置joinable)
// 销毁线程池
void destroy_threadpool(ThreadPool *pool)
{
//销毁线程,激活线程的销毁
pool->shutdown = 1;
//唤醒所有阻塞在等待任务步骤的线程,通知其自毁
pthread_cond_broadcast(&pool->not_empty_task);
//设置joinable,等待所有子线程退出
for(int i=0; i<pool->thr_num; i++)
{
pthread_join(pool->threads[i], NULL);
}
//释放内存
pthread_cond_destroy(&pool->not_empty_task);
pthread_cond_destroy(&pool->empty_task);
pthread_mutex_destroy(&pool->pool_lock);
free(pool->tasks);
free(pool->threads);
}
向线程池添加任务
核心功能:向任务队列添加任务,并通知因任务队列空而阻塞的线程
注意:
- 队列不空时要阻塞等待队列有空位置的信号(empty_task)
//向线程池添加任务
void addtask(ThreadPool *pool, int *beginnum)
{
//加锁
pthread_mutex_lock(&pool->pool_lock);
//等待任务队列有空位置
while(pool->max_job_num <= pool->job_num)
{
pthread_cond_wait(&pool->empty_task, &pool->pool_lock);
}
//修改线程池属性
int taskpose = (pool->job_push++)%pool->max_job_num;
pool->job_push = pool->job_push % pool->max_job_num;
(*beginnum) = (*beginnum) % 100000;
pool->tasks[taskpose].tasknum = (*beginnum)++;
pool->job_num++;
pool->tasks[taskpose].arg = (void*)&pool->tasks[taskpose];
pool->tasks[taskpose].task_func = taskRun;
//释放锁,并通知等待not_empty信号的线程
pthread_mutex_unlock(&pool->pool_lock);
pthread_cond_signal(&pool->not_empty_task);
}
任务的回调函数
为了简单,这里回调函数打印一下线程号和任务编号,然后sleep2秒
//任务回调函数
void taskRun(void *arg)
{
Task *task = (Task*)arg;
int num = task->tasknum;
printf("task %d is running %lu\n", num, pthread_self());
sleep(1);
printf("task %d is done %lu\n", num, pthread_self());
}
测试
测试4线程任务队列为40程序
int main()
{
ThreadPool thrPool;
create_threadpool(4, 40, &thrPool);
int beginnum = 0;
for(int i=0; i<80; i++)
addtask(&thrPool, &beginnum);
sleep(30);
destroy_threadpool(&thrPool);
return 0;
}
部分截图
完整代码