线程池设计与实现C

news2024/11/22 18:24:22

线程池实现

结构设计

先上图:

20221225191932

参数

  1. 线程池:
  • 包含一个执行队列、
  • 一个任务队列
  • mutex用来在多个线程取任务时锁任务队列,cond用来在任务队列为空时锁任务队列
    • 如线程A锁了任务队列,去取任务时,又发现任务队列为空,线程A会先释放锁,然后重新加锁,等待有新的任务加入任务队列,唤醒自己
  1. 执行队列:
  • 一般一个线程对应一个执行者,创建线程池时,没创建一个线程,就绑定一个执行者,因此其节点需要线程id
  • 取任务时,需要通过线程池结构体取任务队列,因此需要线程池对象
  • 在线程池回收时,所有线程和锁都需要释放,在释放的时候执行者不能再去获取新锁,因此需要停止
  1. 任务队列:
    取一个任务,也就是获取一个任务节点后,需要知道执行的函数地址参数

API

创建/关闭线程池、添加任务、线程入口函数(获取并执行任务)

  • 创建线程池:就是创建多个线程,以及初始化其结构体的参数
    • mutex和cond的静态初始化
    • 线程和执行队列的创建
  • 添加任务:添加任务并唤醒等待的线程
  • 线程入口函数:锁队列,如果队列为空就锁等待,取到任务后执行函数
  • 关闭线程池:所有执行者都停止,唤醒所有条件变量以释放锁,线程池、执行队列、任务队列对象置为NULL

代码实现

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// Tip:宏定义只对一行内有效,所以用 \ 吧回车转义了
// 而且宏以内联的方式编译进可执行文件速度快
#define LL_ADD(item, list) do { 	\
	item->prev = NULL;				\
	item->next = list;				\
    if (list!=NULL) list->prev=item; \
	list = item;					\
} while(0)

#define LL_REMOVE(item, list) do {						\
	if (item->prev != NULL) item->prev->next = item->next;	\
	if (item->next != NULL) item->next->prev = item->prev;	\
	if (list == item) list = item->next;					\
	item->prev = item->next = NULL;							\
} while(0)

// 任务执行队列
typedef struct NWORKER {
	pthread_t thread;  // 一个执行者一个线程,线程号
	int terminate;    // 相当于让这个工作者休息,抢不到任务队列的锁
	struct NWORKQUEUE *workqueue;   // 线程池,为了方便取任务
	struct NWORKER *prev;       // 任务链表
	struct NWORKER *next;
} nWorker;

// 任务队列
typedef struct NJOB {
    // Tip:函数指针,换成void (*job_function)(void *arg)更灵活
	void (*job_function)(struct NJOB *job);
	void *user_data;       // 参数
	struct NJOB *prev;
	struct NJOB *next;
} nJob;

// 线程池
typedef struct NWORKQUEUE {
	struct NWORKER *workers;
	struct NJOB *waiting_jobs;
	pthread_mutex_t jobs_mtx;         // 互斥锁,抢到锁的线程才去任务队列取数据
	pthread_cond_t jobs_cond;      // 条件等待
} nWorkQueue;

typedef nWorkQueue nThreadPool;

// 线程入口函数:对任务队列进行加锁,如果任务队列为空则进入条件等待
static void *thread_callback(void *ptr) {
	nWorker *worker = (nWorker*)ptr;

	while (1) {
        // pthread_cond_wait必须配合pthread_mutex_lock使用,否则,可能有多个线程一起等待条件被唤醒
        // 线程A在这里上锁
		pthread_mutex_lock(&worker->workqueue->jobs_mtx);
        // 通过线程池取任务,
        // 任务队列为空就阻塞,等待队列不为空被唤醒
		while (worker->workqueue->waiting_jobs == NULL) {
			if (worker->terminate) break;
            // 条件等待,线程A先解锁又重新加锁,所以线程A最终阻塞在这里,其他线程还阻塞在上面的mutex
			pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
		}

		if (worker->terminate) {
			pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
			break;
		}
		
		nJob *job = worker->workqueue->waiting_jobs;
		if (job != NULL) {
			LL_REMOVE(job, worker->workqueue->waiting_jobs);
		}
		
		pthread_mutex_unlock(&worker->workqueue->jobs_mtx);

		if (job == NULL) continue;

		job->job_function(job);
	}

	free(worker);
	pthread_exit(NULL);
}


