线程池666666

news2024/12/23 10:32:29

1. 作用

线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。

2. 实现原理

线程池内部由任务队列、工作线程和管理者线程组成。

任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。

工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。

管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。

注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。

3. 手撕线程池

参考来源:爱编程的大丙。

【1】threadpool.c:

#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>

#define NUMBER	2	//管理者线程增加或减少的工作线程数量

//任务结构体
typedef struct Task {
	void (*func)(void* arg);
	void* arg;
} Task;


//线程池结构体
struct ThreadPool {
	//任务队列,视为环形队列
	Task* taskQ;
	int queueCapacity;	//队列容量
	int queueSize;		//当前任务个数
	int queueFront;		//队头 -> 取任务
	int queueRear;		//队尾 -> 加任务
	//线程相关
	pthread_t managerID;	//管理者线程ID
	pthread_t* threadIDs;	//工作线程ID
	int minNum;				//工作线程最小数量
	int maxNum;				//工作线程最大数量
	int busyNum;			//工作线程忙的数量
	int liveNum;			//工作线程存活数量
	int exitNum;			//要销毁的工作线程数量
	pthread_mutex_t mutexPool;	//锁整个线程池
	pthread_mutex_t mutexBusy;	//锁busyNum
	pthread_cond_t notFull;		//任务队列是否满
	pthread_cond_t notEmpty;	//任务队列是否空
	//线程池是否销毁
	int shutdown;		//释放为1,否则为0
};

/***************************************************************
 * 函  数: threadPoolCreate
 * 功  能: 创建线程池并初始化
 * 参  数: min---工作线程的最小数量
 *         max---工作线程的最大数量
 *		   capacity---任务队列的最大容量
 * 返回值: 创建的线程池的地址
 **************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{
	//申请线程池空间
	ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
	do {//此处循环只是为了便于失败释放空间,只会执行一次
		if (pool == NULL) {
			printf("pool create error!\n");
			break;
		}
		//申请任务队列空间,并初始化
		pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);
		if (pool->taskQ == NULL) {
			printf("Task create error!\n");
			break;
		}
		pool->queueCapacity = capacity;
		pool->queueSize = 0;
		pool->queueFront = 0;
		pool->queueRear = 0;
		//初始化互斥锁和条件变量
		if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
			pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
			pthread_cond_init(&pool->notFull, NULL) != 0 ||
			pthread_cond_init(&pool->notEmpty, NULL) != 0)
		{
			printf("mutex or cond create error!\n");
			break;
		}
		//初始化shutdown
		pool->shutdown = 0;
		//初始化线程相关参数
		pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
		if (pool->threadIDs == NULL) {
			printf("threadIDs create error!\n");
			break;
		}
		memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
		pool->minNum = min;
		pool->maxNum = max;
		pool->busyNum = 0;
		pool->liveNum = min;
		pool->exitNum = 0;
		//创建管理者线程和工作线程
		pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程
		for (int i = 0; i < min; ++i) {
			pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程
		}
		return pool;
	} while (0);
	//申请资源失败,释放已分配的资源
	if (pool && pool->taskQ) free(pool->taskQ);
	if (pool && pool->threadIDs) free(pool->threadIDs);
	if (pool) free(pool);
	return NULL;
}


/***************************************************************
 * 函  数: threadPoolDestroy
 * 功  能: 销毁线程池
 * 参  数: pool---要销毁的线程池
 * 返回值: 0表示销毁成功,-1表示销毁失败
 **************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{
	if (!pool) return -1;
	//关闭线程池
	pool->shutdown = 1;
	//阻塞回收管理者线程
	pthread_join(pool->managerID, NULL);
	//唤醒所有工作线程,让其自杀
	for (int i = 0; i < pool->liveNum; ++i) {
		pthread_cond_signal(&pool->notEmpty);
	}
	//释放所有互斥锁和条件变量
	pthread_mutex_destroy(&pool->mutexBusy);
	pthread_mutex_destroy(&pool->mutexPool);
	pthread_cond_destroy(&pool->notEmpty);
	pthread_cond_destroy(&pool->notFull);
	//释放堆空间
	if (pool->taskQ) {
		free(pool->taskQ);
		pool->taskQ = NULL;
	}
	if (pool->threadIDs) {
		free(pool->threadIDs);
		pool->threadIDs = NULL;
	}
	free(pool);
	pool = NULL;
	return 0;
}


/***************************************************************
 * 函  数: threadPoolAdd
 * 功  能: 生产者往线程池的任务队列中添加任务
 * 参  数: pool---线程池
 *		   func---函数指针,要执行的任务地址
 *		   arg---func指向的函数的实参
 * 返回值: 无
 **************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
	pthread_mutex_lock(&pool->mutexPool);
	//任务队列满,阻塞生产者
	while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
		pthread_cond_wait(&pool->notFull, &pool->mutexPool);
	}
	//判断线程池是否关闭
	if (pool->shutdown) {
		pthread_mutex_unlock(&pool->mutexPool);
		return;
	}
	//添加任务进pool->taskQ
	pool->taskQ[pool->queueRear].func = func;
	pool->taskQ[pool->queueRear].arg = arg;
	pool->queueSize++;
	pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
	pthread_cond_signal(&pool->notEmpty);//唤醒工作线程
	pthread_mutex_unlock(&pool->mutexPool);
}


/***************************************************************
 * 函  数: getThreadPoolBusyNum
 * 功  能: 获取线程池忙的工作线程数量
 * 参  数: pool---线程池
 * 返回值: 忙的工作线程数量
 **************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexBusy);
	int busyNum = pool->busyNum;
	pthread_mutex_unlock(&pool->mutexBusy);
	return busyNum;
}


/***************************************************************
 * 函  数: getThreadPoolAliveNum
 * 功  能: 获取线程池存活的工作线程数量
 * 参  数: pool---线程池
 * 返回值: 存活的工作线程数量
 **************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexPool);
	int liveNum = pool->liveNum;
	pthread_mutex_unlock(&pool->mutexPool);
	return liveNum;
}


/***************************************************************
 * 函  数: worker
 * 功  能: 工作线程的执行函数
 * 参  数: arg---实参传入,这里传入的是线程池
 * 返回值: 空指针
 **************************************************************/
