简易线程池开发流程

news2024/11/28 6:54:31

简易线程池开发

线程池基本结构

#include"threadpool.h"
//任务队列
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<strings.h>
typedef struct Task
{
	void(*function)(void* arg);
	void*arg;
}Task;
//线程池结构体
struct ThreadPool
{
	Task*taskkQ;//任务队列
	int queueCapacity;//当前任务个数
	int queueSize; //当前任务个数
	int queueFront;//队头
	int queueRear;// 队尾部

	pthread_t manangerID;//管理者线程id
	pthread_t *threadIDs; //工作的线程Id
	int minNum;//最小线程数量
	int maxNum;//最大线程数量
	int busyNum;//忙的线程个数
	int liveNum;//存活的线程个数
	int exitNum; //要销毁的线程个数

	pthread_mutex_t mutexPoll; //锁整个的线程池
	pthread_mutex_t mutexBusy; //锁住忙线程(因为它变动频繁)
	int shutdown;// 是不是要销毁线程池,销毁为1,不销毁为0

	//增加条件变量,阻塞消费者和生产者

	pthread_cond_t noFull; //非满写数据
	pthread_cond_t noEmpty;//非空读数据

};

ThreadPool * threadPoolCreate(int min, int max, int queueSize)
{
	ThreadPool*pool = (ThreadPool*)malloc(sizeof(ThreadPool));

	if (pool == NULL)
	{
		printf("malloc threadpool fail...\n");
		/*释放内存 free*/
        return NULL;
	}
	pool->threadIDs = (pthread_t)malloc(sizeof(pthread_t)*max);
	if (pool == NULL)
	{
		printf("malloc threadIDs fail...\n");
			/*释放内存 free*/
        return NULL;
	}
	memset(pool->threadIDs, 0, sizeof(pthread_t)*max);
	pool->minNum = min;
	pool->maxNum = max;
	pool->busyNum = 0;
	pool->liveNum = min;//初始时和最小线程数相等

	pool->exitNum = 0; //不是退出状态
// 	初始化条件变量和互斥锁

	if (pthread_mutex_init(&pool->mutexBusy) != 0 ||
		pthread_mutex_init(&pool->mutexPoll) != 0 ||
		pthread_cond_init(&pool->noFull) != 0 ||
		pthread_cond_init(&pool->noEmpty) != 0)
	{
		printf("mutex or condition init fail...\n");
	}
//任务队列初始化
	pool->taskkQ = (Task*)malloc(sizeof(Task)*queueSize);
if (pool->taskkQ == NULL)
	{
		printf("malloc taskQ fail...\n");
			/*释放内存 free*/
        return NULL;
	}
    
    
    
	pool->queueCapacity = queueSize;
	pool->queueSize = 0;
	pool->queueFront = 0;
	pool->queueRear = 0;

	pool->shutdown = 0;



//创建线程
	pthread_create(&pool->manangerID, NULL, manager, NULL);
	for (int i = 0; i < min; ++i)
	{
		pthread_create(&pool->threadIDs[i], NULL, worker, NULL);
	}




}

优化代码结构,如果 每次 malloc 失败,return NULL 之前, 都要释放内存,比较麻烦. 直接放进do while 里

#include"threadpool.h"
//任务队列
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<strings.h>
typedef struct Task
{
	void(*function)(void* arg);
	void*arg;
}Task;
//线程池结构体
struct ThreadPool
{
	Task*taskkQ;//任务队列
	int queueCapacity;//当前任务个数
	int queueSize; //当前任务个数
	int queueFront;//队头
	int queueRear;// 队尾部

	pthread_t manangerID;//管理者线程id
	pthread_t *threadIDs; //工作的线程Id
	int minNum;//最小线程数量
	int maxNum;//最大线程数量
	int busyNum;//忙的线程个数
	int liveNum;//存活的线程个数
	int exitNum; //要销毁的线程个数

	pthread_mutex_t mutexPoll; //锁整个的线程池
	pthread_mutex_t mutexBusy; //锁住忙线程(因为它变动频繁)
	int shutdown;// 是不是要销毁线程池,销毁为1,不销毁为0

	//增加条件变量,阻塞消费者和生产者

	pthread_cond_t noFull; //非满写数据
	pthread_cond_t noEmpty;//非空读数据

};

