cpp11实现线程池(七)——线程池cached模式设计实现

news2024/11/28 8:43:20

用vector::size() 获取当前容器元素数量不是线程安全的,所以采用atomic_int 来实现当前容器元素数量的改变能够保证线程安全

线程池成员变量的修改

添加变量记录当前线程数量、空闲线程数量,以及线程数的上限:

int threadSizeThreshHold_; // 线程数量上限(cached模式使用)
std::atomic_int curThreadSize_;  // 记录当前线程池里面线程的总数量(cached模式使用)
std::atomic_int idleThreadSize_; // 记录空闲线程的数量(cached模式使用)

存储线程池的容器也要改成 unordered_map,用ThreadId能够查找对应的Thread对象,这样我们后面超时回收线程时能够通过threadId来删除对应的Thread对象

std::unordered_map<int, std::unique_ptr<Thread>> threads_;

主线程submitTask函数修改

线程池线程扩充是在提交任务submitTask时检测到空闲线程数 小于 任务数 且 当前总线程数 小于 线程数阈值,则可以创建新线程:

Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
	// 获取锁
	std::unique_lock<std::mutex> lck(taskQueMtx_);
	if (!notFull_.wait_for(lck, std::chrono::seconds(1), [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_ ;}))
	{
		std::cerr << "task queue is full, submit task fail" << std::endl;
		return Result(sp, false);//
	}

	// 如果有空余,把任务放入任务队列中
	taskQue_.emplace(sp);
	taskSize_++; // ???

	// 任务队列不空,唤醒notEmpty_的wait,即通知消费者消费
	notEmpty_.notify_all();

	// cached 任务处理比较紧急,场景:小而快的任务
	// 同时需要根据任务数量和空闲线程数量,判断是否需要创建新的线程
	if (poolMode_ == PoolMode::MODE_CACHED
		&& taskSize_ > idleThreadSize_
		&& curThreadSize_ < threadSizeThreshHold_)
	{
		std::cout << ">>> create new thread..." << std::endl;

		// 创建新的线程对象
		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
		int threadId = ptr->getId();
		// unique_ptr的拷贝构造和赋值都被delete,所以要把ptr转为右值
		threads_.emplace(threadId, std::move(ptr));

		// 启动线程
		threads_[threadId]->start();

		curThreadSize_++;
		idleThreadSize_++;
	}

	// 返回任务的Result对象
	return Result(sp);
}

工作线程运行函数修改

cached模式下,可能创建了很多线程, 但是空闲时间超过60s,应该把多余的线程回收掉(但是要保证线程数量>= initThreadSize_)即:

if 当前时间 - 上一次线程执行时间 > 60s then 回收线程

每秒种判断一次,需要区分超时返回有任务待返回,使用 锁 + 双重判断

并且线程池关闭时,必须先让所有线程执行完手头任务才能结束线程,也就是while (taskQue_.size() == 0) 才会去判断线程池是否结束

