unix网络编程(四) 线程池并发服务器

news2024/11/19 1:31:01

线程池并发服务器

  • 概念
  • 线程池和任务队列
    • 任务队列
    • 线程池
  • 操作线程池的函数
    • 初始化线程池
    • 销毁线程池
    • 向线程池添加任务
    • 任务的回调函数
  • 测试

概念

线程池是一个抽象概念,可以简单的认为若干线程在一起运行,线程不退出,等待有任务处理。
为什么要有线程池?

  • 以网络编程服务器端为例,作为服务器端支持高并发,可以有多个客户端连接,发出请求,对于多个请求我们每次都去建立线程,这样线程会创建很多,而且线程执行完销毁也会有很大的系统开销,使用上效率很低。
  • 创建线程并非多多益善,所以我们的思路是提前创建好若干个线程,不退出,等待任务的产生,去接收任务处理后等待下一个任务。

在这里插入图片描述

线程池和任务队列

线程池如何实现?需要思考2个问题?

1.假设线程池创建了,线程们如何去协调接收任务并且处理?
2.线程池上的线程如何能够执行不同的请求任务?

上述问题1就很像生产者和消费者模型,客户端对应生产者,服务器端这边的线程池对应消费者,需要借助互斥锁和条件变量来搞定。
问题2解决思路就是利用回调机制,我们同样可以借助结构体的方式,对任务进行封装,比如任务的数据和任务处理回调都封装在结构体上,这样线程池的工作线程拿到任务的同时,也知道该如何执行了。

任务队列

任务包含两个元素:回调函数及其参数,任务的各类属性数据

这里使用编号来标识任务的属性,因此,任务的结构体定义如下:

typedef _Task
{
	unsigned int tasknum;			//任务编号
	void (*task_func)(void *arg);		//回调函数
	void *arg;						//回调函数的参数
}Task;

线程池

线程池的核心部分包括:

  • 任务数组(队列)
  • 线程数组

配套的辅助部分为:

  • 为了互斥访问加互斥锁和条件变量
  • 任务数组的属性:最大任务数、当前任务数、任务出队和入队的下标
  • 线程数组的属性:线程池内线程的个数、当前线程是否开启(标志是否要销毁线程)
typedef _ThreadPoll
{
	Task *tasks;						//任务队列
	unsigned int max_job_num;			//最大任务个数
	unsigned int job_num;				//实际任务个数
	unsigned int job_push;				//入队位置
	unsigned int job_pop;				//出队位置
	
	pthread_t *threads;					//线程池内线程数组
	unsigned int thr_num;				//线程池内线程个数
	int shutdown;						//是否关闭线程池

	pthread_mutex_t pool_lock;			//线程池的锁
	pthread_cond_t empty_task;			//任务队列为空的条件变量
	pthread_cond_t not_empty_task;		//任务队列不为空的条件变量
}ThreadPool;

操作线程池的函数

初始化线程池

void create_threadpool(int thrnum,int maxtasknum);
//创建线程池–thrnum 代表线程个数,maxtasknum 最大任务个数

步骤:
分配任务队列和线程队列的数组空间
循环创建线程
初始化线程池的其它属性变量

//创建线程池并初始化
void create_threadpool(int thrnum, int maxtasknum, ThreadPool *thrPool)
{
	printf("initializing the ThreadPool\n");
	//分配线程数组和任务数组
	thrPool->threads = (pthread_t*)malloc(sizeof(pthread_t)*thrnum);
	thrPool->tasks = (Task*)malloc(sizeof(Task)*maxtasknum);

	//初始化ThreadPool 的属性
	thrPool->max_job_num = maxtasknum;
	thrPool->job_num = 0;
	thrPool->job_pop = 0;
	thrPool->job_push = 0;

	thrPool->thr_num = thrnum;
	thrPool->shutdown = 0;				//是否销毁线程池,1代表销毁
	
	pthread_mutex_init(&thrPool->pool_lock, NULL);
	pthread_cond_init(&thrPool->empty_task, NULL);
	pthread_cond_init(&thrPool->not_empty_task, NULL);

	//创建线程, 设置默认为detach方式运行
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	for(int i=0; i<thrnum; i++)
	{
		pthread_create(&thrPool->threads[i], &attr, (void*)thrRun, (void*)thrPool);
	}
	printf("ThreadPool created\n");
}