ThreadPool * threadPoolCreate(int min, int max, int queueSize)
{

	ThreadPool*pool = (ThreadPool*)malloc(sizeof(ThreadPool));
	do {

		if (pool == NULL)
		{
			printf("malloc threadpool fail...\n");
			break;
		}
		pool->threadIDs = (pthread_t)malloc(sizeof(pthread_t)*max);
		if (pool->threadIDs == NULL)
		{
			printf("malloc threadIDs fail...\n");
			break;
		}
		memset(pool->threadIDs, 0, sizeof(pthread_t)*max);
		pool->minNum = min;
		pool->maxNum = max;
		pool->busyNum = 0;
		pool->liveNum = min;//初始时和最小线程数相等

		pool->exitNum = 0; //不是退出状态
	// 	初始化条件变量和互斥锁

		if (pthread_mutex_init(&pool->mutexBusy) != 0 ||
			pthread_mutex_init(&pool->mutexPoll) != 0 ||
			pthread_cond_init(&pool->noFull) != 0 ||
			pthread_cond_init(&pool->noEmpty) != 0)
		{
			printf("mutex or condition init fail...\n");
		}
		//任务队列初始化
		pool->taskkQ = (Task*)malloc(sizeof(Task)*queueSize);
		if (pool->taskkQ == NULL)
		{
			printf("malloc pool->taskQ fail...\n");
			break;
		}
		pool->queueCapacity = queueSize;
		pool->queueSize = 0;
		pool->queueFront = 0;
		pool->queueRear = 0;

		pool->shutdown = 0;



		//创建线程
		pthread_create(&pool->manangerID, NULL, manager, NULL);
		for (int i = 0; i < min; ++i)
		{
			pthread_create(&pool->threadIDs[i], NULL, worker, pool);  
		}
        return pool; //调用成功返回 线程池实例

	} while (0);


// 调用失败则  释放资源
	if (pool&&pool->threadIDs)
		free(pool->threadIDs);

	if (pool&&pool->taskkQ)free(pool->taskkQ);
	if (pool)free(pool);
/*
if (pool->threadIDs)free(pool->threadIDs);
if (pool->taskQ)free(pool->taskQ);
if (pool)free(pool); 为什么不能写成这样
你的代码在语法上是正确的,但是在逻辑上可能会有问题。如果pool是NULL,
那么pool->threadIDs和pool->taskQ可能会导致未定义的行为,
因为你正在尝试访问一个NULL指针的成员。这可能会导致程序崩溃。



在你的原始代码中,你首先检查pool是否为NULL,然后再检查pool->threadIDs和pool->taskQ。
这样做可以确保你不会尝试访问一个NULL指针的成员。这是一种防止程序崩溃的好方法。

所以,虽然你的代码在语法上是正确的,但是在逻辑上可能会有问题。你应该始终先检查pool是否为NULL,
然后再检查pool->threadIDs和pool->taskQ。这样可以防止程序崩溃。

*/
    return NULL ;

}

pthread_create(&pool->threadIDs[i], NULL, worker, pool);这行代码是创建一个新的线程,并让这个线程开始执行worker函数。pthread_create函数的第四个参数是传递给worker函数的参数。

在这个例子中,pool是一个指向ThreadPool结构体的指针,它包含了线程池的所有信息,包括任务队列、线程ID、互斥锁和条件变量等。当你创建一个新的线程并让它执行worker函数时,你需要将pool传递给worker函数,这样worker函数就可以访问和操作线程池的所有信息了。

所以,pthread_create(&pool->threadIDs[i], NULL, worker, pool);这行代码的意思是:创建一个新的线程,让这个线程开始执行worker函数,并将pool作为参数传递给worker函数。这样,worker函数就可以通过pool来访问和操作线程池的所有信息了。希望这个解释对你有所帮助!


工作者函数

// 创建线程池
void* worker(void*arg) // 访问任务队列,任务队列属于线程池,每个线程都要对线程池操作,因此需要对线程池加锁
{
	ThreadPool*pool = (ThreadPool*)arg;
	while (1)
	{
		pthread_muxtex_lock(&pool->mutexPoll);
		//任务队列为空,且没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			//阻塞工作线程,直到不为空时,将其唤醒
			pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
		}
		//判断线程池是否被关闭了
		if (pool->shutdown)
		{

			pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
			pthread_exit(NULL);
		}
	//从任务队列中取出一个任务
		Task task;
		task.function = pool->taskkQ[pool->queueFront].function;
		task.arg = pool->taskkQ[pool->queueFront].arg;
		pthread_mutex_unlock(&pool->mutexPoll);





	}
	return NULL;
}

这段代码定义了一个线程池的结构体ThreadPool,以及一个任务的结构体Task

Task结构体包含两个成员:一个函数指针function和一个void类型的指针arg。这个结构体通常用于存储要在线程池中执行的任务。