// threadid参数是Thread::threadId_,是自定义的对象编号
// 线程池里所有任务消费任务
void ThreadPool::threadFunc(int threadid)
{
	auto lastTime = std::chrono::high_resolution_clock().now();
	

	// 所有任务必须执行完成,线程池才可以回收所有线程资源
	for (;;)
	{
		std::shared_ptr<Task> task;
		{
			// 获取锁
			std::unique_lock<std::mutex> lock(taskQueMtx_);
			std::cout << "tid:" << std::this_thread::get_id() << "尝试获取任务..." << std::endl;
			
			// 没有任务的处理
			while (taskQue_.size() == 0)
			{
				// 线程池结束,回收线程资源
				if (!isPoolRunning_)
				{
					threads_.erase(threadid);
					std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
					exitCond_.notify_all();
					return;
				}

				if (poolMode_ == PoolMode::MODE_CACHED)
				{
					// 超时,这里是用来实现每1s检测一次当前线程空闲时间是否达到删除条件
					if(std::cv_status::timeout ==
						notEmpty_.wait_for(lock, std::chrono::seconds(1)))
					{
						auto now = std::chrono::high_resolution_clock().now();
						auto dur = std::chrono::duration_cast<std::chrono::seconds> (now - lastTime);

						if (dur.count() >= THREAD_MAX_IDLE_TIME
							&& curThreadSize_ > initThreadSize_)
						{
							// 回收当前现线程
							// 记录线程数量相关变量的修改
							// 将线程对象从线程列表容器中删除
							// threadid => Thread对象 => erase
							threads_.erase(threadid); // 注意不是删除std::this_thread::get_id()
							curThreadSize_--;
							idleThreadSize_--; // 这里线程肯定是空闲的,因为压根没执行任务

							std::cout << "threadid:" << std::this_thread::get_id() << " exit!" << std::endl;
							return;
						}
					}
				}
				else // fixed模式
				{
					notEmpty_.wait(lock);
				}
			}


			idleThreadSize_--; // 执行任务时,空闲线程-1

			std::cout << "tid:" << std::this_thread::get_id() << "获取任务成功..." << std::endl;
			// 从任务队列中取一个任务出来
			task = taskQue_.front();
			taskQue_.pop();
			taskSize_--;

			// 若依然有剩余任务,继续通知其他线程执行任务
			if (taskQue_.size() > 0)
			{
				notEmpty_.notify_all();
			}

			// 取出一个任务进行通知 生产者可以进行生产任务
			notFull_.notify_all();
		} // 只需要对taskQue_及相关临界资源操作才加锁,执行不需要加锁

		// 当前线程负责执行这个任务
		if (task != nullptr)
		{
			//task->run(); 
			// 执行任务,把任务的返回值setVal方法给到Result
			task->exec();
		}
		// 执行任务后空闲线程 + 1
		idleThreadSize_++;
		auto lastTime = std::chrono::high_resolution_clock().now();
	}	
}

线程池资源回收

线程池析构将运行状态设置为false

通知所有消费者(每个工作线程 Slave),让它们检测到运行状态为false可以退出了

注意:submitTask是主线程来调用的,用来分配任务的线程(Master)

ThreadPool::~ThreadPool()
{
    isPoolRunning_ = false;
    // 等待线程池里所有线程返回,两种状态:阻塞 或 正在执行任务中
    std::unique_lock<std::mutex> lock(taskQueMtx_);
    notEmpty_.notify_all(); // 
    // 等待线程销毁
    exitCond_.wait(lock, [&]()->bool { return threads_.size() == 0; });
}

线程的初始数量可以用库函数提供的硬件并发数(CPU核心数)来给出,即

void start(int initThreadSize = std::thread::hardware_concurrency());

测试

设置初始线程数为4,提交6个任务,空闲时间最大6s(超过则回收),模式为MODE_CACHED,测试代码如下:

#include <iostream>
#include <chrono>
#include "threadpool.h"

using uLong = unsigned long long;

class MyTask : public Task
{
public:
	MyTask(uLong /*int*/ begin, uLong /*int*/ end)
		: begin_(begin)
		, end_(end)
	{}

	Any run()
	{
		std::cout << "tid:" << std::this_thread::get_id() << " begin!" << std::endl;
		//std::this_thread::sleep_for(std::chrono::seconds(5));
		uLong sum = 0;
		for (uLong i = begin_; i <= end_; i++)
		{
			sum += i;
		}
		std::cout << "tid:" << std::this_thread::get_id() << " end!" << std::endl;
		
		return sum;
	}
private:
	uLong /*int*/ begin_;
	uLong /*int*/ end_;
};
int main()
{
	
#if 1
	ThreadPool pool;
	// 设置为可伸缩模式
	pool.setMode(PoolMode::MODE_CACHED);
	pool.start(4);
	
	Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 1000000000));
	Result res2 = pool.submitTask(std::make_shared<MyTask>(1000000001, 2000000000));
	Result res3 = pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
	pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
	pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
	pool.submitTask(std::make_shared<MyTask>(2000000001, 3000000000));
	
	uLong sum1 = res1.get().cast_<uLong>();
	uLong sum2 = res2.get().cast_<uLong>();
	uLong sum3 = res3.get().cast_<uLong>();

	// Master - Slave线程模型
	std::cout << (sum1 + sum2 + sum3) << std::endl;
	
	getchar();
