11.5 Linux_线程_线程池

news2024/10/5 12:41:18

概述

什么是线程池:

线程池就是线程的集合,里面存放了一系列的线程。

线程池使用于需要大量创建和销毁线程的情况,使用线程池比单个线程的创建和销毁速度要快。

线程池的结构:

任务队列:任务队列是任务的生产者,存储需要处理的任务,工作线程会处理这些任务。

线程池:线程池中存放着工作线程,它是任务队列任务的消费者,等待新任务的信号。 

固定大小线程池的实现:

1、创建线程池的基本结构:任务队列结构体、线程池结构体

2、线程池初始化:创建线程池结构、互斥锁和条件变量初始化、创建n个工作线程

3、线程池添加任务:判断是否有空闲线程、给任务队列添加节点、给工作线程发送信号

4、实现工作线程:等待任务信号、从任务队列中取出任务、执行任务

5、线程池销毁:删除任务队列、互斥锁、条件变量、线程池

线程池的实现

1、结构体实现

1.1 任务队列结构体

任务被抽象为一个函数,队列需要一个指针指向下一个节点。

具体结构体声明如下:

//任务队列
typedef struct Task{
	void* (*func)(void* arg);//任务处理函数
	void* arg;               //任务处理函数的参数
	struct Task* pNext;      //链表指针
}Task;

1.2 线程池结构体

任务是一个临界资源,同一时刻只允许有一个线程处理该任务,所以需要一个互斥量。

任务和工作线程为生产者和消费者的关系,所以需要一个条件变量。

线程池中有多个工作线程,所以需要一个线程TID数组

线程与任务之间需要交互,所以需要当前正在工作线程的个数、当前需要处理的任务

具体结构体声明如下:

//线程池
#define POOL_NUM 5 
typedef struct ThreadPool{
	pthread_mutex_t taskLock;    //互斥锁
	pthread_cond_t newTask;      //条件变量

	pthread_t tid[POOL_NUM];     //线程TID
	Task* queue_head;            //任务队列头部
	Task* queue_tail;    		 //任务队列尾部
	int busyWork;                //当前正在工作的线程数量
}ThreadPool;
ThreadPool* pPool;

2、创建和销毁线程池

2.1 初始化线程池

线程池初始化就是申请线程池空间,并初始化互斥量、条件变量、任务队列、工作线程数量、创建工作线程。

具体代码实现如下:

/*
 * pool_init:线程池初始化
 * @ret -1--err 0--success
 */
int pool_init(void){

	//1.申请空间
	if((pPool = malloc(sizeof(ThreadPool))) == NULL){
		printf("malloc err\n");
		return -1;
	}
	//2.初始化
	pthread_mutex_init(&(pPool->taskLock),NULL); 			//互斥量初始化
	pthread_cond_init(&(pPool->newTask),NULL); 				//条件变量初始化
	pPool->busyWork = 0;									//当前工作的线程=0
	if((pPool->queue_head = malloc(sizeof(Task))) == NULL){ //任务队列初始化
		printf("malloc err\n");
		return -1;
	}
	pPool->queue_tail = pPool->queue_head;
	pPool->queue_head->pNext = NULL;
	for(int i=0;i<POOL_NUM;i++){ 							//创建工作线程
		if(pthread_create(&(pPool->tid[i]),NULL,workThread,NULL) != 0){
			perror("pthread_create");
			return -1;//这里没有做清理工作
		}
	}
	return 0;
}

2.2 销毁线程池 

销毁线程池就是释放创建时申请的空间。销毁顺序为:任务队列、互斥量和条件变量、线程池

具体代码实现如下:

/*
 * pool_destroy:销毁线程池
 * */
void pool_destroy(void){
	Task* pTask = NULL;
	//1.销毁任务队列
	while(pPool->queue_head!=NULL){
		pTask = pPool->queue_head->pNext;
		free(pPool->queue_head);
		pPool->queue_head = pTask;
	}
	//2.销毁互斥锁、条件变量
	pthread_mutex_destroy(&(pPool->taskLock));
	pthread_cond_destroy(&(pPool->newTask));
	//3.销毁线程池
	free(pPool);
}