void* worker(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (1) {
		/* 1.取出任务队列中的队头任务 */
		pthread_mutex_lock(&pool->mutexPool);
		//无任务就阻塞线程
		while (pool->queueSize == 0 && !pool->shutdown) {
			pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
			//唤醒后,判断是不是要销毁线程
			if (pool->exitNum > 0) {//线程自杀
				pool->exitNum--;//销毁指标-1
				if (pool->liveNum > pool->minNum) {
					pool->liveNum--;//活着的工作线程-1
					pthread_mutex_unlock(&pool->mutexPool);
					threadExit(pool);
				}
			}
		}
		//线程池关闭了就退出线程
		if (pool->shutdown) {
			pthread_mutex_unlock(&pool->mutexPool);
			threadExit(pool);
		}
		//取出pool中taskQ的任务
		Task task;
		task.func = pool->taskQ[pool->queueFront].func;
		task.arg = pool->taskQ[pool->queueFront].arg;
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头
		pool->queueSize--;
		//通知生产者添加任务
		pthread_cond_signal(&pool->notFull);
		pthread_mutex_unlock(&pool->mutexPool);

		/* 2.设置pool的busyNum+1 */
		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum++;
		pthread_mutex_unlock(&pool->mutexBusy);

		/* 3.执行取出的任务 */
		printf("thread %ld start working ...\n", pthread_self());
		task.func(task.arg);
		free(task.arg);
		task.arg = NULL;
		printf("thread %ld end working ...\n", pthread_self());

		/* 4.设置pool的busyNum-1 */
		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum--;
		pthread_mutex_unlock(&pool->mutexBusy);
	}
	return NULL;
}