// 创建线程池:初始化线程池参数、为每个线程构建一个执行者
int ntyThreadPoolCreate(nThreadPool *workqueue, int num_thread) {

	if (num_thread < 1) num_thread = 1;       // 线程数量初始化
	memset(workqueue, 0, sizeof(nThreadPool));  // 线程池初始化为0,防止有脏数据
	
    // 条件变量和互斥量静态初始化;  动态方式调用pthread_cond_init()函数
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;     
	memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
	
	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));

    // 一个线程对应一个执行者
	int i = 0;
	for (i = 0;i < num_thread;i ++) {
		nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
		if (worker == NULL) {
			perror("malloc");
			return i;         // 返回线程创建成功的数量
		}

		memset(worker, 0, sizeof(nWorker));
		worker->workqueue = workqueue;
        // 线程号、线程属性、入口函数、函数参数
		int ret = pthread_create(&worker->thread, NULL, thread_callback, (void *)worker);
		if (ret) {
			perror("pthread_create");
			free(worker);
			return i;
		}
        // 创建线程的同时,将worker加入执行队列
		LL_ADD(worker, worker->workqueue->workers);
	}

	return 0;
}

// 关闭线程池
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
	nWorker *worker = NULL;

	for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
		worker->terminate = 1;    // 所有执行者休息,不要再去锁任务队列
	}

	pthread_mutex_lock(&workqueue->jobs_mtx);

	workqueue->workers = NULL;
	workqueue->waiting_jobs = NULL;

	pthread_cond_broadcast(&workqueue->jobs_cond);  // 激活所有条件变量

	pthread_mutex_unlock(&workqueue->jobs_mtx);
	
}

// 添加任务
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {

	pthread_mutex_lock(&workqueue->jobs_mtx);

	LL_ADD(job, workqueue->waiting_jobs);
	
	pthread_cond_signal(&workqueue->jobs_cond);    // 唤醒条件等待线程
	pthread_mutex_unlock(&workqueue->jobs_mtx);
	
}




/************************** debug thread pool **************************/

#if 1

#define ZENGER_MAX_THREAD			80
#define ZENGER_COUNTER_SIZE		1000

void ZENGER_counter(nJob *job) {

	int index = *(int*)job->user_data;

	printf("index : %d, selfid : %lu\n", index, pthread_self());
	free(job->user_data);
	free(job);
}



int main(int argc, char *argv[]) {
	nThreadPool pool;

	ntyThreadPoolCreate(&pool, ZENGER_MAX_THREAD); // 创建线程池
	
	int i = 0;             // 创建任务
	for (i = 0;i < ZENGER_COUNTER_SIZE;i ++) {
		nJob *job = (nJob*)malloc(sizeof(nJob));
		if (job == NULL) {
			perror("malloc");
			exit(1);
		}
		
		job->job_function = ZENGER_counter;
        // Tip:这里不能用int i; 因为for循环一结束,i就被回收了,所以要在堆上申请
		job->user_data = malloc(sizeof(int));   
		*(int*)job->user_data = i;

		ntyThreadPoolQueue(&pool, job);
		
	}

	getchar();
	printf("\n");
}

#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/115289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++求解数学题】大圆圈里面三角形个数相等

