简易线程池开发
线程池基本结构
#include"threadpool.h"
//任务队列
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<strings.h>
typedef struct Task
{
void(*function)(void* arg);
void*arg;
}Task;
//线程池结构体
struct ThreadPool
{
Task*taskkQ;//任务队列
int queueCapacity;//当前任务个数
int queueSize; //当前任务个数
int queueFront;//队头
int queueRear;// 队尾部
pthread_t manangerID;//管理者线程id
pthread_t *threadIDs; //工作的线程Id
int minNum;//最小线程数量
int maxNum;//最大线程数量
int busyNum;//忙的线程个数
int liveNum;//存活的线程个数
int exitNum; //要销毁的线程个数
pthread_mutex_t mutexPoll; //锁整个的线程池
pthread_mutex_t mutexBusy; //锁住忙线程(因为它变动频繁)
int shutdown;// 是不是要销毁线程池,销毁为1,不销毁为0
//增加条件变量,阻塞消费者和生产者
pthread_cond_t noFull; //非满写数据
pthread_cond_t noEmpty;//非空读数据
};
ThreadPool * threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool*pool = (ThreadPool*)malloc(sizeof(ThreadPool));
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
/*释放内存 free*/
return NULL;
}
pool->threadIDs = (pthread_t)malloc(sizeof(pthread_t)*max);
if (pool == NULL)
{
printf("malloc threadIDs fail...\n");
/*释放内存 free*/
return NULL;
}
memset(pool->threadIDs, 0, sizeof(pthread_t)*max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min;//初始时和最小线程数相等
pool->exitNum = 0; //不是退出状态
// 初始化条件变量和互斥锁
if (pthread_mutex_init(&pool->mutexBusy) != 0 ||
pthread_mutex_init(&pool->mutexPoll) != 0 ||
pthread_cond_init(&pool->noFull) != 0 ||
pthread_cond_init(&pool->noEmpty) != 0)
{
printf("mutex or condition init fail...\n");
}
//任务队列初始化
pool->taskkQ = (Task*)malloc(sizeof(Task)*queueSize);
if (pool->taskkQ == NULL)
{
printf("malloc taskQ fail...\n");
/*释放内存 free*/
return NULL;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//创建线程
pthread_create(&pool->manangerID, NULL, manager, NULL);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, NULL);
}
}
优化代码结构,如果 每次 malloc 失败,return NULL 之前, 都要释放内存,比较麻烦. 直接放进do while 里
#include"threadpool.h"
//任务队列
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<strings.h>
typedef struct Task
{
void(*function)(void* arg);
void*arg;
}Task;
//线程池结构体
struct ThreadPool
{
Task*taskkQ;//任务队列
int queueCapacity;//当前任务个数
int queueSize; //当前任务个数
int queueFront;//队头
int queueRear;// 队尾部
pthread_t manangerID;//管理者线程id
pthread_t *threadIDs; //工作的线程Id
int minNum;//最小线程数量
int maxNum;//最大线程数量
int busyNum;//忙的线程个数
int liveNum;//存活的线程个数
int exitNum; //要销毁的线程个数
pthread_mutex_t mutexPoll; //锁整个的线程池
pthread_mutex_t mutexBusy; //锁住忙线程(因为它变动频繁)
int shutdown;// 是不是要销毁线程池,销毁为1,不销毁为0
//增加条件变量,阻塞消费者和生产者
pthread_cond_t noFull; //非满写数据
pthread_cond_t noEmpty;//非空读数据
};
ThreadPool * threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool*pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do {
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t)malloc(sizeof(pthread_t)*max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t)*max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min;//初始时和最小线程数相等
pool->exitNum = 0; //不是退出状态
// 初始化条件变量和互斥锁
if (pthread_mutex_init(&pool->mutexBusy) != 0 ||
pthread_mutex_init(&pool->mutexPoll) != 0 ||
pthread_cond_init(&pool->noFull) != 0 ||
pthread_cond_init(&pool->noEmpty) != 0)
{
printf("mutex or condition init fail...\n");
}
//任务队列初始化
pool->taskkQ = (Task*)malloc(sizeof(Task)*queueSize);
if (pool->taskkQ == NULL)
{
printf("malloc pool->taskQ fail...\n");
break;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//创建线程
pthread_create(&pool->manangerID, NULL, manager, NULL);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool; //调用成功返回 线程池实例
} while (0);
// 调用失败则 释放资源
if (pool&&pool->threadIDs)
free(pool->threadIDs);
if (pool&&pool->taskkQ)free(pool->taskkQ);
if (pool)free(pool);
/*
if (pool->threadIDs)free(pool->threadIDs);
if (pool->taskQ)free(pool->taskQ);
if (pool)free(pool); 为什么不能写成这样
你的代码在语法上是正确的,但是在逻辑上可能会有问题。如果pool是NULL,
那么pool->threadIDs和pool->taskQ可能会导致未定义的行为,
因为你正在尝试访问一个NULL指针的成员。这可能会导致程序崩溃。
在你的原始代码中,你首先检查pool是否为NULL,然后再检查pool->threadIDs和pool->taskQ。
这样做可以确保你不会尝试访问一个NULL指针的成员。这是一种防止程序崩溃的好方法。
所以,虽然你的代码在语法上是正确的,但是在逻辑上可能会有问题。你应该始终先检查pool是否为NULL,
然后再检查pool->threadIDs和pool->taskQ。这样可以防止程序崩溃。
*/
return NULL ;
}
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
这行代码是创建一个新的线程,并让这个线程开始执行worker
函数。pthread_create
函数的第四个参数是传递给worker
函数的参数。在这个例子中,
pool
是一个指向ThreadPool
结构体的指针,它包含了线程池的所有信息,包括任务队列、线程ID、互斥锁和条件变量等。当你创建一个新的线程并让它执行worker
函数时,你需要将pool
传递给worker
函数,这样worker
函数就可以访问和操作线程池的所有信息了。所以,
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
这行代码的意思是:创建一个新的线程,让这个线程开始执行worker
函数,并将pool
作为参数传递给worker
函数。这样,worker
函数就可以通过pool
来访问和操作线程池的所有信息了。希望这个解释对你有所帮助!
工作者函数
// 创建线程池
void* worker(void*arg) // 访问任务队列,任务队列属于线程池,每个线程都要对线程池操作,因此需要对线程池加锁
{
ThreadPool*pool = (ThreadPool*)arg;
while (1)
{
pthread_muxtex_lock(&pool->mutexPoll);
//任务队列为空,且没有关闭时
while (pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作线程,直到不为空时,将其唤醒
pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
}
//判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
pthread_exit(NULL);
}
//从任务队列中取出一个任务
Task task;
task.function = pool->taskkQ[pool->queueFront].function;
task.arg = pool->taskkQ[pool->queueFront].arg;
pthread_mutex_unlock(&pool->mutexPoll);
}
return NULL;
}
这段代码定义了一个线程池的结构体ThreadPool
,以及一个任务的结构体Task
。
Task
结构体包含两个成员:一个函数指针function
和一个void
类型的指针arg
。这个结构体通常用于存储要在线程池中执行的任务。
ThreadPool
结构体包含了线程池的所有信息,包括:
taskkQ
:一个指向Task
类型的指针,它指向一个Task
类型的数组,这个数组用于存储任务队列。queueCapacity
:队列的容量。queueSize
:当前队列中的任务数量。queueFront
和queueRear
:分别表示任务队列的队首和队尾。manangerID
:管理者线程的ID。threadIDs
:一个指向pthread_t
类型的指针,它指向一个pthread_t
类型的数组,这个数组用于存储工作线程的ID。minNum
和maxNum
:分别表示线程池中的最小和最大线程数量。busyNum
:当前忙碌的线程数量。liveNum
:当前存活的线程数量。exitNum
:需要销毁的线程数量。mutexPoll
和mutexBusy
:用于锁住整个线程池和忙碌线程的互斥锁。shutdown
:表示是否需要销毁线程池,如果需要销毁则为1,否则为0。noFull
和noEmpty
:分别表示非满和非空的条件变量,用于阻塞消费者和生产者线程。
这个结构体通常用于管理线程池,包括任务的分配、线程的创建和销毁等。
taskkQ:一个指向Task类型的指针,它指向一个Task类型的数组, 这里我不太理解,这个不是创建了一个结构体类型的指针吗,为什么又是数组了
task.function = pool->taskkQ[pool->queueFront].function;
在这里,
Task* taskkQ;
确实是声明了一个指向Task
类型的指针。然而,这个指针可以被用来指向一个Task
类型的数组。这是因为在C语言中,数组名实际上就是一个指向数组第一个元素的指针。所以,当我们说
taskkQ
是一个指向Task
类型的数组的指针时,我们的意思是taskkQ
可以被用来存储一个Task
类型数组的地址。这样,你就可以通过taskkQ
来访问和操作这个数组。例如,你可以这样初始化
taskkQ
:pool->taskkQ = (Task*)malloc(sizeof(Task)*queueSize);
这行代码使用
malloc
函数动态分配了queueSize
个Task
类型的空间,并将返回的地址赋值给taskkQ
。这样,taskkQ
就指向了一个Task
类型的数组,数组的大小是queueSize
。然后,你就可以像操作普通数组一样操作
taskkQ
了。例如,taskkQ[i]
就可以用来访问数组中的第i
个元素。
// 创建线程池
void* worker(void*arg) // 访问任务队列,任务队列属于线程池,每个线程都要对线程池操作,因此需要对线程池加锁
{
ThreadPool*pool = (ThreadPool*)arg;
while (1)
{
pthread_muxtex_lock(&pool->mutexPoll);
//任务队列为空,且没有关闭时
while (pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作线程,直到不为空时,将其唤醒
pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
}
//判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
pthread_exit(NULL);
}
//从任务队列中取出一个任务
Task task;
task.function = pool->taskkQ[pool->queueFront].function; //
task.arg = pool->taskkQ[pool->queueFront].arg;
// 移动头节点(这个任务读完了,准备读取下一个任务)
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// 解锁(对当前线程池的操作完成)
pthread_mutex_unlock(&pool->mutexPoll);
// 调用任务函数,执行取出的任务,忙线程加+1 ,由于忙线程是频繁变化的共享变量加锁
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);// (*task.function)(task.arg); 调用任务函数
// 传入的arg 分配堆内存,防止分配栈内存被释放
free(task.arg); //结束任务之后,释放内存
task.arg = NULL;
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
//执行完任务,忙线程减一
}
return NULL;
}
在C语言中,调用函数指针的方式是通过使用函数指针的名称,后面跟上括号和参数。例如,如果你有一个函数指针void (*functionPtr)(int)
,你可以这样调用它:
(*functionPtr)(5);
或者更简单地:
functionPtr(5);
这两种方式都会调用functionPtr
指向的函数,并传递5
作为参数。
在你的代码中,task.function
是一个函数指针,所以你可以通过task.function(arg)
来调用它,其中arg
是你要传递给函数的参数。例如:
task.function(task.arg);
这行代码会调用task.function
指向的函数,并传递task.arg
作为参数。希望这个解释对你有所帮助!
或者使用(*task.function )(task.arg);
管理者函数
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
sleep(3); //每3s检测一次
//取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPoll);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPoll);
//取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//添加线程
// 规定:当任务个数>存活的线程个数 && 存活的线程数<最大的线程数 时 才需要增加线程
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexBusy);
int counter = 0; // const int NUMBER 指定为2 ,每次添加线程上限两个.
// 因为进入if语句后,在线程添加过程中,
// 线程池中的线程个数可能随时变化,而且循环期间会进行liveNum++,
// (可能会出现存活线程数大于最大线程数,此时不应再进行 liveNum++),故循环之前需要判断pool->liveNum<pool->maxNum
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0) // 遍历可用的线程
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexBusy);
}
// 销毁线程
//规定: 当 忙线程数*2 < 存活线程 && 存活线程>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPoll);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPoll);
for (int i = 0; i < NUMBER; ++i)
{
//唤醒那些因为任务队列为空,而被阻塞的进程,让他们执行后面的 suicide代码
pthread_cond_signal(&pool->noEmpty);
}
}
}
}
修改工作者线程,响应管理者线程
当工作者线程因为任务队列为空而被阻塞时,
工作者 阻塞线程
//任务队列为空,且没有关闭时
while (pool->queueSize == 0 && !pool->shutdown)
{
//阻塞工作线程,直到需要销毁多余线程时,将其唤醒
pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
}
如果 忙线程数*2 < 存活线程 && 存活线程>最小线程数 则工作者线程会接受到来自管理者线程的信号而 销毁自身线程
管理者 唤醒 工作线程
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPoll);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPoll);
for (int i = 0; i < NUMBER; ++i)
{
//唤醒那些因为任务队列为空,而被阻塞的进程,让他们执行后面的 suicide代码
pthread_cond_signal(&pool->noEmpty);
}
}
工作者线程 接收条件变量信号 销毁自己
while (1)
{
pthread_mutex_lock(&pool->mutexPoll);
//任务队列为空,且没有关闭时
while (pool->queueSize == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
// 这行代码会阻塞当前线程,并等待条件变量 pool->noEmpty 的信号。当其他线程调用 pthread_cond_signal(&pool->noEmpty) 或 pthread_cond_broadcast(&pool->noEmpty) 时,这个被阻塞的线程就会被唤醒,执行后面的代码。
//解除阻塞之后 , 自己销毁自己,销毁掉线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
}
pthread_mutex_unlock(&pool->mutexPoll);
pthread_exit(NULL);
}
}
增加 线程退出函数
由于管理者函数添加线程时,需要根据线程id 重复利用线程,
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0) // 遍历可用的线程
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexBusy);
}
而在工作者函数中退出线程时,只是简单的 pthread_exit, 没有将threadIDs 置为0
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
pthread_exit(NULL);
}
故 用封装好的 threadExit 替换 工作者线程的 pthread_exit(NULL)
void* threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
};
综上 的工作者线程代码
// 创建线程池
void* worker(void* arg) // 访问任务队列,任务队列属于线程池,每个线程都要对线程池操作,因此需要对线程池加锁
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPoll);
//任务队列为空,且没有关闭时
while (pool->queueSize == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
// 这行代码会阻塞当前线程,并等待条件变量 pool->noEmpty 的信号。当其他线程调用 pthread_cond_signal(&pool->noEmpty) 或 pthread_cond_broadcast(&pool->noEmpty) 时,这个被阻塞的线程就会被唤醒。
//解除阻塞之后 , 自己销毁自己,销毁掉线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
}
pthread_mutex_unlock(&pool->mutexPoll);
//线程退出并将threadID 置为 0 表明此线程是可以被下次添加的
threadExit(pool);// 封装的线程退出函数,将threadID 置为0
}
}
//判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
// pthread_exit(NULL);
threadExit(pool);
}
//从任务队列中取出一个任务
Task task;
task.function = pool->taskkQ[pool->queueFront].function;
task.arg = pool->taskkQ[pool->queueFront].arg;
// 移动头节点(这个任务读完了,准备读取下一个任务)
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// 解锁(对当前线程池的操作完成)
pthread_mutex_unlock(&pool->mutexPoll);
// 调用任务函数,执行取出的任务,忙线程加+1 ,由于忙线程是频繁变化的共享变量加锁
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);// (*task.function)(task.arg); 调用任务函数
// 传入的arg 分配堆内存,防止分配栈内存被释放
free(task.arg); //结束任务之后,释放内存
task.arg = NULL;
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
//执行完任务,忙线程减一
}
return NULL;
}
给线程池增加任务
void threadpoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) // 第二个参数是任务 ,任务是struct Task 类型,
// 每个task 里面有两个指针,一个是函数指针 void(*function)(void* arg); 一个是 函数指针的参数 是 void * 类型
{
pthread_mutex_lock(&pool->mutexPoll);
//当任务队列满了的时候,阻塞生产者
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
pthread_cond_wait(&pool->noFull, &pool->mutexPoll);
}
// 阻塞解除后 ,判断线程池是不是被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPoll);//防止死锁
return;
}
//添加任务到队尾
pool->taskkQ[pool->queueRear].function = func;
pool->taskkQ[pool->queueRear].arg = arg;
// 队尾向后移动,准备添加下一个
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++; //任务队列个数+1
pthread_cond_signal(&pool->noFull);
pthread_mutex_unlock(&pool->mutexPoll);
};
获取线程池中工作的线程个数 获取线程池中活着的线程个数
// 获取线程池中工作的线程个数
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
};
// 获取线程池中活着的线程个数
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
};
销毁线程池
int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}
//关闭线程池
pool->shutdown = 1; //管理者线程就会退出
//阻塞回收管理者线程
pthread_join(pool->manangerID, NULL);
//唤醒阻塞的消费者线程
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->noEmpty);
}
// 释放堆内存
if (pool->taskkQ)
{
free(pool->taskkQ);
pool->taskkQ = NULL;
}
if (pool->threadIDs)
{
free(pool->threadIDs);
pool->threadIDs = NULL;
}
//销毁互斥锁
pthread_mutex_destroy(&pool->mutexBusy);
pthread_mutex_destroy(&pool->mutexPool);
pthread_cond_destroy(&pool->noEmpty);
pthread_cond_destroy(&pool->noFull);
free(pool);
pool = NULL;
return 0;
}
pool->shutdown = 1; //管理者线程就会退出
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
/***其他内容***/
}
}
pthread_join(pool->manangerID, NULL); //阻塞回收管理者线程
//唤醒阻塞的消费者线程, 就会执行后面的销毁代码
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->noEmpty);
}
while (1)
{
pthread_mutex_lock(&pool->mutexPoll);
//任务队列为空,且没有关闭时
while (pool->queueSize == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
// 这行代码会阻塞当前线程,并等待条件变量 pool->noEmpty 的信号。当其他线程调用 pthread_cond_signal(&pool->noEmpty) 或 pthread_cond_broadcast(&pool->noEmpty) 时,这个被阻塞的线程就会被唤醒,执行后面的代码。
//解除阻塞之后 , 自己销毁自己,销毁掉线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
}
pthread_mutex_unlock(&pool->mutexPoll);
pthread_exit(NULL);
}
}
}
完整代码
1. 线程池原理
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
在各个编程语言的语种中都有线程池的概念,并且很多语言中直接提供了线程池,作为程序猿直接使用就可以了,下面给大家介绍一下线程池的实现原理:
-
线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:
-
任务队列,存储需要处理的任务,由工作的线程来处理这些任务
- 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
- 已处理的任务会被从任务队列中删除
- 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
-
工作的线程(任务队列任务的消费者) ,N个
- 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
- 工作的线程相当于是任务队列的消费者角色,
- 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
- 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
-
管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候, 可以适当的创建一些新的工作线程
- 当任务过少的时候, 可以适当的销毁一些工作的线程
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
-
2. 任务队列
`
// 任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
3. 线程池定义
C++
// 线程池结构体
struct ThreadPool
{
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
4. 头文件声明
C
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);
#endif // _THREADPOOL_H
5. 源文件定义
C
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// 任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
// 创建线程
pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// 释放资源
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}
// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while (pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// 从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头结点
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// 解锁
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// 添加线程
// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
6. 测试代码
C
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n",
pthread_self(), num);
sleep(1);
}
int main()
{
// 创建线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for (int i = 0; i < 100; ++i)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}