创建线程时需要给出入口函数thrRun,该函数的核心功能是从任务队列取任务执行,执行结束再取任务,循环上述步骤。
注意:

  • 从任务队列取任务前要加互斥锁,取出任务后立即从任务队列中删除,并释放锁
  • 从任务队列将任务拷贝到线程空间,在线程销毁时要释放
  • 线程循环执行,知道shutdown被置为1,即线程的生命周期持续到线程池被销毁
void thrRun(void *arg)
{
	ThreadPool *thrPool = (ThreadPool*)arg;
	unsigned int taskpose = 0;
	Task *task = (Task*)malloc(sizeof(Task));
	//将任务取出,并立即从任务队列删除,并让出互斥锁
	
	while(1)
	{
		//加锁,获取任务
		pthread_mutex_lock(&thrPool->pool_lock);
		
		//任务队列为空并且不是要摧毁线程池,线程阻塞,等待任务到来
		while(thrPool->job_num <=0 && !thrPool->shutdown)
		{
			//阻塞成功就会释放pool_lock, 被唤醒后再加锁
			pthread_cond_wait(&thrPool->not_empty_task, &thrPool->pool_lock);
		}
		if(thrPool->job_num)
		{
			//取任务,修改线程池的任务队列和属性
			taskpose = (thrPool->job_pop++) % thrPool->max_job_num;
			memcpy(task, &thrPool->tasks[taskpose], sizeof(Task));
			task->arg = task;
			thrPool->job_num--;
			thrPool->job_pop = thrPool->job_pop % thrPool->max_job_num;
			//任务队列有空位置,通知任务生产者
			pthread_cond_signal(&thrPool->empty_task);
		}

		//线程池处于销毁状态
		if(thrPool->shutdown)
		{
			//退出线程, 并销毁线程使用的task的空间
			pthread_mutex_unlock(&thrPool->pool_lock);
			free(task);
			pthread_exit(NULL);
		}

		//任务获取成功,释放锁,执行回调函数
		pthread_mutex_unlock(&thrPool->pool_lock);
		task->task_func(task->arg);
	}
}

销毁线程池

核心步骤:设置shutdown,释放内存
注意:

  • 处于等待任务的阻塞队列需要先将其唤醒(诱杀)
  • 子线程申请的资源在shutdown为1时,会释放
  • 主线程需要等待子线程全部结束(设置joinable)
// 销毁线程池
void destroy_threadpool(ThreadPool *pool)
{
	//销毁线程,激活线程的销毁
	pool->shutdown = 1;
	//唤醒所有阻塞在等待任务步骤的线程,通知其自毁
	pthread_cond_broadcast(&pool->not_empty_task);
	
	//设置joinable,等待所有子线程退出
	for(int i=0; i<pool->thr_num; i++)
	{
		pthread_join(pool->threads[i], NULL);
	}

	//释放内存
	pthread_cond_destroy(&pool->not_empty_task);
	pthread_cond_destroy(&pool->empty_task);
	pthread_mutex_destroy(&pool->pool_lock);

	free(pool->tasks);
	free(pool->threads);
}

向线程池添加任务

核心功能:向任务队列添加任务,并通知因任务队列空而阻塞的线程
注意:

  • 队列不空时要阻塞等待队列有空位置的信号(empty_task)
//向线程池添加任务
void addtask(ThreadPool *pool, int *beginnum)
{
	//加锁
	pthread_mutex_lock(&pool->pool_lock);
	
	//等待任务队列有空位置
	while(pool->max_job_num <= pool->job_num)
	{
		pthread_cond_wait(&pool->empty_task, &pool->pool_lock);
	}

	//修改线程池属性
	int taskpose = (pool->job_push++)%pool->max_job_num;
	pool->job_push = pool->job_push % pool->max_job_num;
	(*beginnum) = (*beginnum) % 100000;
	pool->tasks[taskpose].tasknum = (*beginnum)++;
	pool->job_num++;

	pool->tasks[taskpose].arg = (void*)&pool->tasks[taskpose];
	pool->tasks[taskpose].task_func = taskRun;

	//释放锁,并通知等待not_empty信号的线程
	pthread_mutex_unlock(&pool->pool_lock);
	pthread_cond_signal(&pool->not_empty_task);
}

