Linux 下 C语言版本的线程池

news2025/1/15 16:40:41

目录

1. 线程池引入

2. 线程池介绍

3. 线程池的组成

4. 任务队列

5. 线程池定义

6. 头文件声明

7. 函数实现

8. 测试代码


1. 线程池引入

        我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

        那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

2. 线程池介绍

        线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

3. 线程池的组成

        线程池的组成主要分为 3 个部分,这三部分配合工作就可以得到一个完整的线程池:

任务队列:存储需要处理的任务,由工作的线程来处理这些任务

        通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列或者从任务队列中删除已处理的任务会被从任务队列中删除线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

工作的线程(任务队列任务的消费者) ,N个:

        线程池中维护了一定数量的工作线程他们的作用是是不停的读任务队列从里边取出任务并处理工作的线程相当于是任务队列的消费者角色, 如果任务队列为,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作

管理者线程(不处理任务队列中的任务),1个

        它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测当任务过多的时候,可以适当的创建一些新的工作线程当任务过少的时候,可以适当的销毁一些工作的线程

4. 任务队列

// 任务结构体
typedef struct Task
{
	void(*function)(void* arg);//函数指针
	void* arg;// 函数参数
}Task;

5. 线程池定义

// 线程池结构体
struct ThreadPool
{
	//任务队列
	Task* taskQ; // 任务队列,数组,所以定义指针
	int queueCapacity;	//容量
	int queueSize;		//当前任务个数
	int queueFront;		//队头 -> 取数据
	int queueRear;		//队尾 -> 放数据
	
	pthread_t managerID;	// 管理者线程ID
	pthread_t *threadIDs;	// 工作的线程ID
	int minNum;				// 最少线程数
	int maxNum;				// 最大线程数
	int busyNum;			// 在忙中的线程数
	int liveNum;			// 存活线程数
	int exitNum;			// 需要杀死的线程数
 
	pthread_mutex_t mutexPool;	// 锁整个线程池
	pthread_mutex_t muteBusy;	// 锁busyNum变量
	pthread_cond_t	notFull;	// 任务队列是不是满了
	pthread_cond_t	notEmpty;	// 任务队列是不是空了
 
 
	int shutDown;		// 是不是需要销毁线程池,销毁为1,不销毁为0
};

6. 头文件声明

#ifndef __THREADPOOL_H
#define	__THREADPOOL_H
 
typedef struct ThreadPool ThreadPool; // 声明一下,说明该结构体在其他方定义了
 
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);
 
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
 
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*),void* arg);
 
 
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
 
// 获取线程池中活着的线程个数
int threadPoolAliveNum(ThreadPool* pool);
 
/
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
 
 
#endif // !__THREADPOOL_H
 