ThreadPool结构体包含了线程池的所有信息,包括:

  • taskkQ:一个指向Task类型的指针,它指向一个Task类型的数组,这个数组用于存储任务队列。
  • queueCapacity:队列的容量。
  • queueSize:当前队列中的任务数量。
  • queueFrontqueueRear:分别表示任务队列的队首和队尾。
  • manangerID:管理者线程的ID。
  • threadIDs:一个指向pthread_t类型的指针,它指向一个pthread_t类型的数组,这个数组用于存储工作线程的ID。
  • minNummaxNum:分别表示线程池中的最小和最大线程数量。
  • busyNum:当前忙碌的线程数量。
  • liveNum:当前存活的线程数量。
  • exitNum:需要销毁的线程数量。
  • mutexPollmutexBusy:用于锁住整个线程池和忙碌线程的互斥锁。
  • shutdown:表示是否需要销毁线程池,如果需要销毁则为1,否则为0。
  • noFullnoEmpty:分别表示非满和非空的条件变量,用于阻塞消费者和生产者线程。

这个结构体通常用于管理线程池,包括任务的分配、线程的创建和销毁等。

taskkQ:一个指向Task类型的指针,它指向一个Task类型的数组, 这里我不太理解,这个不是创建了一个结构体类型的指针吗,为什么又是数组了

task.function = pool->taskkQ[pool->queueFront].function;

在这里,Task* taskkQ;确实是声明了一个指向Task类型的指针。然而,这个指针可以被用来指向一个Task类型的数组。这是因为在C语言中,数组名实际上就是一个指向数组第一个元素的指针。

所以,当我们说taskkQ是一个指向Task类型的数组的指针时,我们的意思是taskkQ可以被用来存储一个Task类型数组的地址。这样,你就可以通过taskkQ来访问和操作这个数组。

例如,你可以这样初始化taskkQ

pool->taskkQ = (Task*)malloc(sizeof(Task)*queueSize);

这行代码使用malloc函数动态分配了queueSizeTask类型的空间,并将返回的地址赋值给taskkQ。这样,taskkQ就指向了一个Task类型的数组,数组的大小是queueSize

然后,你就可以像操作普通数组一样操作taskkQ了。例如,taskkQ[i]就可以用来访问数组中的第i个元素。


// 创建线程池
void* worker(void*arg) // 访问任务队列,任务队列属于线程池,每个线程都要对线程池操作,因此需要对线程池加锁
{
	ThreadPool*pool = (ThreadPool*)arg;
	while (1)
	{
		pthread_muxtex_lock(&pool->mutexPoll);    
		//任务队列为空,且没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			//阻塞工作线程,直到不为空时,将其唤醒
			pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
		}
		//判断线程池是否被关闭了
		if (pool->shutdown)
		{

			pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
			pthread_exit(NULL);
		}
		//从任务队列中取出一个任务
		Task task;
		task.function = pool->taskkQ[pool->queueFront].function;  // 
		task.arg = pool->taskkQ[pool->queueFront].arg;
		// 移动头节点(这个任务读完了,准备读取下一个任务)
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
		pool->queueSize--;
		// 解锁(对当前线程池的操作完成)
		pthread_mutex_unlock(&pool->mutexPoll);
		// 	调用任务函数,执行取出的任务,忙线程加+1 ,由于忙线程是频繁变化的共享变量加锁

		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum++;
		pthread_mutex_unlock(&pool->mutexBusy);

		task.function(task.arg);// (*task.function)(task.arg);  	调用任务函数
		// 传入的arg 分配堆内存,防止分配栈内存被释放
		free(task.arg); //结束任务之后,释放内存
		task.arg = NULL;

		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum--;
		pthread_mutex_unlock(&pool->mutexBusy);
       //执行完任务,忙线程减一
	



	}
	return NULL;
}

在C语言中,调用函数指针的方式是通过使用函数指针的名称,后面跟上括号和参数。例如,如果你有一个函数指针void (*functionPtr)(int),你可以这样调用它:

(*functionPtr)(5);

或者更简单地:

functionPtr(5);

这两种方式都会调用functionPtr指向的函数,并传递5作为参数。

在你的代码中,task.function是一个函数指针,所以你可以通过task.function(arg)来调用它,其中arg是你要传递给函数的参数。例如:

task.function(task.arg);

这行代码会调用task.function指向的函数,并传递task.arg作为参数。希望这个解释对你有所帮助!

或者使用(*task.function )(task.arg);


管理者函数