任务的回调函数

为了简单,这里回调函数打印一下线程号和任务编号,然后sleep2秒

//任务回调函数
void taskRun(void *arg)
{
	Task *task = (Task*)arg;
	int num = task->tasknum;
	printf("task %d is running %lu\n", num, pthread_self());

	sleep(1);
	printf("task %d is done %lu\n", num, pthread_self());
}

测试

测试4线程任务队列为40程序

int main()
{
	ThreadPool thrPool;
	create_threadpool(4, 40, &thrPool);
	int beginnum = 0;
	for(int i=0; i<80; i++)
		addtask(&thrPool, &beginnum);
	
	sleep(30);
	destroy_threadpool(&thrPool);

	return 0;
}

部分截图
在这里插入图片描述
完整代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/81058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

通过选择集获取元素

通过使用内置对象document上的getElementsByTagName方法来获取页面上的某一种标签&#xff0c;获取的是一个选择集&#xff0c;不是数组&#xff0c;但是可以用下标的方式操作选择集里面的标签元素 <!DOCTYPE html> <html lang"en"> <head><me…

Javaweb安全——Weblogic反序列化漏洞(一)

从原生反序列化过程开始谈起。 原生反序列化 序列化就是把对象转换成字节流&#xff0c;便于保存在内存、文件、数据库中&#xff1b;反序列化即逆过程&#xff0c;由字节流还原成对象。 大致是这么一个过程&#xff0c;简单画了个图&#xff1a; 测试类如下&#xff1a; p…

spring mvc——@RequestMapping注解的作用

RequestMapping注解 1、RequestMapping注解的功能 从注解名称上我们可以看到&#xff0c;RequestMapping注解的作用就是将请求和处理请求的控制器方法关联起来&#xff0c;建立映射关系。 SpringMVC 接收到指定的请求&#xff0c;就会来找到在映射关系中对应的控制器方法来处理…

从源码编译linux内核并运行一个最小的busybox文件系统

从源码编译linux内核并运行一个最小的busybox文件系统 环境基础&#xff1a; 开发环境&#xff1a;ubuntu 18.04 linux源码版本&#xff1a;linux-4.9.229 busybox源码版本&#xff1a;busybox-1.30.0 qemu-system-x86_64版本&#xff1a;2.0.0 这篇文章将按照如下4个步骤来…

【hexo系列】01.hexo环境搭建及github.io搭建

文章目录基础环境要求安装hexohexo初体验创建hexo工程初体验创建自己的第一篇笔记推送到github网站新建github.io推送到github推送到github(ssh方式 免密)参考资料基础环境要求 检测Node.js是否安装成功&#xff0c;在命令行中输入 node -v 检测npm是否安装成功&#xff0c;在…

机器学习中的数学原理——多重回归算法

这个专栏主要是用来分享一下我在机器学习中的学习笔记及一些感悟&#xff0c;也希望对你的学习有帮助哦&#xff01;感兴趣的小伙伴欢迎私信或者评论区留言&#xff01;这一篇就更新一下《白话机器学习中的数学——多重回归算法》&#xff01; 目录 一、什么是多重回归 二、案…

物联网开发笔记(60)- 使用Micropython开发ESP32开发板之SPI接口控制Micro SD卡TF卡模块

一、目的 这一节我们学习如何使用我们的ESP32开发板来通过SPI接口控制Micro SD卡TF卡模块。 二、环境 ESP32 SPI接口控制Micro SD卡TF卡模块 Thonny IDE 几根杜邦线 接线方法&#xff1a; Soft SPI接线说明 # 接线说明: # MISO -> GPTO13 # MOSI -> GPIO12 # SCK …

[附源码]Python计算机毕业设计SSM基于的楼盘销售系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

SpringCloud入门实战-Ribbon

SpringCloud入门实战-Ribbon使用 原创目录概述需求&#xff1a;设计思路实现思路分析1.Ribbon原理2.Ribbon负载均衡策略参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a bet…