3、向线程池中添加任务(生产者)

该函数首先是生产者模型:加锁互斥量->生成资源->发送信号->解锁互斥量

除此之外,还需在生产资源之前判断是否存在空闲的线程,如果不存在则需等待一段时间再生产资源,这一步的目的是为了防止生产资源后发送的信号没有被工作线程接收,导致资源的丢失。

具体代码实现如下:

/*
 * pool_add_task:向线程池中添加任务
 * param pfun:任务处理函数
 * param arg:任务处理函数的参数
 * */
void pool_add_task(void*(pfun)(void*),void* arg){
	Task* pTask = NULL;
	//生产者代码框架
	pthread_mutex_lock(&(pPool->taskLock)); 		//上锁互斥量
	//判断当前是否有空闲的工作线程,如果不判断会导致条件变量信号丢失问题
	while(pPool->busyWork > POOL_NUM){
		pthread_mutex_unlock(&(pPool->taskLock));
		usleep(1000);
		pthread_mutex_lock(&(pPool->taskLock));
	}
	if((pTask = malloc(sizeof(Task))) == NULL){ 	//生产资源
		printf("malloc err\n");
		return;
	}
	pTask->func = pfun;
	pTask->arg = arg;
	pTask->pNext = NULL;
	pPool->queue_tail->pNext = pTask;
	pPool->queue_tail = pTask;
	pPool->busyWork++; 								//资源生产结束,代表将有一个线程会工作
	pthread_cond_signal(&(pPool->newTask)); 		//发送信号
	pthread_mutex_unlock(&(pPool->taskLock)); 		//解锁互斥量
}

4、从线程池中处理任务(消费者)

该函数首先是消费者模型:上锁互斥量->等待信号->获取资源->解锁互斥量

除此之外,这里的获取资源指的是将任务结构体获取到,而不是直接处理完成任务。任务的处理与生产者与消费者模型无关,所以可以放在解锁互斥量后面,这一步的目的是为了尽快解锁互斥量,让其他线程可以尽快处理新来的任务。

具体代码实现如下:

/*
 * workThread:工作线程
 * */
void* workThread(void* arg){
	Task* pTask = NULL;
	while(1){
		//消费者代码框架
		pthread_mutex_lock(&(pPool->taskLock)); 	//上锁互斥量
		while(pPool->queue_head->pNext == NULL){    //等待资源
			pthread_cond_wait(&(pPool->newTask),&(pPool->taskLock));
		}
		pTask = pPool->queue_head->pNext; 			//使用资源
		pPool->queue_head->pNext = pTask->pNext;
		if(pPool->queue_head->pNext == NULL){
			pPool->queue_tail = pPool->queue_head;
		}
		pthread_mutex_unlock(&(pPool->taskLock));   //解锁互斥量
		//资源中的任务与消费者无关,因此可以放在互斥量外
		pTask->func(pTask->arg); 					//执行相应任务函数
		free(pTask);
		pPool->busyWork--; 							//当前线程工作完成
	}
}

线程池完整代码

代码功能:

在main函数中不断的生产资源,生产资源后,工作线程会自动的处理资源。

具体代码实现如下:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <errno.h>

//任务队列
typedef struct Task{
	void* (*func)(void* arg);//任务处理函数
	void* arg;               //任务处理函数的参数
	struct Task* pNext;      //链表指针
}Task;
//线程池
#define POOL_NUM 5 
typedef struct ThreadPool{
	pthread_mutex_t taskLock;    //互斥锁
	pthread_cond_t newTask;      //条件变量

	pthread_t tid[POOL_NUM];     //线程TID
	Task* queue_head;            //任务队列头部
	Task* queue_tail;    		 //任务队列尾部
	int busyWork;                //当前正在工作的线程数量
}ThreadPool;
ThreadPool* pPool;