void* manager(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (!pool->shutdown)
	{
		sleep(3); //每3s检测一次
		//取出线程池中任务的数量和当前线程的数量
		pthread_mutex_lock(&pool->mutexPoll);
		int queueSize = pool->queueSize;
		int liveNum = pool->liveNum;
		pthread_mutex_unlock(&pool->mutexPoll);

		//取出忙的线程的数量
		pthread_mutex_lock(&pool->mutexBusy);
		int busyNum = pool->busyNum;
		pthread_mutex_unlock(&pool->mutexBusy);

		//添加线程
		// 规定:当任务个数>存活的线程个数 && 存活的线程数<最大的线程数  时 才需要增加线程

		if (queueSize > liveNum && liveNum < pool->maxNum)
		{
			pthread_mutex_lock(&pool->mutexBusy);

			int counter = 0;       // const int NUMBER 指定为2 ,每次添加线程上限两个.
			// 因为进入if语句后,在线程添加过程中,
			// 线程池中的线程个数可能随时变化,而且循环期间会进行liveNum++,
			// (可能会出现存活线程数大于最大线程数,此时不应再进行 liveNum++),故循环之前需要判断pool->liveNum<pool->maxNum

			for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
			{
				if (pool->threadIDs[i] == 0) // 遍历可用的线程
				{
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexBusy);
		}

		// 销毁线程
		//规定: 当 忙线程数*2 < 存活线程  && 存活线程>最小线程数
		if (busyNum * 2 < liveNum && liveNum > pool->minNum)
		{
			pthread_mutex_lock(&pool->mutexPoll);
			pool->exitNum = NUMBER;
			pthread_mutex_unlock(&pool->mutexPoll);

			for (int i = 0; i < NUMBER; ++i)
			{
				//唤醒那些因为任务队列为空,而被阻塞的进程,让他们执行后面的 suicide代码
				pthread_cond_signal(&pool->noEmpty);
			}
		}
	}
}

修改工作者线程,响应管理者线程

当工作者线程因为任务队列为空而被阻塞时,

工作者 阻塞线程

//任务队列为空,且没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			//阻塞工作线程,直到需要销毁多余线程时,将其唤醒
			pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
		}

如果 忙线程数*2 < 存活线程 && 存活线程>最小线程数 则工作者线程会接受到来自管理者线程的信号而 销毁自身线程

管理者 唤醒 工作线程

		if (busyNum * 2 < liveNum && liveNum > pool->minNum)
		{
			pthread_mutex_lock(&pool->mutexPoll);
			pool->exitNum = NUMBER;
			pthread_mutex_unlock(&pool->mutexPoll);

			for (int i = 0; i < NUMBER; ++i)
			{
				//唤醒那些因为任务队列为空,而被阻塞的进程,让他们执行后面的 suicide代码
				pthread_cond_signal(&pool->noEmpty);
			}
		}




工作者线程 接收条件变量信号 销毁自己


	while (1)
	{
		pthread_mutex_lock(&pool->mutexPoll);
		//任务队列为空,且没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
			//	这行代码会阻塞当前线程,并等待条件变量 pool->noEmpty 的信号。当其他线程调用 pthread_cond_signal(&pool->noEmpty) 或 pthread_cond_broadcast(&pool->noEmpty) 时,这个被阻塞的线程就会被唤醒,执行后面的代码。

		//解除阻塞之后 , 自己销毁自己,销毁掉线程
			if (pool->exitNum > 0)
			{
				pool->exitNum--;
				if (pool->liveNum > pool->minNum)
				{
					pool->liveNum--;
				}
				pthread_mutex_unlock(&pool->mutexPoll);
				pthread_exit(NULL);
				
			}
		}
		

增加 线程退出函数

由于管理者函数添加线程时,需要根据线程id 重复利用线程,

		for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
			{
				if (pool->threadIDs[i] == 0) // 遍历可用的线程
				{
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexBusy);
		}

而在工作者函数中退出线程时,只是简单的 pthread_exit, 没有将threadIDs 置为0

	if (pool->shutdown)
		{
	
			pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
			pthread_exit(NULL);
		}
		
		

故 用封装好的 threadExit 替换 工作者线程的 pthread_exit(NULL)

void* threadExit(ThreadPool* pool)
{
	pthread_t tid = pthread_self();
	for (int i = 0; i < pool->maxNum; ++i)
	{
		if (pool->threadIDs[i] == tid)
		{
			pool->threadIDs[i] = 0;
			printf("threadExit() called, %ld exiting...\n", tid);
			break;
		}
	}
	pthread_exit(NULL);
};

综上 的工作者线程代码