本文介绍的问题是一道来自于二年级&#xff08;上&#xff09;数学的练习题。 问题 在下图中画8个Δ\DeltaΔ,使每个大圆圈里都有4个Δ\DeltaΔ. 示例&#xff1a; 每个大圆圈里面均有4个Δ\DeltaΔ. 方法 按照“变量-范围-条件”的三段式穷举法解题框架&#xff0c;对…

分布式系列之聊聊Nginx实现原理

Nginx作为开源的轻量级的HTTP服务器&#xff0c;广泛应用于分布式应用架构中。本文简要介绍了Nginx的特点及使用场景、Nginx的进程模型和请求处理流程&#xff0c;并结合不同场景进行配置&#xff0c;对Nginx的架构和实现原理有个初步的了解。 1、Nginx是什么 Nginx&#xff0…

Echarts之甘特图type: ‘custom‘参数详解

甘特图 const groupData XEUtils.groupBy(data, "eqpName"); //分组后的数据 const yAxisData Object.keys(groupData); const seriesData Object.keys(groupData).map((item, index) > {let arr [];groupData[item].forEach((GItem) > {arr.push([index,f…

Graphviz安装向导及入门指南

目录 1、首先在官网下载graphviz 2、安装。 3、测试并在Windows命令行中使用 4、在Python中使用 5、在自带的gvedit.exe 程序中使用 6、在语雀中使用 7、绘制一棵简单的二叉树 8、详细语法介绍 8.1 带标签 8.2 修改方框颜色和形状 8.3子视图 8.4 结构视图 8.5 …

【网络安全】Centos7安装杀毒软件----ClamAV

一、ClamAV介绍 Clam AntiVirus是一个Linux系统上使用的反病毒软件包。主要应用于邮件服务器&#xff0c;采用多线程后台操作&#xff0c;可以自动升级病毒库。 二、安装 1.下载rpm wget https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm 2.升级epe…

4.1、网络层概述

1、主要任务 网络层的主要任务是实现网络互连\color{red}实现网络互连实现网络互连&#xff0c;进而实现数据包在各网路之间的传输\color{red}实现数据包在各网路之间的传输实现数据包在各网路之间的传输 例如&#xff1a; 这些异构型网络若只是需要各自内部通信&#xff0c…

高质量发展指标构建:全国各省高质量发展需求(2014-2021年)

高质量发展是坚持更高层次和更高水平对外开放的发展。中国改革开放四十年的实践充分证明&#xff0c;不断扩大对外开放是推动中国经济社会发展的重要动力&#xff0c;是实现国家繁荣富强的根本出路。因此&#xff0c;在中国经济发展的新时代&#xff0c;推动新一轮高水平开放&a…

docker logs实时查看日志tail

docker logs实时查看日志tail docker logs -f -t --since="2017-05-31" --tail=10 container说明: --since : 指定输出日志开始日期。 -f : 查看实时日志 -t : 查看日志产生的时间戳 -tail=10 : 查看最后的10条日志。 container : 容器名docker logs -f --until=2s说…

Docker常用操作命令总结(一)

文章目录一、Docker的应用场景二、Docker 的优点三、Docker 架构四、安装Docker1、更新 apt 包索引2、安装docker3、安装完成之后&#xff0c;运行命令sudo docker info&#xff0c;检查安装状态4、有可能&#xff0c;第一次需要手动启动服务.就需要执行下面的命令&#xff0c;…

LabVIEW如何减少下一代测试系统中的硬件过时4

LabVIEW如何减少下一代测试系统中的硬件过时4 DSSP Class Definition DSSP父类定义有三种不同类型的函数:仅父类、公共类和基于度量的函数。DSSP父类&#xff0c;DSSP.Lvclass包含所有子类函数的超集&#xff0c;加上父类特有的一些函数。DSSP父类的单个子实例(例如AgSigGen.…

2022年总结(2022年1月1日至2022年12月25日)