int pool_init(void);
void* workThread(void* arg);
void pool_add_task(void* (pfun)(void*),void* arg);
void* task1(void* arg);
void pool_destroy(void);

int main(){
	
	int tmp = 1;
	pool_init();
	while(1){
		pool_add_task(task1,(void*)tmp);//注意:这里tmp是常变化的值,不能传入地址&tmp
		tmp++;
	}

	return 0;
}
/*
 * pool_destroy:销毁线程池
 * */
void pool_destroy(void){
	Task* pTask = NULL;
	//1.销毁任务队列
	while(pPool->queue_head!=NULL){
		pTask = pPool->queue_head->pNext;
		free(pPool->queue_head);
		pPool->queue_head = pTask;
	}
	//2.销毁互斥锁、条件变量
	pthread_mutex_destroy(&(pPool->taskLock));
	pthread_cond_destroy(&(pPool->newTask));
	//3.销毁线程池
	free(pPool);
}

/*
 * task1:模拟任务
 * */
void* task1(void* arg){
	printf("%ld\n",((long int)arg));
	sleep(1);
	return NULL;
}

/*
 * pool_add_task:向线程池中添加任务
 * param pfun:任务处理函数
 * param arg:任务处理函数的参数
 * */
void pool_add_task(void*(pfun)(void*),void* arg){
	Task* pTask = NULL;
	//生产者代码框架
	pthread_mutex_lock(&(pPool->taskLock)); 		//上锁互斥量
	//判断当前是否有空闲的工作线程,如果不判断会导致条件变量信号丢失问题
	while(pPool->busyWork > POOL_NUM){
		pthread_mutex_unlock(&(pPool->taskLock));
		usleep(1000);
		pthread_mutex_lock(&(pPool->taskLock));
	}
	if((pTask = malloc(sizeof(Task))) == NULL){ 	//生产资源
		printf("malloc err\n");
		return;
	}
	pTask->func = pfun;
	pTask->arg = arg;
	pTask->pNext = NULL;
	pPool->queue_tail->pNext = pTask;
	pPool->queue_tail = pTask;
	pPool->busyWork++; 								//资源生产结束,代表将有一个线程会工作
	pthread_cond_signal(&(pPool->newTask)); 		//发送信号
	pthread_mutex_unlock(&(pPool->taskLock)); 		//解锁互斥量
}

/*
 * workThread:工作线程
 * */
void* workThread(void* arg){
	Task* pTask = NULL;
	while(1){
		//消费者代码框架
		pthread_mutex_lock(&(pPool->taskLock)); 	//上锁互斥量
		while(pPool->queue_head->pNext == NULL){    //等待资源
			pthread_cond_wait(&(pPool->newTask),&(pPool->taskLock));
		}
		pTask = pPool->queue_head->pNext; 			//使用资源
		pPool->queue_head->pNext = pTask->pNext;
		if(pPool->queue_head->pNext == NULL){
			pPool->queue_tail = pPool->queue_head;
		}
		pthread_mutex_unlock(&(pPool->taskLock));   //解锁互斥量
		//资源中的任务与消费者无关,因此可以放在互斥量外
		pTask->func(pTask->arg); 					//执行相应任务函数
		free(pTask);
		pPool->busyWork--; 							//当前线程工作完成
	}
}

/*
 * pool_init:线程池初始化
 * @ret -1--err 0--success
 */