// 创建线程池
void* worker(void* arg) // 访问任务队列,任务队列属于线程池,每个线程都要对线程池操作,因此需要对线程池加锁
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (1)
	{
		pthread_mutex_lock(&pool->mutexPoll);
		//任务队列为空,且没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
			//	这行代码会阻塞当前线程,并等待条件变量 pool->noEmpty 的信号。当其他线程调用 pthread_cond_signal(&pool->noEmpty) 或 pthread_cond_broadcast(&pool->noEmpty) 时,这个被阻塞的线程就会被唤醒。

		//解除阻塞之后 , 自己销毁自己,销毁掉线程
			if (pool->exitNum > 0)
			{
				pool->exitNum--;
				if (pool->liveNum > pool->minNum)
				{
					pool->liveNum--;
				}
				pthread_mutex_unlock(&pool->mutexPoll);
				//线程退出并将threadID 置为 0 表明此线程是可以被下次添加的

				threadExit(pool);// 封装的线程退出函数,将threadID 置为0
			}
		}
		//判断线程池是否被关闭了
		if (pool->shutdown)
		{
			pthread_mutex_unlock(&pool->mutexPoll); // 防止锁完不解,造成死锁
			//	pthread_exit(NULL);
			threadExit(pool);
		}
		//从任务队列中取出一个任务
		Task task;
		task.function = pool->taskkQ[pool->queueFront].function;
		task.arg = pool->taskkQ[pool->queueFront].arg;
		// 移动头节点(这个任务读完了,准备读取下一个任务)
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
		pool->queueSize--;
		// 解锁(对当前线程池的操作完成)
		pthread_mutex_unlock(&pool->mutexPoll);
		// 	调用任务函数,执行取出的任务,忙线程加+1 ,由于忙线程是频繁变化的共享变量加锁

		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum++;
		pthread_mutex_unlock(&pool->mutexBusy);

		task.function(task.arg);// (*task.function)(task.arg);  	调用任务函数
		// 传入的arg 分配堆内存,防止分配栈内存被释放
		free(task.arg); //结束任务之后,释放内存
		task.arg = NULL;

		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum--;
		pthread_mutex_unlock(&pool->mutexBusy);
		//执行完任务,忙线程减一
	}

	return NULL;
}

给线程池增加任务


void threadpoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) // 第二个参数是任务 ,任务是struct Task 类型,
// 每个task 里面有两个指针,一个是函数指针 void(*function)(void* arg);	   一个是 函数指针的参数 是 void * 类型
{
	pthread_mutex_lock(&pool->mutexPoll);
	//当任务队列满了的时候,阻塞生产者
	while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
	{
		pthread_cond_wait(&pool->noFull, &pool->mutexPoll);
	}
	//	阻塞解除后 ,判断线程池是不是被关闭了
	if (pool->shutdown)
	{
		pthread_mutex_unlock(&pool->mutexPoll);//防止死锁
		return;
	}
	//添加任务到队尾

	pool->taskkQ[pool->queueRear].function = func;
	pool->taskkQ[pool->queueRear].arg = arg;

	//	队尾向后移动,准备添加下一个
	pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
	pool->queueSize++; //任务队列个数+1

	pthread_cond_signal(&pool->noFull);

	pthread_mutex_unlock(&pool->mutexPoll);
};

获取线程池中工作的线程个数 获取线程池中活着的线程个数

// 获取线程池中工作的线程个数
int threadPoolBusyNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexBusy);
	int busyNum = pool->busyNum;
	pthread_mutex_unlock(&pool->mutexBusy);
	return busyNum;
};

// 获取线程池中活着的线程个数
int threadPoolAliveNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexPool);
	int aliveNum = pool->liveNum;
	pthread_mutex_unlock(&pool->mutexPool);
	return aliveNum;
};

销毁线程池

int threadPoolDestroy(ThreadPool* pool)
{
	if (pool == NULL)
	{
		return -1;
	}
	//关闭线程池
	pool->shutdown = 1;  //管理者线程就会退出
	//阻塞回收管理者线程
	pthread_join(pool->manangerID, NULL);
	//唤醒阻塞的消费者线程
	for (int i = 0; i < pool->liveNum; ++i)  
	{
		pthread_cond_signal(&pool->noEmpty);
	}
	// 释放堆内存
	if (pool->taskkQ)
	{
		free(pool->taskkQ);
		pool->taskkQ = NULL;
	}
	if (pool->threadIDs)
	{
		free(pool->threadIDs);
		pool->threadIDs = NULL;
	}

	//销毁互斥锁
	pthread_mutex_destroy(&pool->mutexBusy);
	pthread_mutex_destroy(&pool->mutexPool);

	pthread_cond_destroy(&pool->noEmpty);
	pthread_cond_destroy(&pool->noFull);


	free(pool);
	pool = NULL;

	return 0;
}

