线程池-手写线程池C++11版本(生产者-消费者模型)

news2024/11/25 0:31:06

本项目是基于C++11的线程池。使用了许多C++的新特性,包含不限于模板函数泛型编程、std::future、std::packaged_task、std::bind、std::forward完美转发、std::make_shared智能指针、decltype类型推断、std::unique_lock锁等C++11新特性功能。


本项目有一定的上手难度。推荐参考系列文章

C++11实用技术(一)auto与decltype的使用

C++11实用技术(二)std::function和bind绑定器

C++11实用技术(三)std::future、std::promise、std::packaged_task、async

C++ operator关键字的使用(重载运算符、仿函数、类型转换操作符)

右值引用以及move移动语义和forward 完美转发

C++标准库中的锁lock_guard、unique_lock、shared_lock、scoped_lock、recursive_mutex


代码结构

本项目线程池功能分以下几个函数去实现:

threadpool.init(isize_t num);设置线程的数量
threadpool::get(TaskFuncPtr& task);读取任务队列中的任务
threadpool::run();通过get()读取任务并执行
threadpool.start(); 启动线程池,并通过run()执行任务
threadpool.exec();封装任务到任务队列中
threadpool.waitForAllDone();等待所有任务执行完成
threadpool.stop();分离线程,释放内存

threadpool.init

init的功能是初始化线程池,主要是设置线程的数量到类的成员变量中。

bool ZERO_ThreadPool::init(size_t num)
{
	std::unique_lock<std::mutex> lock(mutex_);

	if (!threads_.empty())
	{
		return false;
	}

	threadNum_ = num;
	return true;
}

threadNum_:保存线程的数量,在init函数中被赋值

此处使用unique_lock或lock_guard的加锁方式都能实现自动加锁和解锁。但是unique_lock可以进行临时解锁和再上锁,而lock_guard不行,特殊情况下还是必须使用unique_lock(用到条件变量的情况)。(lock_guard比较简单,相对来说性能要好一点)

threadpool::get

从任务队列中获取获取任务,这里其实就是我们的消费者模块

bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
	std::unique_lock<std::mutex> lock(mutex_);
	if (tasks_.empty()) //判断任务是否存在
	{
		//要终止线程池   bTerminate_设置为true,任务队列不为空
		condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });
	}
	if (bTerminate_)
		return false;
	if (!tasks_.empty())
	{
		task = std::move(tasks_.front());  // 使用了移动语义
		tasks_.pop(); //释放资源,释放一个任务
		return true;
	}
	return false;
}
  • 条件变量condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });是需要一直等待条件完成才退出。即任务终止,或者任务队列不为空时,就会退出条件变量的阻塞状态,继续执行下面的逻辑。

  • task = std::move(tasks_.front()); 使用了移动语义,将 tasks_.front() 的内容移动到了 task 中。可以减少内容拷贝。移动完之后tasks_.front() 的内容会变为未指定的状态,所以直接pop掉就好了。

threadpool::run

这里是运行我们的任务部分。包括调用get在任务队列中获取任务,以及执行任务。

void ZERO_ThreadPool::run()  // 执行任务的线程
{
	//调用处理部分
	while (!isTerminate()) // 判断是不是要停止
	{
		TaskFuncPtr task;
		bool ok = get(task);        // 读取任务
		if (ok)
		{
			++atomic_;
			try
			{
				if (task->_expireTime != 0 && task->_expireTime < TNOWMS)
				{//如果设置了超时,并且超时了,就需要执行本逻辑
				//超时任务,本代码未实现,有需要可实现在此处
				}
				else
				{
					task->_func();  // 执行任务
				}
			}
			catch (...)
			{
			}
			--atomic_;
			}
		}
	}
}

atomic_:运行一个任务,该参数+1;执行完毕,该参数-1。这里是为了待会停止线程池时判断是否还有运行中的任务(未完成的线程)。

threadpool.start

创建线程,并把线程池存入vector中,后面释放线程池时,好一一释放线程。

bool ZERO_ThreadPool::start()
{
	std::unique_lock<std::mutex> lock(mutex_);
	if (!threads_.empty())
	{
		return false;
	}
	for (size_t i = 0; i < threadNum_; i++)
	{
		threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
	}
	return true;
}

threads_.push_back(new thread(&ZERO_ThreadPool::run, this));创建线程,线程的回调函数为run。

threadpool.exec

exec是将我们的任务存入任务队列中,这段代码是本项目最难的,用了很多C++的新特性。