7. 函数实现

ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
	ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
	//printf("threadPoolCreate start creating  \n");
	do
	{
		if (pool == NULL)
		{
			printf("malloc  threadpool fail ...\n");
			break;
		}
 
		pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max); // 根据最大的线程池设置进行创建空间
		if (pool->threadIDs == NULL)
		{
			break;
		}
		memset(pool->threadIDs, 0, sizeof(pthread_t)*max);
		pool->minNum = min;
		pool->maxNum = max;
		pool->busyNum = 0;
		pool->liveNum = min;
		pool->exitNum = 0;
 
		if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
			pthread_mutex_init(&pool->muteBusy, NULL) != 0 ||
			pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
			pthread_cond_init(&pool->notFull, NULL) != 0)
		{
			printf("mutex or cond init error\n");
			break;
		}
 
		// 任务队列
		pool->taskQ = (Task*)malloc(sizeof(Task)*queueSize);
		pool->queueCapacity = queueSize;
		pool->queueSize = 0;
		pool->queueFront = 0;
		pool->queueRear = 0;
		pool->shutDown = 0;
 
		// 创建线程
		pthread_create(&pool->managerID, NULL, manager, pool);// 管理线程
		for (int i = 0; i < min; ++i)
		{
			//printf("creating is %d\n", i);
			pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 工作线程
		}
		printf("threadPoolCreate create sucess  \n");
		return pool;
	} while (0);
 
	//是否资源
	if (pool&&pool->threadIDs) free(pool->threadIDs);
	if (pool&&pool->taskQ) free(pool->taskQ);
	if (pool) free(pool);
 
 
	return NULL;
}
 
 
void* worker(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;//参数进行类型转换
	while (1)
	{
		printf("worker thread %d\n",pthread_self());
		pthread_mutex_lock(&pool->mutexPool);
		// 判断当前任务队列是否为空,且线程没有关闭
		while (pool->queueSize == 0 && !pool->shutDown)
		{
			printf("worker %d waiting \n", pthread_self());
			// 需要阻塞线程,锁和条件变量进行绑定,后面通过条件变量对其进行唤醒
			pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
			
			//判断是不是需要销毁线程
			if (pool->exitNum>0)
			{
				pool->exitNum--;
				if (pool->liveNum > pool->minNum)
				{
					pool->liveNum--;
					pthread_mutex_unlock(&pool->mutexPool);
					threadExit(pool);
				}
				
			}
		}
		// 如果有任务,则继续向下运行,开始消费任务了
		// 判断线程池是否关闭了
		if (pool->shutDown)
		{
			pthread_mutex_unlock(&pool->mutexPool);
			threadExit(pool);
		}
		// 从任务队列取出任务
		Task task;
		task.function = pool->taskQ[pool->queueFront].function; // 取出任务
		task.arg = pool->taskQ[pool->queueFront].arg;
		
 
		// 任务队列应该设计成环形的队列,只需要确定头部和尾部即可,添加任务进行覆盖即可
		// 移动头结点
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//实现环形获取
		pool->queueSize--;
 
		// 解锁
		pthread_cond_signal(&pool->notFull);
		pthread_mutex_unlock(&pool->mutexPool);
 
		// 当前子线程开始工作了,因此忙的状态需要标定
		printf("thread %ld start working...\n",pthread_self());
		pthread_mutex_lock(&pool->muteBusy);
		pool->busyNum++;
		pthread_mutex_unlock(&pool->muteBusy);
 
		// 开始工作
		task.function(task.arg);
		free(task.arg);
		task.arg = NULL;
 
		// 当前子线程完成工作,因此把忙的状态取消
		printf("thread %ld end working...\n", pthread_self());
		pthread_mutex_lock(&pool->muteBusy);
		pool->busyNum--;
		pthread_mutex_unlock(&pool->muteBusy);
	}
	return NULL;
}
 
 
void* manager(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;//参数进行类型转换
	while (!pool->shutDown)
	{
		// 每隔3s检测一次
		sleep(3);
 
		//取出线程池中任务的数量和当前线程的数量
		pthread_mutex_lock(&pool->mutexPool);
		int queueSize = pool->queueSize;
		int liveNum = pool->liveNum;
		pthread_mutex_unlock(&pool->mutexPool);
 
		// 取出忙的线程的数量
		pthread_mutex_lock(&pool->muteBusy);
		int busyNum = pool->busyNum;
		pthread_mutex_unlock(&pool->muteBusy);
 
		// 添加线程,需要制定一个添加的规则,可以根据实际情况进行设计即可
		// 任务的个数>存活的线程个数 && 存活的线程个数< 最大线程数
		if (queueSize > liveNum && liveNum < pool->maxNum)
		{
			pthread_mutex_lock(&pool->mutexPool);
			int counter = 0;
			for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
			{
				if (pool->threadIDs[i] == 0)
				{
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexPool);
		}
 
		// 销毁线程
		// 忙的线程*2 < 存活的线程数 && 存活的线程数>最小线程数
		if (busyNum * 2 < liveNum && liveNum > pool->minNum)
		{
			pthread_mutex_lock(&pool->mutexPool);
			pool->exitNum = NUMBER;
			pthread_mutex_unlock(&pool->mutexPool);
 
			// 让空闲的工作线程自杀
			for (int i = 0; i < NUMBER; i++)
			{
				pthread_cond_signal(&pool->notEmpty);
			}
		}
 
	}
 
	return NULL;
}
 
void threadExit(ThreadPool* pool)
{
	pthread_t tid = pthread_self();
	for (int i = 0; i < pool->maxNum; i++)
	{
		if (pool->threadIDs[i] == tid)
		{
			pool->threadIDs[i] = 0;
			printf("threadExit() called, %ld, exiting ...\n",tid);
			break;
		}
	}
	pthread_exit(NULL);
}
 
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
	pthread_mutex_lock(&pool->mutexPool);
	while (pool->queueSize == pool->queueCapacity && !pool->shutDown)
	{
		// 阻塞生产者线程
		printf("pthread_cond_wait(&pool->notFull, &pool->mutexPool \n");
		pthread_cond_wait(&pool->notFull, &pool->mutexPool);
	}
 
	// 判断线程池是否被关闭
	if (pool->shutDown)
	{
		pthread_mutex_unlock(&pool->mutexPool);
		return;
	}
	// 添加任务
	pool->taskQ[pool->queueRear].function = func;
	pool->taskQ[pool->queueRear].arg = arg;
	pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
	pool->queueSize++;
 
	pthread_cond_signal(&pool->notEmpty);
	pthread_mutex_unlock(&pool->mutexPool);
 
	//printf("threadPoolAdd sucess \n");
}
 
 
int threadPoolBusyNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->muteBusy);
	int busyNum = pool->busyNum;
	pthread_mutex_unlock(&pool->muteBusy);
	return busyNum;
}
 
int threadPoolAliveNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexPool);
	int aliveNum = pool->liveNum;
	pthread_mutex_unlock(&pool->mutexPool);
	return aliveNum;
}
 
int threadPoolDestroy(ThreadPool* pool)
{
	if (pool == NULL)
	{
		return -1;
	}
	//关闭线程池
	pool->shutDown = 1; 
	// 阻塞回收管理者线程
	pthread_join(pool->managerID, NULL);
	// 唤醒阻塞的消费者线程
	for (int i = 0; i < pool->liveNum; i++)
	{
		pthread_cond_signal(&pool->notEmpty);
	}
	// 释放堆内存
	if (pool->taskQ)
	{
		free(pool->taskQ);
	}
	if (pool->threadIDs)
	{
		free(pool->threadIDs);
	}
	
	pthread_mutex_destroy(&pool->muteBusy);
	pthread_mutex_destroy(&pool->mutexPool);
	pthread_cond_destroy(&pool->notEmpty);
	pthread_cond_destroy(&pool->notFull);
	free(pool);
	pool = NULL;
 
}

8. 测试代码

#include <stdio.h>
#include"ThreadPool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
 
 
void taskFunc(void* arg)
{
	int num = *(int*)arg;
	printf("thread %ld is working, number = %d\n",pthread_self(), num);
	sleep(1);
}
 
int main()
{
    printf("hello from ThreadPool!\n");
	//创建线程池
	ThreadPool* pool = threadPoolCreate(3, 10, 100);
	for (int i = 0; i < 100; i++)
	{
		
		int* num = (int*)malloc(sizeof(int));
		*num = i + 100;
		//printf("*num = %d\n", *num);
		threadPoolAdd(pool, taskFunc, num);
	}
	sleep(30);
	threadPoolDestroy(pool);
 
    return 0;
}

这篇文章来源linux下c语言版线程池_linux c 线程池_zsffuture的博客-CSDN博客,我是根据视频讲解写的代码,测试通过,感觉代码结构设计很好,怕后面找不到,因此搬运过来参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/983752.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

壁炉在文学和艺术中的代表着什么呢,能给我们带来什么样的影响?