/***************************************************************
 * 函  数: manager
 * 功  能: 管理者线程的执行函数
 * 参  数: arg---实参传入,这里传入的是线程池
 * 返回值: 空指针
 **************************************************************/
void* manager(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (!pool->shutdown) {
		/* 每隔3秒检测一次 */
		sleep(3);

		/* 获取pool中相关变量 */
		pthread_mutex_lock(&pool->mutexPool);
		int taskNum = pool->queueSize;	//任务队列中的任务数量
		int liveNum = pool->liveNum;	//存活的工作线程数量
		int busyNum = pool->busyNum;	//忙碌的工作线程数量
		pthread_mutex_unlock(&pool->mutexPool);

		/* 功能一:增加工作线程,每次增加NUMBER个 */
		//当任务个数大于存活工作线程数,且存活工作线程数小于最大值
		if (taskNum > liveNum && liveNum < pool->maxNum) {
			pthread_mutex_lock(&pool->mutexPool);
			int counter = 0;
			for (int i = 0; i < pool->maxNum && counter < NUMBER
				&& pool->liveNum < pool->maxNum; ++i)
			{
				if (pool->threadIDs[i] == 0) {
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexPool);
		}

		/* 功能二:销毁工作线程,每次销毁NUMBER个 */
		//当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数
		if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
			pthread_mutex_lock(&pool->mutexPool);
			pool->exitNum = NUMBER;
			//唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀
			for (int i = 0; i < NUMBER; ++i) {
				pthread_cond_signal(&pool->notEmpty);
			}
			pthread_mutex_unlock(&pool->mutexPool);
		}
	}
	return NULL;
}



/***************************************************************
 * 函  数: threadExit
 * 功  能: 工作线程退出函数,将工作线程的ID置为0,然后退出
 * 参  数: pool---线程池
 * 返回值: 无
 **************************************************************/
void threadExit(ThreadPool* pool)
{
	//将pool->threadIDs中的ID改为0
	pthread_t tid = pthread_self();
	for (int i = 0; i < pool->maxNum; i++) {
		if (pool->threadIDs[i] == tid) {
			pool->threadIDs[i] = 0;
			printf("threadExit() called, %ld exiting...\n", tid);
			break;
		}
	}
	pthread_exit(NULL);//退出
}

【2】threadpool.h:

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;

//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);

//销毁线程池
int threadPoolDestroy(ThreadPool* pool);

//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

//获取当前忙碌的工作线程的数量
int getThreadPoolBusyNum(ThreadPool* pool);

//获取当前存活的工作线程的数量
int getThreadPoolAliveNum(ThreadPool* pool);

/*********************其它函数**********************/
void* worker(void* arg);//工作线程的执行函数
void* manager(void* arg);//管理者线程的执行函数
void threadExit(ThreadPool* pool);//线程退出函数

#endif

【3】main.c:

#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

//任务函数,所有线程都执行此任务
void testFunc(void* arg)
{
	int* num = (int*)arg;
	printf("thread %ld is working, number = %d\n", pthread_self(), *num);
	sleep(1);
}

int main()
{
	//创建线程池: 最少3个工作线程,最多10个,任务队列容量为100
	ThreadPool* pool = threadPoolCreate(3, 10, 100);
	//加入100个任务于任务队列
	for (int i = 0; i < 100; ++i) {
		int* num = (int*)malloc(sizeof(int));
		*num = i + 100;
		threadPoolAdd(pool, testFunc, num);
	}
	//销毁线程池
	sleep(30);//保证任务全部运行完毕
	threadPoolDestroy(pool);
	return 0;
}

【4】运行结果:

......

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1887799.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【FFmpeg】avformat_find_stream_info函数

【FFmpeg】avformat_find_stream_info 1.avformat_find_stream_info1.1 初始化解析器&#xff08;av_parser_init&#xff09;1.2 查找探测解码器&#xff08;find_probe_decoder&#xff09;1.3 尝试打开解码器&#xff08;avcodec_open2&#xff09;1.4 读取帧&#xff08;re…