int pool_init(void){

	//1.申请空间
	if((pPool = malloc(sizeof(ThreadPool))) == NULL){
		printf("malloc err\n");
		return -1;
	}
	//2.初始化
	pthread_mutex_init(&(pPool->taskLock),NULL); 			//互斥量初始化
	pthread_cond_init(&(pPool->newTask),NULL); 				//条件变量初始化
	pPool->busyWork = 0;									//当前工作的线程=0
	if((pPool->queue_head = malloc(sizeof(Task))) == NULL){ //任务队列初始化
		printf("malloc err\n");
		return -1;
	}
	pPool->queue_tail = pPool->queue_head;
	pPool->queue_head->pNext = NULL;
	for(int i=0;i<POOL_NUM;i++){ 							//创建工作线程
		if(pthread_create(&(pPool->tid[i]),NULL,workThread,NULL) != 0){
			perror("pthread_create");
			return -1;//这里没有做清理工作
		}
	}
	return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2189876.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LeetCode: 19. 删除链表的倒数第 N 个结点 | 链表 + 快慢指针】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

大学生就业桥梁:基于Spring Boot的招聘系统

1系统概述 1.1 研究背景 如今互联网高速发展&#xff0c;网络遍布全球&#xff0c;通过互联网发布的消息能快而方便的传播到世界每个角落&#xff0c;并且互联网上能传播的信息也很广&#xff0c;比如文字、图片、声音、视频等。从而&#xff0c;这种种好处使得互联网成了信息传…

【操作系统】引导(Boot)电脑的奇妙开机过程

&#x1f339;&#x1f60a;&#x1f339;博客主页&#xff1a;【Hello_shuoCSDN博客】 ✨操作系统详见 【操作系统专项】 ✨C语言知识详见&#xff1a;【C语言专项】 目录 什么是操作系统的引导&#xff1f; 操作系统的引导&#xff08;开机过程&#xff09; Windows操作系…

JavaScript-下篇

上篇我们学习了&#xff0c;一些基础语法和函数&#xff0c;现在我们学习下篇&#xff0c;主要包括,对象和事件。而对象又包括&#xff0c;数组Arrays&#xff0c;String字符串&#xff0c;BOM&#xff0c;DOM等 JS对象 Arrays数组 数组是一种特殊的对象&#xff0c;用于存储…

【多线程】多线程(8):单例模式:阻塞队列

【阻塞队列】 阻塞队列是在普通的“先进先出”队列的基础上&#xff0c;做出了扩充&#xff0c;可以组成「生产者消费者模型」 1.线程安全性 标准库是原有的队列Queue和其子类&#xff0c;默认都是“线程不安全的”&#xff0c;而阻塞队列是“线程安全的” 2.具有阻塞特性 …

【pytorch】张量求导3

再接上文,补一下作者未补完的矩阵运算的坑。 首先贴一下原作者的图,将其转化为如下代码: import torch import torch.nn as nn import torch.optim as optim# 定义一个简单的两层神经网络 class TwoLayerNet(nn.Module):def __init__(self):super(TwoLayerNet, self).__in…

Markdown 语法详解大全(超级版)(二)

Markdown 语法详解大全(超级版)&#xff08;二&#xff09; Markdown 语法详解大全(超级版)&#xff08;一&#xff09; Markdown 语法详解大全(超级版)&#xff08;二&#xff09; Markdown 语法详解大全(超级版)&#xff08;三&#xff09; &#xff08;歌词节选&#xff09…

Ubuntu 中 Redis ,MySQL 基本使用

1、Redis &#xff08;1&#xff09;启动Redis 服务端客户端命令 服务端 ps aux | grep redis 查看redis服务器进程 sudo kill -9 pid 杀死redis服务器 sudo redis-server /etc/redis/redis.conf 指定加载的配置文件客户端 连接redis&#xff1a; redis-cli运⾏测试命令&am…

C++结构体定义和创建

// // Created by 徐昌真 on 2024/10/5. // #include <iostream> using namespace std;int main() {//结构体的定义 struct 结构体名字 { 结构体成员名字 }struct Book{string name;double price;int value;}java; //java是创建的结构体//创建结构体//这是第一种方式Boo…

目标检测 DAB-DETR(2022)

文章目录 前言Query是什么&#xff0c;Detr收敛速度慢的原因是什么&#xff1f;改进策略位置先验和模型框架设置温度系数 前言 本文认为原始的Detr系列论文中&#xff1a;可学习的object queries仅仅是给model预测box提供了锚点&#xff08;中心点&#xff09;信息&#xff0c…

SpringBoot环境下古典舞交流平台的快速开发

第三章 系统分析 3.1 可行性分析 需要使用大部分精力开发的古典舞在线交流平台为了充分降低开发风险&#xff0c;特意在开发之前进行可行性分析这个验证系统开发是否可行的步骤。本文就会从技术角度&#xff0c;经济角度&#xff0c;还有操作角度等进行综合阐述。 3.1.1技术可行…

Python案例--三数排序

一、引言 在信息爆炸的时代&#xff0c;我们每天都会接触到大量的数据。无论是工作中的报表、学习中的数据集&#xff0c;还是日常生活中的购物清单&#xff0c;数据的有序性对于提高效率和决策质量都至关重要。排序算法作为数据处理的基础工具&#xff0c;其重要性不言而喻。…

日记学习小迪安全27

感觉复制粘贴没有意思&#xff0c;而且还有点浪费时间&#xff0c;主要是学习&#xff0c;不是复制&#xff0c;那就复制别人的吧 第27关就参考这篇文章吧&#xff0c;以下大部分内容都是参考以下文章&#xff08;侵权删除&#xff09; 第27天&#xff1a;WEB攻防-通用漏洞&a…

芯片录-低压差线性稳压器AZ1084D-ADJE1失效记录与原理分析

背景 最近主板坏了&#xff0c;现象是主机不停重启&#xff0c;USB设备LED灯一会儿亮&#xff0c;一会儿灭。经过一些分析和定位&#xff0c;通过测温发现主板上AZ1084D-ADJE1芯片高达91摄氏度&#xff0c;进一步测量该芯片的输出引脚和接地引脚短路&#xff0c;初步推测某些原…

Shell-使用函数

在 Shell 脚本中&#xff0c;函数是由一段代码定义的&#xff0c;可以被重复调用。Shell 函数的定义和调用相对简单&#xff0c;并且它支持参数传递和返回值。错误处理在 Shell 中也非常重要&#xff0c;通常通过检查返回的状态码来判断是否有错误发生。 1.Shell 函数的定义和…

构建高效招聘流程:Spring Boot大学生就业系统

1系统概述 1.1 研究背景 如今互联网高速发展&#xff0c;网络遍布全球&#xff0c;通过互联网发布的消息能快而方便的传播到世界每个角落&#xff0c;并且互联网上能传播的信息也很广&#xff0c;比如文字、图片、声音、视频等。从而&#xff0c;这种种好处使得互联网成了信息传…

28 基于51单片机的两路电压检测(ADC0808)

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;通过ADC0808获取两路电压&#xff0c;通过LCD1602显示 二、硬件资源 基于KEIL5编写C代码&#xff0c;PROTEUS8.15进行仿真&#xff0c;全部资源在页尾&#xff0c;提供…

【中间件学习】Git的命令和企业级开发

一、Git命令 1.1 创建Git本地仓库 仓库是进行版本控制的一个文件目录。我们要想对文件进行版本控制&#xff0c;就必须创建出一个仓库出来。创建一个Git本地仓库对应的命令是 git init &#xff0c;注意命令要在文件目录下执行。 hrxlavm-1lzqn7w2w6:~/gitcode$ pwd /home/hr…

Yocto - 使用Yocto开发嵌入式Linux系统_06 掌握Bitbake工具

Grasping the BitBake Tool 在上一章中&#xff0c;我们了解了元数据、元数据集合概念以及 conf/layer.conf 的重要性。在本章中&#xff0c;我们将更深入地研究元数据&#xff0c;了解配方如何相互依赖&#xff0c;并了解 BitBake 如何处理依赖关系。 In the previous chapter…

Python机器视觉:01- 利用列表和切片操作 - 做一个弧线和图片相交的mask区域

前言&#xff1a; Python的列表处理&#xff0c;在机器视觉中经常被用到&#xff0c;这里结合基本的概念机器视觉实践案例&#xff0c;成文如下&#xff1a; 本身将实现一个&#xff0c;弧线的mask填充&#xff1a;这个mask是我的一个天文项目的应用&#xff0c;目的在于将月…