#endif
	return 0;
}

实验结果如下:

在这里插入图片描述

可以看出能够动态创建2个额外线程,并且超时能够回收线程,退出时能正常回收所有线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/550907.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

由浅入深Netty源码分析

目录 1 启动剖析2 NioEventLoop 剖析3 accept 剖析4 read 剖析 1 启动剖析 我们就来看看 netty 中对下面的代码是怎样进行处理的 //1 netty 中使用 NioEventLoopGroup &#xff08;简称 nio boss 线程&#xff09;来封装线程和 selector Selector selector Selector.open();…

Trie与可持久化Trie

Trie Trie&#xff0c;也称为字典树或前缀树&#xff0c;是一种用于高效存储和检索字符串的树形数据结构。它的主要特点是利用字符串的公共前缀来减少存储空间和提高查询效率。下面是对 Trie 的常见操作的介绍&#xff1a; 插入&#xff08;Insertion&#xff09;&#xff1a…

PETRv2 论文学习

1. 解决了什么问题&#xff1f; 过去&#xff0c;一般使用基于单目视觉进行 3D 目标检测。现在进行 3D 任务的方法大致分两类。一类是基于 BEV&#xff0c;将多视角图像映射为 BEV 表征&#xff0c;然后使用 3D 目标检测方法。另一类是基于 DETR&#xff0c;如 DETR3D 和 PETR…

xhs-xs webmsxywx分析

近期又更新了&#xff0c;先是改了x-s生成&#xff0c;然后又加上了a1校验。 后面可能会全参校验&#xff0c;比如再加上gid、deviceId、profileData、x-s-common、smidV2之类。 估计以后不能写xhs了&#xff0c;大家且看且珍惜吧。之前相关的文章都被下架了 危&#xff01;…

K8s日志组件-Loki是如何存储数据的?

文章目录 为什么需要loki为什么不是EFK&#xff1f;Loki是如何存储数据的&#xff1f;底层的LSM treeB tree 和LSM tree的区别&#xff1f;Ref参考链接 为什么需要loki 日志记录本质上是一个事件。大多数语言、应用程序框架或库都支持日志&#xff0c;表现形式可以是字符串这样…

安卓动画壁纸实战:制作一个星空动态壁纸(带随机流星动画)

前言 在我之前的文章 羡慕大劳星空顶&#xff1f;不如跟我一起使用 Jetpack compose 绘制一个星空背景&#xff08;带流星动画&#xff09; 中&#xff0c;我们使用 Compose 实现了星空背景效果。 并且调用非常方便&#xff0c;只需要一行代码就可以给任意 Compose 组件添加上…

30多家投递石沉大海,总算上岸了

大家好&#xff0c;我是帅地。 今年的行情&#xff0c;无论是暑假实习还是春招校招&#xff0c;都比往年要难一些&#xff0c;很多人在三月份要嘛简历石沉大海&#xff0c;要嘛面试一轮游&#xff0c;但也有部分人最后都拿到了不错的 Offer&#xff0c;包括我 训练营 里&#…

企业级信息系统开发——初探Spring-采用Spring配置文件管理Bean

初探Spring 一、Spring框架&#xff08;一&#xff09;Spring框架优点&#xff08;二&#xff09;Spring 框架因何而来&#xff08;三&#xff09;Spring框架核心概念 二、采用Spring配置文件管理Bean&#xff08;一&#xff09;创建Maven项目&#xff08;二&#xff09;添加Sp…

在C++中,怎么把string转换成char*?

2023年5月21日&#xff0c;周日中午&#xff1a; 今天在写项目的时候遇到了这个问题&#xff0c;也解决了&#xff0c;所以记录一下 通过string类的copy成员函数就可以解决这个问题 copy函数的函数原型&#xff1a; string& copy(char* s, size_t n, size_t pos 0); 其…

【框架源码】SpringBoot核心源码解读之启动类源码分析