Redis的使用(二)redis的命令总结

1.概述 这一小节&#xff0c;我们主要来研究一下redis的五大类型的基本使用&#xff0c;数据类型如下&#xff1a; redis我们接下来看一看这八种类型的基本使用。我们可以在redis的官网查询这些命令:Commands | Docs,同时我们也可以用help 数据类型查看命令的帮助文档。 2. 常…

新鲜出炉!恭喜这 5 位同学中选 NebulaGraph 社区 2024 开源之夏项目!

开源之夏是中国科学院软件研究所发起的“开源软件供应链点亮计划”系列暑期活动&#xff0c;旨在鼓励高校学生积极参与开源软件的开发维护&#xff0c;促进优秀开源软件社区的蓬勃发展。活动联合各大开源社区&#xff0c;针对重要开源软件的开发与维护提供项目开发任务&#xf…

【JPCS出版,PSESG 2024,8月16-18】2024年电力系统工程与智能电网国际学术会议

2024年电力系统工程与智能电网国际学术会议(PSESG 2024)于2024年8月16-18日在中国北京隆重召开。 会议旨在为从事“电力系统工程”、“智能电网”、“储能技术”等领域的专家学者、工程技术人员、研发人员提供一个共享科研成果和前沿技术&#xff0c;了解学术发展趋势&#xf…

linux的Top学习

学习文档 https://www.cnblogs.com/liulianzhen99/articles/17638178.html TOP 问题 1&#xff1a;top 输出的利用率信息是如何计算出来的&#xff0c;它精确吗&#xff1f; top 命令访问 /proc/stat 获取各项 cpu 利用率使用值内核调用 stat_open 函数来处理对 /proc/sta…

PMP通过率为什么高?

很多人在初步了解PMP的时候&#xff0c;都会考虑到PMP考试的难度以及通过率&#xff0c;继而在网上查询到很多资料后&#xff0c;都会发现&#xff0c;其实PMP的国内通过率一直都是很高的。 通过率高≠含金量低 看到PMP的通过率这么高&#xff0c;很多人觉得证书的水分很大&a…

鼠标连点器:解放双手的自动化效率神器,鼠标自动快速连点!

日常使用电脑整理工作时&#xff0c;总会做一些重复的工作&#xff0c;比如&#xff1a;刷题、做任务、浏览多张图片、浏览多个文件等。这些操作的工作量在于鼠标左键&#xff0c;需要一直重复的点&#xff0c;略微有些枯燥了。 面对重复且枯燥的工作&#xff0c;我们可以借助第…

Windows系统安装NVM,实现Node.js多版本管理

目录 一、前言 二、NVM简介 三、准备工作 1、卸载Node 2、创建文件夹 四、下载NVM 五、安装NVM 六、使用NVM 1、NVM常用操作命令 2、查看NVM版本信息 3、查看Node.js版本列表&#xff1b; 4、下载指定版本Node.js 5、使用指定版本Node.js 6、查看已安装Node.js列…

快速入门FreeRTOS心得(正点原子学习版)

对于FreeROTS&#xff0c;我第一反应想到的就是通信里的TDM&#xff08;时分多址&#xff09;。不同任务给予分配不同的时间间隔&#xff0c;也就是任务之间在每个timeslot都在来回切换。 这里有重要的一点&#xff0c;就是中断要短小&#xff0c;优先级是自高到底进行打断。 …

如何避免删库跑路?

如何避免删库跑路&#xff0c;这几乎是一个老生常谈的话题&#xff0c;也是大部分上了规模的企业都很关心的话题&#xff0c;京东到家、微盟、链家、思科... 在这些大企业上发生过的删库事件仍然历历在目&#xff0c;无论是否当事人有意为之还是系统 BUG 导致&#xff0c;造成的…

vue-advanced-chat 聊天控件的使用