pool->shutdown = 1; //管理者线程就会退出


void* manager(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (!pool->shutdown)
	{
        /***其他内容***/
        
    }
}
 pthread_join(pool->manangerID, NULL); //阻塞回收管理者线程

//唤醒阻塞的消费者线程, 就会执行后面的销毁代码

	for (int i = 0; i < pool->liveNum; ++i)  
	{
		pthread_cond_signal(&pool->noEmpty);
	}

	while (1)
	{
		pthread_mutex_lock(&pool->mutexPoll);
		//任务队列为空,且没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->noEmpty, &pool->mutexPoll);
			//	这行代码会阻塞当前线程,并等待条件变量 pool->noEmpty 的信号。当其他线程调用 pthread_cond_signal(&pool->noEmpty) 或 pthread_cond_broadcast(&pool->noEmpty) 时,这个被阻塞的线程就会被唤醒,执行后面的代码。

		//解除阻塞之后 , 自己销毁自己,销毁掉线程
			if (pool->exitNum > 0)
			{
				pool->exitNum--;
				if (pool->liveNum > pool->minNum)
				{
					pool->liveNum--;
				}
				pthread_mutex_unlock(&pool->mutexPoll);
				pthread_exit(NULL);
				
			}
		}
        
    }




完整代码

1. 线程池原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

在各个编程语言的语种中都有线程池的概念,并且很多语言中直接提供了线程池,作为程序猿直接使用就可以了,下面给大家介绍一下线程池的实现原理:

  • 线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:

    1. 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
      
      • 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
      • 已处理的任务会被从任务队列中删除
      • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
    2. 工作的线程(任务队列任务的消费者) ,N个
      
      • 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
      • 工作的线程相当于是任务队列的消费者角色,
      • 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
      • 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
    3. 管理者线程(不处理任务队列中的任务),1个
      
      • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
        • 当任务过多的时候, 可以适当的创建一些新的工作线程
        • 当任务过少的时候, 可以适当的销毁一些工作的线程

2. 任务队列