首先我们要先带着我们的疑问&#xff0c;spring boot是如何启动应用程序&#xff1f;去分析SpringBoot的启动源码。 我们在新建SpringBoot项目时&#xff0c;核心方法就是主类的run方法。 SpringApplication.run(ArchWebApplication.class, args) 我们点击run方法进入到源码中…

A survey of Large Lanuage models

一.引言 语言建模的四个阶段&#xff0c;统计语言模型&#xff08;SLM&#xff09;&#xff1a;基于马尔科夫假设建立词预测模型&#xff0c;n-gram&#xff0c;神经语言模型&#xff08;NLM&#xff09;&#xff1a;word2vec&#xff0c;预训练语言模型&#xff08;PLM&#…

Godot引擎 4.0 文档 - 入门介绍 - 学习新功能

本文为Google Translate英译中结果&#xff0c;DrGraph在此基础上加了一些校正。英文原版页面&#xff1a; Learning new features — Godot Engine (stable) documentation in English 学习新功能 Godot 是一个功能丰富的游戏引擎。有很多关于它的知识。本页介绍了如何使用…

English Learning - L3 作业打卡 Lesson2 Day11 2023.5.15 周一

English Learning - L3 作业打卡 Lesson2 Day11 2023.5.15 周一 引言&#x1f349;句1: Sometimes a person may be upset because he does not have something as nice as a friend has, like a fast new car.成分划分弱读连读爆破语调 &#x1f349;句2: That person may say…

【wifi-app 任意泄露】

一、fofa 搜索 title“Wi-Fi APP Login” # Date: 2022-06-12 # Exploit Author: Ahmed Alroky # Author Company : AIactive # Version: M30HG4.V5030.191116 # Vendor home page : wavlink.com # Authentication Required: No # CVE : CVE-2022-34047 # Tested on: Windows…

day2 I/O多路复用select函数

目录 思考一个问题&#xff1a; I/O多路复用select函数 代码实现 net.h server.c: socket.c 思考一个问题&#xff1a; 我们还是把视角放到应用B从TCP缓冲区中读取数据这个环节来。如果在并发的环境下&#xff0c;可能会N个人向应用B发送消息&#xff0c;这种情况下我们的…

java+springboot留学生新闻资讯网的设计与实现

Spring框架是Java平台的一个开放源代码的Full-stack(全栈)应用程序框架&#xff0c;和控制翻转容器的实现。Spring框架的一些核心功能理论&#xff0c;可以用于所有Java应用&#xff0c;Spring还为Java EE构建的Web应用提供大量的扩展支持。Spring框架没有实现任何的编程模型&a…

nodejs进阶(5)—接收请求参数

1. get请求参数接收 我们简单举一个需要接收参数的例子 如果有个查找功能&#xff0c;查找关键词需要从url里接收&#xff0c;http://localhost:8000/search?keyword地球。通过前面的进阶3教程《nodejs进阶(3)—路由处理》重介绍的url模块&#xff0c;我们知道接收方法如下这…

cpp11实现线程池(六)——线程池任务返回值类型Result实现

介绍 提交任务函数submitTask中返回的Result类型应该是用Result类包装当前的task&#xff0c;因为出函数之后task即如下形式&#xff1a;return Result(task); Result和Task都要互相持有对方的指针&#xff0c;Task要将任务执行结果通过Result::setVal(run()) 调用传给其对应…

RestCloud新一代(智能)全域数据集成平台发布

5月18日&#xff0c;RestCloud在其成立六周年的当天&#xff0c;发布了“新一代&#xff08;智能&#xff09;全域数据集成平台”。 5月18日&#xff0c;RestCloud在其成立六周年的当天&#xff0c;发布了“新一代&#xff08;智能&#xff09;全域数据集成平台”。 根据业内专…

【Linux环境基础开发工具】软件包管理器-yum

写在前面 今天我打算介绍如何在Linux环境下载软件&#xff0c; Linux作为一个操作系统&#xff0c;就像windows一样&#xff0c;当然是存在软件的。 目录 写在前面 怎么在Linux环境安装软件 源代码安装 rpm安装包安装 yum安装 如何理解Linux的生态 如何使用yum安装软…