壁炉&#xff0c;不仅仅是家庭温暖的来源&#xff0c;也是文学和艺术中常见的重要元素。它的形象在文学作品、绘画和电影中频繁出现&#xff0c;不仅为故事情节提供了背景&#xff0c;还象征着情感、温馨和安全感。让我们一起深入探讨壁炉在文学和艺术中的形象&#xff0c;以及…

Android相机调用-CameraX【外接摄像头】【USB摄像头】

Android相机调用有原生的Camera和Camera2&#xff0c;我觉得调用代码都太复杂了&#xff0c;CameraX调用代码简洁很多。 说明文档&#xff1a;https://developer.android.com/jetpack/androidx/releases/camera?hlzh-cn 现有查到的调用资料都不够新&#xff0c;对于外接摄像…

RecyclerView的item布局预览显示是一行两块 运行后显示了一行一块,怎么回事

QQ有人问我了&#xff1a; 没有起作用&#xff1f;有 解决问题&#xff1a;在item布局上第一个外层宽度、高度分别为match_parent和wrap_content&#xff0c;第二个外层宽度和高度都为match_parent&#xff0c;运行后可以显示一行两块 其他学习资料&#xff1a; 1、付费专栏…

mybatis 数据库字段加密

目录 1、引入依赖 2、添加配置 3、指定加密字段 4、测试&#xff0c;效果 1、引入依赖 <dependency><groupId>io.github.whitedg</groupId><artifactId>mybatis-crypto-spring-boot-starter</artifactId><version>1.2.3</version>…

C#文件拷贝工具

目录 工具介绍 工具背景 4个文件介绍 CopyTheSpecifiedSuffixFiles.exe.config DataSave.txt 拷贝的存储方式 文件夹介绍 源文件夹 目标文件夹 结果 使用 *.mp4 使用 *.* 重名时坚持拷贝 可能的报错 C#代码如下 Form1.cs Form1.cs设计 APP.config Program.c…

微信小程序踩坑记录

1、原生微信小程序开发&#xff0c;切换版本后一直报错&#xff0c;切回去还是报错&#xff0c;后来排查发现是编辑器未更新的问题 2、本地开发静态时&#xff0c;要把这个勾选&#xff0c;否则每次提交代码都会冲突 持续记录踩坑。。。

果粉崩溃!Icloud又要涨价?我来教你使用群晖生态将本地SSD“上云“!

文章目录 前言本教程解决的问题是&#xff1a;按照本教程方法操作后&#xff0c;达到的效果是想使用群晖生态软件&#xff0c;就必须要在服务端安装群晖系统&#xff0c;具体如何安装群晖虚拟机请参考&#xff1a; 1. 安装并配置synology drive1.1 安装群辉drive套件1.2 在局域…

Android离线文字识别-tesseract4android调用

Android在线文字识别可以调阿里云的接口Android文字识别-阿里云OCR调用__花花的博客-CSDN博客 需要离线文字识别的话&#xff0c;可以调tesseract4android。个人测试效果不是特别理想&#xff0c;但是速度真的很快&#xff0c;VIVO S10后摄照片&#xff0c;80ms内识别完成。现…

手写Spring:第13章-把AOP扩展到Bean的生命周期

文章目录 一、目标&#xff1a;把AOP扩展到Bean的生命周期二、设计&#xff1a;把AOP扩展到Bean的生命周期三、实现&#xff1a;把AOP扩展到Bean的生命周期3.1 工程结构3.2 AOP动态代理融入Bean的生命周期类图3.3 定义Advice拦截器链3.3.1 定义拦截器链接口3.3.2 方法拦截器链接…

算法笔记:堆

【如无特别说明&#xff0c;皆为最小二叉堆】 1 介绍 2 特性 结构性&#xff1a;符合完全二叉树的结构有序性&#xff1a;满足父节点小于子节点&#xff08;最小化堆&#xff09;或父节点大于子节点&#xff08;最大化堆&#xff09; 3 二叉堆的存储 顺序存储 二叉堆的有序…