/*
	template <class F, class... Args>
	它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
	auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
	std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
	返回值后置
	*/
	template <class F, class... Args>
	auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>//接受一个超时时间 `timeoutMs`,一个可调用对象 `f` 和其它参数 `args...`,并返回一个 `std::future` 对象,该对象可以用于获取任务执行的结果。
	{
		int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 根据超时时间计算任务的过期时间 `expireTime`,如果超时时间为 0,则任务不会过期。
		//定义返回值类型
		using RetType = decltype(f(args...));  // 使用 `decltype` 推导出 `f(args...)` 的返回值类型,并将其定义为 `RetType`(这里的using和typedef功能一样,就是为一个类型起一个别名)。
		// 封装任务 使用 `std::packaged_task` 将可调用对象 `f` 和其它参数 `args...` 封装成一个可执行的函数,并将其存储在一个 `std::shared_ptr` 对象 `task` 中。
		auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

		TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封装任务指针,设置过期时间 创建一个 `TaskFunc` 对象,并将任务的过期时间 `expireTime` 传递给它。
		fPtr->_func = [task]() {  // 具体执行的函数 将封装好的任务函数存储在 `TaskFunc` 对象的 `_func` 成员中,该函数会在任务执行时被调用。
			(*task)();
		};

		std::unique_lock<std::mutex> lock(mutex_);
		tasks_.push(fPtr);              // 将任务插入任务队列中
		condition_.notify_one();        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify

		return task->get_future();; //返回一个 `std::future` 对象,该对象可以用于获取任务执行的结果。
	}

使用了可变参数模板函数。
tasks_:保存任务的队列
condition_.notify_one():保存一个任务唤醒一个条件变量
std::future : 异步指向某个任务,然后通过future特性去获取任务函数的返回结果。
std::bind:将参数列表和函数绑定,生成一个新的可调用对象
std::packaged_task:将任务和feature绑定在一起的模板,是一种封装对任务的封装。

本函数用到了泛型编程模板函数,输入参数有3个:一个超时时间 timeoutMs,一个可调用对象 f 和参数 args...。采用返回值后置的方式返回一个std::future对象。这里采用返回值后置是为了方便使用decltype(f(args…)推导数据类型。

auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward(f), std::forward(args)…));是将我们传进来的任务函数和参数bind成一个对象,这个对象可以看作是一整个函数,其返回值就是RetType 类型,并且没有输入参数。所以用std::packaged_task<RetType()>这样的格式来打包封装。封装好的对象用智能指针(std::make_shared)来管理。

同时还要创建一个TaskFunc的对象,同样用智能指针管理,这个对象包括两项内容,一个就是超时时间,一个就是我们封装好的task对象。
通过TaskFuncPtr fPtr = std::make_shared(expireTime);和fPtr->_func = task {(*task)();};两条代码将这两项传进去。

最后会通过task->get_future()返回我们任务函数执行的结果返回值。

threadpool.waitForAllDone

等待所有任务执行完成。

bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
	std::unique_lock<std::mutex> lock(mutex_);
	if (tasks_.empty() && atomic_ == 0)
		return true;
	if (millsecond < 0)
	{
		condition_.wait(lock, [this] { return tasks_.empty() && atomic_ == 0; });
		return true;
	}
	else
	{
		return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty() && atomic_ == 0; });
	}
}

使用条件变量来等待任务执行完成。支持超时执行功能。

此处unique_lock的使用是必须的: 条件变量condition_在wait时会进行unlock再进入休眠, lock_guard并无该操作接口

threadpool.stop

终止线程池。会调用waitForAllDone等待所有任务执行完成再终止。

void ZERO_ThreadPool::stop()
{
	{
		std::unique_lock<std::mutex> lock(mutex_);
		bTerminate_ = true;
		condition_.notify_all();
	}
	waitForAllDone();
	for (size_t i = 0; i < threads_.size(); i++)
	{
		if (threads_[i]->joinable())
		{
			threads_[i]->join();
		}
		delete threads_[i];
		threads_[i] = NULL;
	}
	std::unique_lock<std::mutex> lock(mutex_);
	threads_.clear();
}

通过join等线程执行完成后才返回。

主函数调用