前言 时光飞逝&#xff0c;又到了一年一度的年终总结的时间了&#xff0c;2022年充满磨难的一年&#xff0c;悲哉&#xff0c;痛哉~~ 但对于我而言&#xff0c;其实还好&#xff0c;基本无太大影响&#xff0c;黄金单身汉&#xff0c;一人吃饱&#xff0c;全家不饿~&#xff…

spring之手写框架

文章目录前言一、手写spring框架之核心接口实现二、手写spring框架之实例化Bean三、手写spring框架之获取所有set方法四、手写spring框架之给属性赋值4.1 非简单类型属性赋值4.2 简单类型属性赋值附&#xff1a;前言 Spring IoC容器的实现原理&#xff1a;工厂模式解析XML反射…

学习性能所必须的知识之算法

什么是算法? 通过有效地缩小查找范围,只需要很少的次数就能很快速的找到需要的数字,这样的策略或方法就称为“算法”。 算法的好坏对性能有很大的影响。 学习算法的窍门 掌握算法优点与缺陷,“折中”是一个很重要的思维通过在图上推演来思考评价算法的指标 通过复杂度(…

各种型号西门子PLC所支持的通信协议小结

西门子PLC有4大类&#xff0c;几十个型号类型&#xff0c;PLC不同所支持的通讯协议也不相同。 按照大类型来划分&#xff0c;具体可分为串口协议和以太网通信协议两大类。 串口协议主要有&#xff1a;MODBUS RTU 通信协议&#xff1b;PROFIBUS 通信协议&#xff1b;USS通信协…

疫情信息管理系统(附源代码及数据库)

本系统是一个可以对各种疫情进行管理的系统&#xff0c;管理员可以直接对居民、住户进行统一的管理&#xff0c;这样就能在疫情期间大大减轻了管理者的工作量&#xff0c;使管理社区的渠道更加的方便。其主要功能有&#xff1a;登录功能&#xff0c;公告的发布&#xff0c;到访…

2022, 6年技术路, 后疫情时代复盘

专注 聚焦 持续复盘写下你一年的希望...又到了每年一度的复盘时间。转眼一想, 做技术已经 6 年了。说实话&#xff0c;有点疲惫了。今年整个互联网行业都不好过, 加上疫情的反复不断, 从耳边流出了很多裁员的信息, 股市也比较低迷, 身处底层的我们只能夹缝生存。但是, 我又是…

【MySQL基础教程】DQL语句详细介绍

前言 本文为 【MySQL基础教程】DQL语句 相关内容介绍&#xff0c;下边具体将对DQL语句基本语法&#xff0c;基础查询&#xff0c;条件查询&#xff0c;聚合函数&#xff0c;分组查询&#xff0c;排序查询&#xff0c;分页查询&#xff0c;相关案例&#xff0c;执行顺序等进行详…

Elasticsearch 核心技术(二):elasticsearch-head 插件安装和使用

❤️ 个人主页&#xff1a;水滴技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; &#x1f338; 订阅专栏&#xff1a;大数据核心技术从入门到精通 文章目录一、安装方式二、下载 head 插件三、安装 head 插件四、运行 head 插件五、使用…

服务器硬件规格常用查看命令——网卡相关命令

lspci 使用lspci命令可以显示系统中的PCI总线和连接到它们的设备信息&#xff0c;在默认情况下&#xff0c;显示一个简短格式的设备列表。但是可以使用“lspci -vvx”或“lspci -vvxxx”显示更加详细的设备信息&#xff0c;在这些信息中包含了PCI设备驱动程序或lspci本身的错误…

GitHub与微信开启“秘密扫描”计划,来确保数据安全

近日GitHub 官方博客更是宣布&#xff1a;" 腾讯微信现在是 GitHub 秘密扫描合作伙伴。" “秘密扫描”是Github发起的一个计划&#xff0c;可别被它名字吓到了&#xff0c;它并不是说秘密的扫描用户的隐私数据&#xff0c;而是和微信合作发起&#xff0c;防止微信开…