`在这里插入图片描述

// 任务结构体
typedef struct Task
{
    void (*function)(void* arg);
    void* arg;
}Task;

3. 线程池定义

C++
// 线程池结构体
struct ThreadPool
{
    // 任务队列
    Task* taskQ;
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头 -> 取数据
    int queueRear;      // 队尾 -> 放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID
    int minNum;             // 最小线程数量
    int maxNum;             // 最大线程数量
    int busyNum;            // 忙的线程的个数
    int liveNum;            // 存活的线程的个数
    int exitNum;            // 要销毁的线程个数
    pthread_mutex_t mutexPool;  // 锁整个的线程池
    pthread_mutex_t mutexBusy;  // 锁busyNum变量
    pthread_cond_t notFull;     // 任务队列是不是满了
    pthread_cond_t notEmpty;    // 任务队列是不是空了

    int shutdown;           // 是不是要销毁线程池, 销毁为1, 不销毁为0
};

4. 头文件声明

C
#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);
#endif  // _THREADPOOL_H

5. 源文件定义

C

ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do 
    {
        if (pool == NULL)
        {
            printf("malloc threadpool fail...\n");
            break;
        }

        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min;    // 和最小个数相等
        pool->exitNum = 0;

        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
            pthread_cond_init(&pool->notFull, NULL) != 0)
        {
            printf("mutex or condition init fail...\n");
            break;
        }

        // 任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;

        pool->shutdown = 0;

        // 创建线程
        pthread_create(&pool->managerID, NULL, manager, pool);
        for (int i = 0; i < min; ++i)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);

    // 释放资源
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool) free(pool);

    return NULL;
}

int threadPoolDestroy(ThreadPool* pool)
{
    if (pool == NULL)
    {
        return -1;
    }

    // 关闭线程池
    pool->shutdown = 1;
    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);
    // 唤醒阻塞的消费者线程
    for (int i = 0; i < pool->liveNum; ++i)
    {
        pthread_cond_signal(&pool->notEmpty);
    }
    // 释放堆内存
    if (pool->taskQ)
    {
        free(pool->taskQ);
    }
    if (pool->threadIDs)
    {
        free(pool->threadIDs);
    }

    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);

    free(pool);
    pool = NULL;

    return 0;
}


void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
    {
        // 阻塞生产者线程
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }
    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return aliveNum;
}

void* worker(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;

    while (1)
    {
        pthread_mutex_lock(&pool->mutexPool);
        // 当前任务队列是否为空
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if (pool->exitNum > 0)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }

        // 判断线程池是否被关闭了
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        // 从任务队列中取出一个任务
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;
        // 移动头结点
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        // 解锁
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void* manager(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;
    while (!pool->shutdown)
    {
        // 每隔3s检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
        // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < NUMBER
                && pool->liveNum < pool->maxNum; ++i)
            {
                if (pool->threadIDs[i] == 0)
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }
        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

void threadExit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; ++i)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

6. 测试代码

C
void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n",
        pthread_self(), num);
    sleep(1);
}

int main()
{
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for (int i = 0; i < 100; ++i)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool, taskFunc, num);
    }

    sleep(30);

    threadPoolDestroy(pool);
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1175086.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.UML面向对象类图和关系

文章目录 4种静态结构图类图类的表示类与类之间的关系依赖关系(Dependency)关联关系(Association)聚合(Aggregation)组合(Composition)实现(Realization)继承/泛化(Inheritance/Generalization)常用的UML工具reference欢迎访问个人网络日志🌹🌹知行空间🌹🌹 4种静态结构…

力扣 141.环形链表和142.环形链表2

目录 1.环形链表Ⅰ解题思路2.环形链表Ⅰ代码实现3.环形链表Ⅱ解题思路4.环形链表Ⅱ代码实现 1.环形链表Ⅰ解题思路 利用快慢指针&#xff0c;快指针一次走两个&#xff0c;慢指针一次走一个&#xff0c;如果出现了快指针为空或者快指针的next为空的现象则说明不带环&#xff0…

2023年亚太杯APMCM数学建模大赛ABC题辅导及组队

2023年亚太杯APMCM数学建模大赛 ABC题 一元线性回归分析类 回归分析&#xff08;Regression Analysis)是确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法。   – 按涉及变量个数划分   • 一元回归分析   • 多元回归分析   – 按自变量和因变量之间关…

【Python从入门到进阶】41、有关requests代理的使用

接上篇《40、requests的基本使用》 上一篇我们介绍了requests库的基本使用&#xff0c;本篇我们来学习requests的代理。 一、引言 在网络爬虫和数据抓取的过程中&#xff0c;我们经常需要发送HTTP请求来获取网页内容或与远程服务器进行通信。然而&#xff0c;在某些情况下&…

【SpringSecurity6.x】会话管理

只需在两个浏览器中用同一个账号登录就会发现,到目前为止,系统尚未有任何会话并发限制。一个账户能多处同时登录可不是一个好的策略。事实上,Spring Security已经为我们提供了完善的会话管理功能,包括会话固定攻击、会话超时检测以及会话并发控制。 理解会话 会话(sessi…

白标软件:时间与金钱的双赢助手

白标的好处是你不需要从零开始构建一个应用程序。供应商提供软件解决方案&#xff0c;而你提供品牌&#xff0c;并将应用程序包装、市场推广和盈利。 白标软件帮助节省时间和金钱的六种方式&#xff1a; 1、不需要招募软件开发组织或专业人员 传统上&#xff0c;软件开发需要…

腾讯云CVM服务器标准型S5、SA3、S6详细介绍

腾讯云CVM服务器标准型实例的各项性能参数平衡&#xff0c;标准型云服务器适用于大多数常规业务&#xff0c;例如&#xff1a;web网站及中间件等&#xff0c;常见的标准型云服务器有CVM标准型S5、S6、SA3、SR1、S5se等规格&#xff0c;腾讯云服务器网txyfwq.com来详细说下云服务…

腾讯云服务器CVM S5服务器CPU性能测评和优惠价格表

腾讯云服务器CVM标准型S5有活动&#xff0c;CVM 2核2G S5优惠价280.8元一年自带1M带宽&#xff0c;15个月313.2元、2核4G配置748.2元15个月、4核8G配置1437.24元15个月、8核16G优惠价3048.48元15个月&#xff0c;公网带宽可选1M、3M、5M或10M&#xff0c;腾讯云服务器网txyfwq.…

喜讯!极限科技成功签约中国一汽搜索数据库三年许可订阅合同!

中标喜讯&#xff01;极限科技 INFINI Easysearch 成功签约中国第一汽车股份有限公司三年订阅合同&#xff01; 一汽集团作为国内汽车行业龙头企业&#xff0c;数字化转型伴随业务发展不断深化&#xff0c;非结构化数据日益成为各类组织数据的增长主力&#xff0c;逐渐成为数据…

.NET Framework中自带的泛型委托Func

Func<>是.NET Framework中自带的泛型委托&#xff0c;可以接收一个或多个输入参数&#xff0c;并且有返回值&#xff0c;和Action类似&#xff0c;.NET基类库也提供了多达16个输入参数的Func委托&#xff0c;输出参数只有1个。 1、Func泛型委托 .NET Framework为我们提…

C#学习中关于Visual Studio中ctrl+D快捷键(快速复制当前行)失效的解决办法

1、进入VisualStudio主界面点击工具——>再点击选项 2、进入选项界面后点击环境——>再点击键盘&#xff0c;我们可用看到右边的界面的映射方案是VisualC#2005 3、 最后点击下拉框&#xff0c;选择默认值&#xff0c;点击之后确定即可恢复ctrlD的快捷键功能 4、此时可以正…

有限域的Fast Multiplication和Modular Reduction算法实现

1. 引言 关于有限域的基础知识&#xff0c;可参考&#xff1a; RISC Zero团队2022年11月视频 Intro to Finite Fields: RISC Zero Study Club 有限域几乎是密码学中所有数学的基础。 ZKP证明系统中的所有运算都是基于有限域的&#xff1a; 使用布尔运算的数字电路&#xf…

基于单片机的养殖场温度控制系统设计

博主主页&#xff1a;单片机辅导设计 博主简介&#xff1a;专注单片机技术领域和毕业设计项目。 主要内容&#xff1a;毕业设计、简历模板、学习资料、技术咨询。 文章目录 主要介绍一、控制系统设计二、系统方案设计2.1 系统运行方案设计2.1.1 羊舍环境温度的确定 三、 系统仿…

在Windows或Mac上安装并运行LLAMA2

LLAMA2在不同系统上运行的结果 LLAMA2 在windows 上运行的结果 LLAMA2 在Mac上运行的结果 安装Llama2的不同方法 方法一&#xff1a; 编译 llama.cpp 克隆 llama.cpp git clone https://github.com/ggerganov/llama.cpp.git 通过conda 创建或者venv. 下面是通过conda 创建…

基于ssm车位租赁系统+vue(2023年☆全网唯一)【附开发文档|表结构|万字文档(LW)和搭建文档】

主要功能 前台登录&#xff1a; 注册用户&#xff1a;用户账号、密码、姓名、手机号、身份证号、性别、邮箱 用户&#xff1a; ①首页、车位展示、公告展示、查看更多 ②车位类型、车位介绍、车位收藏、留言、我要租赁、公告、留言板 ③个人中心、车位收藏、车位租赁订单、已到…

由于找不到msvcr110.dll,无法继续执行代码。重新安装程序可能会解决此问题,解决方法分享

MSVCR110.dll是Microsoft Visual C 2012 Redistributable的一个组件&#xff0c;它包含了许多运行库文件&#xff0c;这些文件是许多应用程序和游戏所必需的。当您在运行某些程序或游戏时&#xff0c;可能会遇到“msvcr110.dll丢失”的错误提示。这是因为您的计算机上缺少了MSV…

Linux Vim撤销和恢复撤销快捷键

使用 Vim 编辑文件内容时&#xff0c;经常会有如下 2 种需求&#xff1a; 对文件内容做了修改之后&#xff0c;却发现整个修改过程是错误或者没有必要的&#xff0c;想将文件恢复到修改之前的样子。 将文件内容恢复之后&#xff0c;经过仔细考虑&#xff0c;又感觉还是刚才修改…

kubenetes认证、授权、准入控制

一、Api Server kube-apiserver是 Kubernetes 最重要的核心组件之一&#xff0c;主要提供以下的功能&#xff1a; 提供集群管理的REST API接口&#xff0c;包括认证授权、数据校验以及集群状态变更等&#xff1b; 提供其他模块之间的数据交互和通信的枢纽&#xff08;其他模块…

基于猎食者算法的无人机航迹规划-附代码

基于猎食者算法的无人机航迹规划 文章目录 基于猎食者算法的无人机航迹规划1.猎食者搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用猎食者算法来优化无人机航迹规划。 1.猎食者…

高效处理文件:批量顺序编号重命名方法

每个人都面临着文件管理的挑战&#xff0c;特别是那些需要处理大量文件的人。如何高效地管理这些文件一直是一个难题。为了解决这个问题&#xff0c;我向大家推荐一款强大的文件管理工具——固乔文件管家。这个工具可以帮助你快速有效地给文件进行批量重命名和编号&#xff0c;…