class Test
{
public:
	int test(int i) {
		cout << _name << ", i = " << i << endl;
		Sleep(1000);
		return i;
	}
	void setName(string name) {
		_name = name;
	}
	string _name;
};
void test3() // 测试类对象函数的绑定
{
	ZERO_ThreadPool threadpool;
	threadpool.init(2);
	threadpool.start(); // 启动线程池
	Test t1;
	Test t2;
	t1.setName("Test1");
	t2.setName("Test2");
	auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
	auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
	cout << "t1 " << f1.get() << endl;
	cout << "t2 " << f2.get() << endl;
	threadpool.stop();
}
int main()
{
	test3(); // 测试类对象函数的绑定
	cout << "main finish!" << endl;
	return 0;
}

运行结果:
在这里插入图片描述

本项目完整代码下载地址基于C++11的线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/849801.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浏览器是如何渲染页面的?

浏览器是如何渲染页面的&#xff1f; 当浏览器的网络线程收到 HTML 文档后&#xff0c;会产生一个渲染任务&#xff0c;并将其传递给渲染主线程的消息队列。 在事件循环机制的作用下&#xff0c;渲染主线程取出消息队列中的渲染任务&#xff0c;开启渲染流程。 整个渲染流程分…

对员工画饼out了,领导力之我的地盘TA做主

90后的员工为什么爱离职? 我想了一年&#xff0c;才嗅到一丝隐晦的内味。因为不结婚、不生娃、不买房、不买车&#xff0c;没压力&#xff0c;骂TA还敢顶嘴。 员工动力不足&#xff0c;没有干劲&#xff0c;宁愿熬夜打游戏&#xff0c;也不愿意多加班&#xff0c;到底是工资…

Springboot03--restful、swagger+orm/mybatis,mybatis-plus

参考这个方法配置&#xff0c;主要是我的springboot和swagger的版本号的问题 SpringBoot2.7.14集成Swagger3.0 (liqinglin0314.com) 常用的一些注解 放在controller里面 2. mybatisplus <!-- MyBatisPlus依赖--><dependency><groupId>com.baomidou</gr…

FL Studio低版本怎么免费升级:FL Studio升级要钱吗?

为了更好的服务国内FL Studio用户&#xff0c;FL Studio 官网提供了跨版本升级的服务&#xff0c;用户可以通过缴纳一定的费用&#xff0c;将自己已购买的入门版或其他非完整版的版本&#xff0c;升级为更高的版本&#xff0c;解锁更多的插件&#xff0c;而无需重新购买整套版本…

嵌入式开发学习(STC51-4-蜂鸣器)

内容 控制蜂鸣器发出声音&#xff0c;一段时间后关闭 蜂鸣器简介 蜂鸣器是一种一体化结构的电子讯响器&#xff0c;采用直流电压供电&#xff0c;广泛应用于计算机、打印机、复印机、报警器、电子玩具、汽车电子设备、电话机、定时器等电子产品中作发声器件&#xff1b; 蜂…

算法通关村第六关——如何使用中序和后序来恢复一颗二叉树

1 树的基础知识 1.1 树的定义 树(Tree)&#xff1a;表现得是一种层次关系&#xff0c;为 n &#xff08; n ≥ 0 &#xff09; n&#xff08;n≥0&#xff09; n&#xff08;n≥0&#xff09;个节点构成的有限集合&#xff0c;当n0时&#xff0c;称为空树&#xff0c;对于任一…

锁定Mac的内置键盘,防止外接键盘时的误触

场景&#xff1a;把你的外接键盘放在mac上&#xff0c;然后打字时&#xff0c;发现外接键盘误触mac键盘&#xff0c;导致使用体验极差 解决方案&#xff1a;下载Karabiner-Elements这款软件&#xff0c;并给它开启相关权限。 地址&#xff1a;https://github.com/pqrs-org/Ka…

并网逆变器学习笔记6---三电平SVPWM下的连续和不连续调制

之前在学习中总结过一次DPWM策略选择&#xff1a;并网逆变器学习笔记5---三电平DPWM 但是对于三电平逆变器而言&#xff0c;如何从连续调制切换到不连续调制&#xff0c;存在一些疑惑点&#xff0c;下午闲来无事&#xff0c;把SVPWM下的连续调制和不连续调制的开关状态选择&am…

C++实现一个链栈

C实现一个链栈 什么是链栈如何实现链栈链栈的实现开发环境代码实现运行结果 什么是链栈 链栈不名思意&#xff0c;就是既具有链表的特性&#xff0c;又具有栈的特性。 即&#xff1a; 链栈中的元素由指针域和数据域组成&#xff0c;通过指针指向下一个元素&#xff1b;2.链栈同…

软件测试学习:师傅领进门修行看个人