问道管理:a股交易时间和规则?

A股是指中国境内流通的人民币普通股票&#xff0c;是国内投资者最为熟悉和广泛的投资工具之一。作为投资者&#xff0c;了解A股的买卖时刻和规矩是非常重要的。本文从买卖时刻、买卖规矩、买卖方式等多个视点来分析A股买卖时刻和规矩&#xff0c;期望对读者有所协助。 A股买卖…

C#事件event

事件模型的5个组成部分 事件拥有者&#xff08;event source&#xff09;&#xff08;类对象&#xff09;&#xff08;有些书将其称为事件发布者&#xff09; 事件成员&#xff08;event&#xff09;&#xff08;事件拥有者的成员&#xff09;&#xff08;事件成员就是事件本身…

2023年03月 C/C++(八级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 第1题&#xff1a;最短路径问题 平面上有n个点&#xff08;n<100&#xff09;&#xff0c;每个点的坐标均在-10000~10000之间。其中的一些点之间有连线。 若有连线&#xff0c;则表示可从一个点到达另一个点&#xff…

C++——STL容器【map和set】

文档&#xff1a;map、set 文章目录 &#x1f36f;1. 关联式容器&#x1fad6;2. set&#x1f37c;1. 模板参数&#x1f37c;2. 构造函数&#x1f37c;3. 修改&#x1f37c;4.操作&#x1f95b;find&#x1f95b;count&#x1f95b;lower_bound & upper_bound & equal_…

Increment Selection 插件

Increment Selection 插件实现递增 初次使用 按下快捷键 Alt Shift 鼠标左键向下拖拽 向下拖拽之后&#xff0c;在输入一个数字&#xff0c;比如我这里输入了一个数字1 然后按下快捷键 Ctrl Shift ← 进行选中数字 然后按下快捷键 Ctrl Alt i 建自动递增。 然后鼠标随…

网络是如何进行通信

网络是如何进行通信的 简介 在现代社会中&#xff0c;网络已经成为我们生活中不可或缺的一部分。从上网搜索信息、在线购物到远程工作和社交媒体&#xff0c;我们几乎无时无刻不与网络保持着联系。但是&#xff0c;网络究竟是个什么玩意&#xff0c;它是如何工作的呢&#xf…

MinIO集群模式信息泄露漏洞(CVE-2023-28432)

前言&#xff1a;MinIO是一个用Golang开发的基于Apache License v2.0开源协议的对象存储服务。虽然轻量&#xff0c;却拥有着不错的性能。它兼容亚马逊S3云存储服务接口&#xff0c;非常适合于存储大容量非结构化的数据。该漏洞会在前台泄露用户的账户和密码。 0x00 环境配置 …

数字信封技术概论

数字信封技术是一种通过加密手段实现信息保密性和验证的技术&#xff0c;它在保护敏感信息传输过程中得到了广泛应用。本文将详细介绍数字信封技术的原理、实现和应用场景。 一、数字信封技术的原理 数字信封技术是一种将对称密钥通过非对称加密手段分发的方法。在数字信封中…

《C++设计模式》——行为型

前言 行为型模式是对在不同的对象之间划分责任和算法的抽象化。行为型模式不仅仅关注类和对象的结构&#xff0c;而且重点关注它们之间的相互作用。 Interpreter(解释器) Template Method(模板方法) Chain of Responsibility(责任链) Command(命令) Iterator(迭代器) Me…

海康NVR(Network Video Recorder)启用SSH过程摸索

文章目录 海康NVR具备的特点启用SSH模式优劣比较启用SSH模式的优势启用SSH模式的坏处 Hik NVR启用SSH功能1&#xff0c;Web登录NVR2&#xff0c;SSH登录NVR SSH shell模式特点SSH shell模式指令作用1&#xff0c;简要帮助“help”可以列出常用的shell指令部分可用shell指令输出…