测试代码&#xff1a;https://github.com/robinfoxnan/vue-advanced-chat-test0 控件源码&#xff1a;https://github.com/advanced-chat/vue-advanced-chat 先上个效果图&#xff1a; 这个控件就是专门为聊天而设计的&#xff0c;但是也有一些不足&#xff1a; 1&#xf…

国际数字影像产业园:汇聚全球力量,共绘影像新蓝图

在数字化浪潮席卷全球的今天&#xff0c;我们自豪地宣布&#xff0c;国际数字影像产业园已正式起航&#xff0c;以全球视野为引领&#xff0c;致力于推动数字影像产业的创新发展&#xff0c;引领全球潮流。 一、汇聚全球智慧 国际数字影像产业园以开放包容的姿态&#xff0c;汇…

MIX OTP——使用 ETS 加速

每次我们需要查找存储容器时&#xff0c;我们都需要向注册表发送一条消息。如果我们的注册表被多个进程同时访问&#xff0c;注册表可能会成为瓶颈&#xff01; 在本章中&#xff0c;我们将了解 ETS&#xff08;Erlang Term Storage&#xff09;以及如何将其用作缓存机制。 警…

【信息系统项目管理师】常见图表

作文里面的画图题用语言描述画图过程 合同 采购综合评分标准 责任分配矩阵 成本预算表 成本估算 成本管理计划 活动清单 活动属性 变更日志 问题日志 项目章程 自己再添加更多内容 甘特图 甘特图包含以下三个含义&#xff1a; 1、以图形或表格的形式显示活动&#xff1b; 2、…

JavaScript中window对象 , location对象以及history对象使用方法详细介绍

2.BOM&#xff08;Browser Object Model&#xff09; 操作浏览器的。常用的浏览器对象&#xff1a; 1.window对象&#xff1a;Window 对象表示浏览器中打开的窗口。 2.location对象&#xff1a;Location 对象包含有关当前 URL 的信息。Location 对象是 window 对象的一部分&…

[PyTorch]:加速Pytorch 模型训练的几种方法(几行代码),最快提升八倍(附实验记录)

本篇文章转自&#xff1a;Some Techniques To Make Your PyTorch Models Train (Much) Faster 本篇博文概述了在不影响 PyTorch 模型准确性的情况下提高其训练性能的技术。为此&#xff0c;将 PyTorch 模型包装在 LightningModule 中&#xff0c;并使用 Trainer 类来实现各种训…

使用 Python 五年后,我发现学 python 必看这三本书!少走一半弯路

第一本 《Python编程-从入门到实践》 适合零基础的读者 豆瓣评分&#xff1a;9.1 推荐指数&#xff1a;5颗星 推荐理由&#xff1a; 本书是针对所有层次的 Python 读者而作的 Python 入门书。全书分为两部分&#xff1a; 第一部分介绍使用Python 编程所必须了解的…

将excel表格转换为element table(上)

最近有个功能需要将excel展示到html 界面里面&#xff0c;看是简单的一个需求也是需要费尽心思才完得成 原始数据 想要把excel 读取出来&#xff0c;于是使用xlsl的插件 npm i xlsx通过插件可以获取到已经分析好的数据 然后使用sheet_to_html将数据转换为html 再使用v-htm…

ROS2 RQT

1. RQT是什么 RQT是一个GUI框架&#xff0c;通过插件的方式实现了各种各样的界面工具。 强行解读下&#xff1a;RQT就像插座&#xff0c;任何电器只要符合插座的型号就可以插上去工作。 2.选择插件 这里我们可以选择现有的几个RQT插件来试一试&#xff0c;可以看到和话题、参…

视频太大怎么压缩变小?6款视频压缩软件免费版分享

视频太大怎么压缩得又小又清晰呢&#xff1f;无论是视频文件传输、视频文件存储&#xff0c;还是进行自媒体视频上传&#xff0c;都对视频文件的大小有一定的限制。高质量的视频文件往往伴随着文件占据大量存储空间&#xff0c;导致文件传输速度变慢。今天教大家6种视频压缩软件…