本课程学习导图 2-1 软件测试阶段 1、单元测试 概念&#xff1a; 对软件中的 最小可测试单元 进行检查和验证。 原则&#xff1a; &#xff08;1&#xff09;尽可能测试用例相互独立 &#xff08;2&#xff09;一般由代码开发人员实施 好处&#xff1a;&#xff08;…

影响亚马逊Listing转化率的14大因素你知道吗?

我们都知道亚马逊listing转化率对于链接的推新和维稳来说有多么重要&#xff0c;只要转化率的比值无法达到整体市场平均比值的及格线&#xff0c;你就很可能会慢慢被亚马逊的飞轮算法所淘汰。 那么&#xff0c;具体是哪些因素在影响着你的listing转化率呢?这里我们可以分为显…

容器——1.集合概述

文章目录 1.1. Java 集合概览1.2. 说说 List,Set,Map 三者的区别&#xff1f;1.3. 集合框架底层数据结构总结1.3.1. List1.3.2. Set1.3.3. Map 1.4. 如何选用集合?1.5. 为什么要使用集合&#xff1f; 1.1. Java 集合概览 从下图可以看出&#xff0c;在 Java 中除了以 Map 结尾…

【深度学习】【风格迁移】Visual Concept Translator,一般图像到图像的翻译与一次性图像引导,论文

General Image-to-Image Translation with One-Shot Image Guidance 论文&#xff1a;https://arxiv.org/abs/2307.14352 代码&#xff1a;https://github.com/crystalneuro/visual-concept-translator 文章目录 Abstract1. Introduction2. 相关工作2.1 图像到图像转换2.2. Di…

Node.js |(三)Node.js API:path模块及Node.js 模块化 | 尚硅谷2023版Node.js零基础视频教程

学习视频&#xff1a;尚硅谷2023版Node.js零基础视频教程&#xff0c;nodejs新手到高手 文章目录 &#x1f4da;path模块&#x1f4da;Node.js模块化&#x1f407;介绍&#x1f407;模块暴露数据⭐️模块初体验⭐️暴露数据 &#x1f407;导入文件模块&#x1f407;导入文件夹的…

c++日志工具之——log4cpp

1、log4cpp概述 Log4cpp是一个开源的C类库&#xff0c;它提供了C程序中使用日志和跟踪调试的功能&#xff0c;它的优点如下&#xff1a; 提供应用程序运行上下文&#xff0c;方便跟踪调试&#xff1b; 可扩展的、多种方式记录日志&#xff0c;包括命令行、文件、回卷文件、内…

Kubernetes入门 一、认识Kubernetes

目录 什么是Kubernetes为什么使用 Kubernetes集群组件组件Master组件节点组件附加组件 核心概念和资源服务的分类资源和对象命名空间级ContainerPodreplicas控制器-ReplicationController控制器-ReplicaSet控制器-StatefulSet控制器-DaemonSet控制器-JobDeployment服务发现-Ser…

pytest-xdist分布式测试原理浅析

目录 pytest-xdist执行流程&#xff1a; pytest-xdist 模块结构&#xff1a; pytest-xdist分布式测试原理&#xff1a; pytest-xdist源码浅读&#xff1a; pytest-xdist执行流程&#xff1a; 解析命令行参数&#xff1a;pytest-xdist 会解析命令行参数&#xff0c;获取用户…

Vue3_对响应式对象解构赋值之后失去响应性——toRefs()

官网toRefs() :响应式 API&#xff1a;工具函数 | Vue.js toRefs 在调用时只会为源对象上可以枚举的属性创建 ref。如果要为可能还不存在的属性创建 ref&#xff0c;请改用 toRef。 setup(){const state reactive({name:"张三"age:14})const stateAsToRefs toRef…

SpringBoot 热部署

文章目录 前言一、spring-boot-devtools添加热部署框架支持settings 开启项目自动编译开启运行中热部署使用Debug启动 二、IDEA 自带 HowSwap 功能设置 Spring Boot 启动类等待项目启动完成点击热加载按钮存在的问题 三、JRebel 插件【推荐】安装插件使用插件 前言 在日常开发…

linux学习——Redis基础

目录 一、noSQL 类型 特点及应用场景 二、Redis 三、安装方式 编译安装 rpm安装 四、目录结构 /etc/redis.conf 五、Redis命令 六、本地登录和远程登录 本地登录 远程登录 七、数据库操作 帮助信息 库操作 数据操作 八、Redis持久化 一、RDB类型 二、AOF模式 一…