1. 作用
线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。
2. 实现原理
线程池内部由任务队列、工作线程和管理者线程组成。
任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。
工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。
管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。
注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。
3. 手撕线程池
参考来源:爱编程的大丙。
【1】threadpool.c:
#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#define NUMBER 2 //管理者线程增加或减少的工作线程数量
//任务结构体
typedef struct Task {
void (*func)(void* arg);
void* arg;
} Task;
//线程池结构体
struct ThreadPool {
//任务队列,视为环形队列
Task* taskQ;
int queueCapacity; //队列容量
int queueSize; //当前任务个数
int queueFront; //队头 -> 取任务
int queueRear; //队尾 -> 加任务
//线程相关
pthread_t managerID; //管理者线程ID
pthread_t* threadIDs; //工作线程ID
int minNum; //工作线程最小数量
int maxNum; //工作线程最大数量
int busyNum; //工作线程忙的数量
int liveNum; //工作线程存活数量
int exitNum; //要销毁的工作线程数量
pthread_mutex_t mutexPool; //锁整个线程池
pthread_mutex_t mutexBusy; //锁busyNum
pthread_cond_t notFull; //任务队列是否满
pthread_cond_t notEmpty; //任务队列是否空
//线程池是否销毁
int shutdown; //释放为1,否则为0
};
/***************************************************************
* 函 数: threadPoolCreate
* 功 能: 创建线程池并初始化
* 参 数: min---工作线程的最小数量
* max---工作线程的最大数量
* capacity---任务队列的最大容量
* 返回值: 创建的线程池的地址
**************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{
//申请线程池空间
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do {//此处循环只是为了便于失败释放空间,只会执行一次
if (pool == NULL) {
printf("pool create error!\n");
break;
}
//申请任务队列空间,并初始化
pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);
if (pool->taskQ == NULL) {
printf("Task create error!\n");
break;
}
pool->queueCapacity = capacity;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
//初始化互斥锁和条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0)
{
printf("mutex or cond create error!\n");
break;
}
//初始化shutdown
pool->shutdown = 0;
//初始化线程相关参数
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL) {
printf("threadIDs create error!\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min;
pool->exitNum = 0;
//创建管理者线程和工作线程
pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程
for (int i = 0; i < min; ++i) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程
}
return pool;
} while (0);
//申请资源失败,释放已分配的资源
if (pool && pool->taskQ) free(pool->taskQ);
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool) free(pool);
return NULL;
}
/***************************************************************
* 函 数: threadPoolDestroy
* 功 能: 销毁线程池
* 参 数: pool---要销毁的线程池
* 返回值: 0表示销毁成功,-1表示销毁失败
**************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{
if (!pool) return -1;
//关闭线程池
pool->shutdown = 1;
//阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
//唤醒所有工作线程,让其自杀
for (int i = 0; i < pool->liveNum; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
//释放所有互斥锁和条件变量
pthread_mutex_destroy(&pool->mutexBusy);
pthread_mutex_destroy(&pool->mutexPool);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
//释放堆空间
if (pool->taskQ) {
free(pool->taskQ);
pool->taskQ = NULL;
}
if (pool->threadIDs) {
free(pool->threadIDs);
pool->threadIDs = NULL;
}
free(pool);
pool = NULL;
return 0;
}
/***************************************************************
* 函 数: threadPoolAdd
* 功 能: 生产者往线程池的任务队列中添加任务
* 参 数: pool---线程池
* func---函数指针,要执行的任务地址
* arg---func指向的函数的实参
* 返回值: 无
**************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
//任务队列满,阻塞生产者
while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
//判断线程池是否关闭
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
return;
}
//添加任务进pool->taskQ
pool->taskQ[pool->queueRear].func = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueSize++;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pthread_cond_signal(&pool->notEmpty);//唤醒工作线程
pthread_mutex_unlock(&pool->mutexPool);
}
/***************************************************************
* 函 数: getThreadPoolBusyNum
* 功 能: 获取线程池忙的工作线程数量
* 参 数: pool---线程池
* 返回值: 忙的工作线程数量
**************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
/***************************************************************
* 函 数: getThreadPoolAliveNum
* 功 能: 获取线程池存活的工作线程数量
* 参 数: pool---线程池
* 返回值: 存活的工作线程数量
**************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
/***************************************************************
* 函 数: worker
* 功 能: 工作线程的执行函数
* 参 数: arg---实参传入,这里传入的是线程池
* 返回值: 空指针
**************************************************************/
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1) {
/* 1.取出任务队列中的队头任务 */
pthread_mutex_lock(&pool->mutexPool);
//无任务就阻塞线程
while (pool->queueSize == 0 && !pool->shutdown) {
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
//唤醒后,判断是不是要销毁线程
if (pool->exitNum > 0) {//线程自杀
pool->exitNum--;//销毁指标-1
if (pool->liveNum > pool->minNum) {
pool->liveNum--;//活着的工作线程-1
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
//线程池关闭了就退出线程
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
//取出pool中taskQ的任务
Task task;
task.func = pool->taskQ[pool->queueFront].func;
task.arg = pool->taskQ[pool->queueFront].arg;
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头
pool->queueSize--;
//通知生产者添加任务
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
/* 2.设置pool的busyNum+1 */
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
/* 3.执行取出的任务 */
printf("thread %ld start working ...\n", pthread_self());
task.func(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working ...\n", pthread_self());
/* 4.设置pool的busyNum-1 */
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
/***************************************************************
* 函 数: manager
* 功 能: 管理者线程的执行函数
* 参 数: arg---实参传入,这里传入的是线程池
* 返回值: 空指针
**************************************************************/
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown) {
/* 每隔3秒检测一次 */
sleep(3);
/* 获取pool中相关变量 */
pthread_mutex_lock(&pool->mutexPool);
int taskNum = pool->queueSize; //任务队列中的任务数量
int liveNum = pool->liveNum; //存活的工作线程数量
int busyNum = pool->busyNum; //忙碌的工作线程数量
pthread_mutex_unlock(&pool->mutexPool);
/* 功能一:增加工作线程,每次增加NUMBER个 */
//当任务个数大于存活工作线程数,且存活工作线程数小于最大值
if (taskNum > liveNum && liveNum < pool->maxNum) {
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
/* 功能二:销毁工作线程,每次销毁NUMBER个 */
//当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
//唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀
for (int i = 0; i < NUMBER; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
pthread_mutex_unlock(&pool->mutexPool);
}
}
return NULL;
}
/***************************************************************
* 函 数: threadExit
* 功 能: 工作线程退出函数,将工作线程的ID置为0,然后退出
* 参 数: pool---线程池
* 返回值: 无
**************************************************************/
void threadExit(ThreadPool* pool)
{
//将pool->threadIDs中的ID改为0
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; i++) {
if (pool->threadIDs[i] == tid) {
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);//退出
}
【2】threadpool.h:
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);
//销毁线程池
int threadPoolDestroy(ThreadPool* pool);
//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
//获取当前忙碌的工作线程的数量
int getThreadPoolBusyNum(ThreadPool* pool);
//获取当前存活的工作线程的数量
int getThreadPoolAliveNum(ThreadPool* pool);
/*********************其它函数**********************/
void* worker(void* arg);//工作线程的执行函数
void* manager(void* arg);//管理者线程的执行函数
void threadExit(ThreadPool* pool);//线程退出函数
#endif
【3】main.c:
#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
//任务函数,所有线程都执行此任务
void testFunc(void* arg)
{
int* num = (int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), *num);
sleep(1);
}
int main()
{
//创建线程池: 最少3个工作线程,最多10个,任务队列容量为100
ThreadPool* pool = threadPoolCreate(3, 10, 100);
//加入100个任务于任务队列
for (int i = 0; i < 100; ++i) {
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, testFunc, num);
}
//销毁线程池
sleep(30);//保证任务全部运行完毕
threadPoolDestroy(pool);
return 0;
}
【4】运行结果:
......