计算机软技术,如何画好一张架构图?

什么是架构图&#xff1f; 如何画好一张架构图&#xff0c;要做好这件事情首先要回答的就是什么是架构图。我们日常工作中经常能看到各种各样的架构图&#xff0c;而且经常会发现大家对架构图的理解各有侧重。深入追究到这个问题&#xff0c;可能一下子还很难有一个具象的定义…

动态路由协议RIP

数据来源 一、动态路由 基于某种协议实现 1&#xff09;动态路由拓补图 2&#xff09;动态路由特点 减少了管理任务占用了网络带宽 3&#xff09;动态路由协议概述 路由器之间用来交换信息的语言 4&#xff09;度量值 跳数、带宽、负载、时延、可靠性、成本 跳数&#xff1a…

JavaScript数据结构【数组---for...of循环迭代】

继for循环&#xff0c;和forEach方法迭代数组后&#xff0c;要想迭代数组的值还可以用for...of循环 使用&#xff1a; // for...of循环示例 let array [1, 2, 3] for (let key of array) {console.log(key); } /* 输出&#xff1a;123 */ 可以看到&#xff1a;使用for...of…

嵌入式介绍与应用

嵌入式介绍与应用1 概念桌面对比2 特点3 发展历史3.1 计算机发展3.2 嵌入式发展4 开发能力要求5 应用6 规模参考1 概念 嵌入式系统由硬件和软件组成。是能够独立进行运作的器件。其软件内容只包括软件运行环境及其操作系统。硬件内容包括信号处理器、存储器、通信模块等在内的…

构建过程:从源码到dist文件

问题 有没有好奇过&#xff0c;自己写的前端代码是怎么变成上线可用的代码的&#xff1f; 前言 目前实现从源码到可用的静态文件&#xff0c;我们都是借助打包工具实现的&#xff0c;目前用的比较多的是webpack、rollup、vite..., 那么以上问题也可以描述为“构建工具是如何…

ChatGPT教程之 03 ChatGPT 中构建 Python 解释器

这个故事的灵感来自于一个类似的故事,在 ChatGPT 中构建虚拟机。我印象深刻并决定尝试类似的东西,但这次不是 Linux 命令行工具,而是让 ChatGPT 成为我们的 Python 解释器。 这是初始化 ChatGPT 的初始命令: I want you to act as a Python interpreter. I will type com…

<<两万字通关Java IO流>>

✨✨hello&#xff0c;愿意点进来的小伙伴们&#xff0c;你们好呐&#xff01; &#x1f43b;&#x1f43b;系列专栏&#xff1a;【JavaEE】 &#x1f432;&#x1f432;本篇内容&#xff1a;详解Java IO流 &#x1f42f;&#x1f42f;作者简介:一名现大二的三非编程小白&#…

python----函数、文件、以及高级特性

文章目录前言一、函数的基本概念二、文件OS模块json模块高级特性生成式生成器闭包装饰器前言 一、函数的基本概念 **全局变量&#xff1a;**在函数外边定义的变量&#xff0c;全局生效 **局部变量&#xff1a;**在函数里边定义的变量&#xff0c;局部生效 如果要在函数中修改全…

【BL808】缘起:M1s开发板的第一个示例-LVGL

一、sipeed M1s介绍 1.1 M1s开发板介绍 1.1.1 开发板特性 板载两个USB口&#xff08;一个用于USB-TTL&#xff0c;一个用于通过模拟U盘的方式烧录c906的固件&#xff09;板载1.69 inch的触摸屏和摄像头接口板载MIC、LED和TF卡座板载一个BL702做成的集USB-TTL和JTAG的调试器。…

面试收集汇总

最近的工作情况&#xff0c;难度比较大的项目。 http servlet生命周期&#xff0c;在springmvc对原生servlet做了一个怎么样的包装来实现一个自己的mvc能力的&#xff1f; 1.加载和实例化。Servlet容器负责加载和实例化Servlet。当Servlet容器启动时&#xff0c;或者在容器检…

[附源码]JAVA毕业设计养老院老人日常生活管理系统(系统+LW)

[附源码]JAVA毕业设计养老院老人日常生活管理系统&#xff08;系统LW&#xff09; 项目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。…