[线程/C++(11)]线程池

news2025/1/23 17:35:13

文章目录

  • 一、C++实现线程池
    • 1. 头文件
    • 2. 测试部分
  • 二、C++11实现线程池
    • 1. 头文件
    • 2. 测试部分


一、C++实现线程池

1. 头文件

#define _CRT_SECURE_NO_WARNINGS
#pragma once
#include<iostream>
#include<string.h>
#include<string>
#include<pthread.h>
#include<stdlib.h>
#include<queue>
#include<unistd.h>
using namespace std;


using callback = void(*)(void*);
//任务的结构体
template<typename T>
struct Task
{
	Task()
	{
		function = nullptr;
		args = nullptr;
	}
	Task(callback fun, void* args)
	{
		function = fun;
		this -> args = (T*)args;
	}
	callback function;
	T* args;
};

//任务队列
template<typename T>
class TaskQueue
{
public:
	TaskQueue()
	{
		pthread_mutex_init(&mutex,NULL);
	}

	~TaskQueue()
	{
		pthread_mutex_destroy(&mutex);
	}

	//添加任务
	void AddTask(Task<T> task)
	{
		pthread_mutex_lock(&mutex);
		queue.push(task);
		pthread_mutex_unlock(&mutex);
	}
	void AddTask(callback fun, void* args)
	{
		pthread_mutex_lock(&mutex);
		Task<T> task(fun,args); 
		queue.push(task);
		pthread_mutex_unlock(&mutex);
	}

	//取出一个任务
	Task<T> TakeTask()
	{
		Task<T> task;
		pthread_mutex_lock(&mutex);
		if (queue.size() > 0)
		{
			task = queue.front();
			queue.pop();
		}
		pthread_mutex_unlock(&mutex);
		return task;
	}

	//获取当前队列中的任务个数
	inline int GetTaskNum()
	{
		return queue.size();
	}
private:
	pthread_mutex_t mutex; //互斥锁
	std::queue<Task<T>> queue;
};


//线程池
template<typename T>
class ThreadPool
{
public:
	ThreadPool(int min , int max)
	{
		//实例化任务队列
		taskqueue = new TaskQueue<T>;

		//初始化线程池
		min_num = min;
		max_num = max;
		busy_num = 0;
		live_num = min;

		//根据线程最大上限,给线程数组分配内存
		threadID = new pthread_t[max];
		if (threadID == nullptr)
		{
			cout << "new threadID fail" << endl;
		}

		//初始化线程ID
		memset(threadID, 0, sizeof(pthread_t) * max);

		//初始化互斥锁和条件变量
		if (pthread_mutex_init(&mutex_pool, NULL) != 0 ||
			pthread_cond_init(&notempty, NULL) != 0)
		{
			cout << "mutex or cond init fail" << endl;
		}

		//创建线程
		for (size_t i = 0; i < min; ++i)
		{
			pthread_create(&threadID[i], NULL, Work, this);
			cout << "create thread ID :" << to_string(threadID[i]) << endl;
		}
		pthread_create(&managerID, NULL, Manage, this);
		
	}

	~ThreadPool()
	{
		shutdown = true;
		//销毁管理者进程
		pthread_join(managerID, NULL);

		//唤醒消费者
		for (int i = 0; i < live_num; ++i)
		{
			pthread_cond_signal(&notempty);
		}

		if (taskqueue)
		{
			delete taskqueue;
		}

		if (threadID)
		{
			delete[] threadID;
		}

		pthread_mutex_destroy(&mutex_pool);
		pthread_cond_destroy(&notempty);
	}

	//添加任务
	void Add_Task(Task<T> task)
	{
		if (shutdown)
			return;

		//添加任务,不需加锁,队列中有
		taskqueue->AddTask(task);

		//唤醒消费者
		pthread_cond_signal(&notempty);
	}

	//获取忙线程个数
	int Get_Busy_Num()
	{
		int busynum = 0;
		pthread_mutex_lock(&mutex_pool);
		busynum = busy_num;
		pthread_mutex_unlock(&mutex_pool);
		return busynum;
	}

	//获取存活线程个数
	int Get_Live_Num()
	{
		int livenum = 0;
		pthread_mutex_lock(&mutex_pool);	 
		livenum = live_num;			 
		pthread_mutex_unlock(&mutex_pool);	 
		return livenum;						 
	}

private:
	//工作的线程任务函数
	static void* Work(void* args)
	{
		ThreadPool* pool = static_cast<ThreadPool*>(args);

		while (true)
		{
			//访问任务队列加锁
			pthread_mutex_lock(&pool->mutex_pool);
			//判断任务队列是否为空,空了就堵塞
			while (pool->taskqueue->GetTaskNum() == 0 && !pool->shutdown)
			{
				cout << "thread :" << to_string(pthread_self()) << " waiting..." << endl;
				pthread_cond_wait(&pool->notempty, &pool->mutex_pool);

				//解除后 判断是否要销毁进程
				if (pool->exit_num > 0)
				{
					pool->exit_num--;
					if (pool->live_num > pool->min_num)
					{
						pool->live_num--;
						pthread_mutex_unlock(&pool->mutex_pool);
						pool->Thread_Exit();
					}
				}
			}

			//判断线程池是否要关闭了
			if (pool->shutdown)
			{
				pthread_mutex_unlock(&pool->mutex_pool);
				pool->Thread_Exit();
			}

			//从任务队列取出任务
			Task<T> task = pool->taskqueue->TakeTask();
			pool->busy_num++;
			pthread_mutex_unlock(&pool->mutex_pool);

			cout << "thread :" << to_string(pthread_self()) << " start working..." << endl;
			task.function(task.args);

			delete task.args;
			task.args = nullptr;

			//任务结束
			cout << "thread :" << to_string(pthread_self()) << " end working..." << endl;
			pthread_mutex_lock(&pool->mutex_pool);
			pool->busy_num--;
			pthread_mutex_unlock(&pool->mutex_pool);

		}
		return nullptr;
	}

	//管理者线程任务函数
	static void* Manage(void* args)
	{
		ThreadPool* pool = static_cast<ThreadPool*>(args);
		while (!pool->shutdown)
		{
			//5秒检测一次
			sleep(5);
			pthread_mutex_lock(&pool->mutex_pool);
			int livenum = pool->live_num;
			int busynum = pool->busy_num;
			int queuesize = pool->taskqueue->GetTaskNum();
			pthread_mutex_unlock(&pool->mutex_pool);

			const int NUMBER = 2;
			//创建
			if (queuesize > livenum && livenum < pool->max_num)
			{
				pthread_mutex_lock(&pool->mutex_pool);
				int num = 0;
				for (int i = 0; i < pool->max_num && 
									num < NUMBER && 
					pool->live_num < pool->max_num ; ++i)
				{
					if (pool->threadID[i] == 0)
					{
						pthread_create(&pool->threadID[i], NULL, Work, pool);
						num++;
						pool->live_num++;
					}
				}
				pthread_mutex_unlock(&pool->mutex_pool);
			}

			//销毁
			if (busynum * 2 < livenum && livenum > pool->min_num)
			{
				pthread_mutex_lock(&pool->mutex_pool);
				pool->exit_num = NUMBER;
				pthread_mutex_unlock(&pool->mutex_pool);
				for (int i = 0; i < NUMBER; ++i)
				{
					pthread_cond_signal(&pool->notempty);
				}
			}
		}
		return nullptr;
	}

	void Thread_Exit()
	{
		pthread_t tid = pthread_self();
		for (int i = 0; i < max_num; ++i)
		{
			if (threadID[i] == tid)
			{
				cout << "thread :" << to_string(pthread_self()) << "exiting" << endl;
				threadID[i] = 0;
				break;
			}
		}
		pthread_exit(NULL);
	}
private:
	pthread_mutex_t mutex_pool;
	pthread_cond_t notempty;
	pthread_t* threadID;
	pthread_t managerID;
	TaskQueue<T>* taskqueue;
	int min_num;
	int max_num;
	int busy_num;
	int live_num;
	int exit_num;
	bool shutdown = false;

};

2. 测试部分

#include"ThreadPool.h"

void Task_Test(void* args)
{
	int num = *(int*)args;
	cout<<"thread :" << pthread_self() << " is working " << "number =" << num <<endl;
	sleep(1);
	return;
}

int main()
{
	//创建线程池
	ThreadPool<int> pool(3, 10);
	for (int i = 0; i < 100; ++i)
	{
		int* num = new int(i+100);
		pool.Add_Task(Task<int>(Task_Test,num));
	}
	sleep(40);
	return 0;
}



以上只是基于C修改出对应于C++的代码

并且以上代码存在一个问题
输出的结果有时会因为线程原因出现混乱
可以通过加锁来解决,但锁的数量超过1就容易导致死锁问题,所以暂且搁置


二、C++11实现线程池

并非原创,摘于此处

1. 头文件

#pragma once
#include<queue>
#include<thread>
#include<condition_variable>
#include<atomic>
#include<stdexcept>
#include<future>
#include<vector>
#include<functional>

namespace std
{
	#define THREADPOOL_MAX_NUM 16
	class threadpool
	{
		unsigned short _initsize;
		using Task = function<void()>;
		vector<thread> _pool;
		queue<Task> _tasks;
		mutex _lock;
		mutex _lockGrow;
		condition_variable _task_cv;
		atomic<bool> _run{true};
		atomic<int> _spa_trd_num{0};

	public:
		inline threadpool(unsigned short size = 4)
		{
			_initsize = size;
			Add_Thread(size);
		}
		inline ~threadpool()
		{
			_run = false;
			_task_cv.notify_all();
			for (thread& thread : _pool)
			{
				if (thread.joinable())
					thread.join();
			}
		}

		template<typename F,typename... Args>
		auto commit(F&& f, Args&& ...args) -> future<decltype(f(args...)) >
		{
			if (!_run)
				throw runtime_error{"commit auto stop"};
			using RetType = decltype(f(args...));
			auto task = make_shared<packaged_task<RetType()>>(bind(forward<F>(f), forward<Args>(args)...));
			future<RetType> future = task->get_future();
			{
				lock_guard<mutex> lock{_lock};
				_tasks.emplace([task]() {(*task)(); });
			}
			if (_spa_trd_num < 1 && _pool.size() < THREADPOOL_MAX_NUM)
				Add_Thread(1);
			_task_cv.notify_one();
			return future;
		}

		template<typename F>
		void commit2(F&& f)
		{
			if (!_run)
				return;
			{
				lock_guard<mutex> lock{_lock};
				_tasks.emplace(forward<F>(f));
			}
			if (_spa_trd_num < 1 && _pool.size() < THREADPOOL_MAX_NUM)
				Add_Thread(1);
			_task_cv.notify_one();
		}

		int idlCount() { return _spa_trd_num; }
		int thrCount() { return _pool.size(); }

	private:
		void Add_Thread(unsigned short size)
		{
			if (!_run)
				throw runtime_error{"Add_Thread stop"};
			unique_lock<mutex> lockgrow{_lockGrow};
			for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
			{
				_pool.emplace_back([this]
				{
					while (true)
					{
						Task task;
						{
							unique_lock<mutex> lock{_lock};
							_task_cv.wait(lock, [this] {return !_run || !_tasks.empty(); });
							if (!_run && _tasks.empty())
								return;
							_spa_trd_num--;
							task = move(_tasks.front());
							_tasks.pop();
						}
						task();
						if (_spa_trd_num > 0 && _pool.size() > _initsize)
							return;
						{
							unique_lock<mutex> lock{_lock};
							_spa_trd_num++;
						}
					}
				});
				{
					unique_lock<mutex> lock{_lock};
					_spa_trd_num++;
				}
			}	
		}
	};
}

要使用pthread依赖库


2. 测试部分

#include"ThreadPool.hpp"
#include<iostream>

void fun1(int slp)
{
    printf("fun1  %ld\n", std::this_thread::get_id());
    if (slp > 0)
    {
        printf("fun1 sleep %ld  =========  %ld\n", slp, std::this_thread::get_id());
        std::this_thread::sleep_for(std::chrono::milliseconds(slp));
    }
}

struct gfun
{
    int operator()(int n)
    {
        printf("gfun  %ld\n", n, std::this_thread::get_id());
        return 42;
    }
};

class A
{
public:
    static int Afun(int n = 0) //函数必须是 static 的才能直接使用线程池
    {
        std::cout << n << "Afun  " << std::this_thread::get_id() << std::endl;
        return n;
    }

    static std::string Bfun(int n, std::string str, char c) 
    {
        std::cout << n << "Bfun   " << str.c_str() << "  " << (int)c << "  " << std::this_thread::get_id() << std::endl;
        return str;
    }
};

int main()
try {
    std::threadpool executor{ 50 };
    std::future<void> ff = executor.commit(fun1, 0);
    std::future<int> fg = executor.commit(gfun{}, 0);
    //std::future<int> gg = executor.commit(A::Afun, 9999); //IDE提示错误,但可以编译运行
    std::future<std::string> gh = executor.commit(A::Bfun, 9998, "mult args", 123);
    std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh !  " << std::this_thread::get_id() << std::endl; return "hello,fh ret !\n"; });

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << fg.get() << "  " << fh.get().c_str() << "  " << std::this_thread::get_id() << std::endl;

    std::cout << " =======  fun1,55 ========= " << std::this_thread::get_id() << std::endl;
    executor.commit(fun1, 55).get();    //调用.get()获取返回值会等待线程执行完

    std::threadpool pool(4);
    std::vector< std::future<int> > results;

    for (int i = 0; i < 8; ++i)
    {
        results.emplace_back(
            pool.commit([i] {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(3));
                std::cout << "world " << i << std::endl;
                return i * i;
                })
        );
    }
    std::this_thread::sleep_for(std::chrono::seconds(15));
    for (auto&& result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
    return 0;
}
catch (std::exception& e)
{
    std::cout << "some error " << std::this_thread::get_id() << e.what() << std::endl;
}
  • 测试结果
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/924647.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue学习之热更新、单文件开发、插槽、作用域插槽

vue-cli 全局安装&#xff1a;-g&#xff0c;全局安装 vue-cli npm install -g vuecli 创建项目 vue create my-app 生成的文件结构&#xff1a; node-modules: 存放依赖src&#xff1a;源代码文件夹src- components&#xff1a;存放组件的位置 将上一篇中我们html的文件…

【SpringCloud技术专题】「Gateway网关系列」(1)微服务网关服务的Gateway组件的原理介绍分析

为什么要有服务网关? 我们都知道在微服务架构中&#xff0c;系统会被拆分为很多个微服务。那么作为客户端要如何去调用这么多的微服务呢&#xff1f;难道要一个个的去调用吗&#xff1f;很显然这是不太实际的&#xff0c;我们需要有一个统一的接口与这些微服务打交道&#xf…

Android JNI系列详解之CMake配置库文件的输出目录

一、前提 阅读上一篇文章Android JNI系列详解之CMake编译工具的使用&#xff0c;里面讲到了需要配置两个文件&#xff1a;CMakeList.txt和build.gradle 二、配置CMake编译工具输出库文件的路径 1.默认的库文件输出路径&#xff1a;app/build/intermediates/cmake/debug/obj 由此…

springboot整合rabbitmq发布确认高级

在生产环境中由于一些不明原因&#xff0c;导致 rabbitmq 重启&#xff0c;在 RabbitMQ 重启期间生产者消息投递失败&#xff0c;导致消息丢失&#xff0c;需要手动处理和恢复。于是&#xff0c;我们如何才能进行 RabbitMQ 的消息可靠投递。 发布确认 发布确认方案 架构 配置…

poi带表头多sheet导出

导出工具类 package com.hieasy.comm.core.excel;import com.hieasy.comm.core.excel.fragment.ExcelFragment; import com.hieasy.comm.core.utils.mine.MineDateUtil; import org.apache.poi.hssf.usermodel.*; import org.apache.poi.ss.usermodel.*; import org.apache.po…

android studio安装教程

1、android studio 下载 下载网址&#xff1a;Download Android Studio & App Tools - Android Developers 2、开始安装 因为不需要每次连接手机进行调试&#xff0c;android studio给我们提供了模拟器调试环境。 一般选择自定义安装&#xff0c;这样可选sdk以及下载路径…

IT运维:使用数据分析平台监控 Kafka 服务

Apache Kafka 是由 LinkedIn 开发&#xff0c;并于2011年开源的分布式消息队列服务。但是通过快速持续的演进&#xff0c;目前它发展成为成熟的事件流处理平台&#xff0c;可用于大规模流处理、实时数据管道和数据集成等场景。 Kafka 的服务端组件包括一个或者多个 broker。Bro…

视频云存储/安防监控AI视频智能分析平台——智慧煤矿解决方案

一、方案背景 煤矿业是一个高风险行业&#xff0c;存在着许多潜在的安全隐患和风险。互联网、物联网、人工智能等新兴技术高速发展&#xff0c;为传统行业带来颠覆性变革&#xff0c;将高新技术与传统技术装备、管理相融合&#xff0c;实现产业转型升级已经成为煤矿行业发展趋…

【0824作业】C++ 拷贝赋值函数、匿名对象、友元、常成员函数和常对象、运算符重载

一、思维导图 二、作业&#xff1a;实现关系运算符的重载 关系运算符重载 概念&#xff1a; 种类&#xff1a;>、>、< 、< 、 、!表达式&#xff1a;L#R (L表示左操作数&#xff0c;R表示有操作数&#xff0c;#表示运算符)左操作数&#xff1a;既可以是左值也可以…

BSN与中国食品药品企业质量安全促进会达成战略合作协议

2023年8月18日至20日&#xff0c;“首届中国食品药品医疗器械化妆品高质量发展大会”在北京召开&#xff0c;本届大会以“树立新发展理念&#xff0c;服务构建新发展格局&#xff0c;助力食药行业高质量发展”为主题&#xff0c;聚焦食药监管和行业发展的热点、难点问题&#x…

R语言主成分分析

R语言主成分分析 之前介绍过怎么用SPSS进行主成分分析(PCA)&#xff0c;已经忘了的朋友们可以到主页看看 今天主要介绍下R语言主成分分析的几种方法。都是入门级别&#xff0c;跟着我一步步走&#xff0c;一点都不难哈~ 首先调用R语言自带的数据集&#xff0c;USArrests。这…

嵌入式linux之QT交叉编译环境搭建(最简单实测通用版)

这里总结下用于嵌入式linux下的QT交叉编译环境搭建&#xff0c;留作备忘&#xff0c;分享给有需要的小伙伴。不管你的是什么嵌入式linux环境&#xff0c;实测过的通用方法总结。 环境准备 需要准备的环境要求如下&#xff1a; 1.虚拟机(vmvare15.5) 2.ubuntu18.04-x64的linu…

4.网络设计与redis、memcached、nginx组件(一)

网络组件系列文章目录 第四章 网络设计与redis、memcached、nginx组件 文章目录 网络组件系列文章目录文章的思维导图前言一、网络相关的问题&#xff0c;网络开发中要处理那些问题&#xff1f;网络操作IO连接建立连接断开消息到达消息发送网络操作IO特性 二、网络中IO检测IO函…

【Java】基础练习(十一)

1.Poker 定义两个数组&#xff0c;一个数组存储扑克牌花色&#xff0c;另一个数组存储扑克牌&#xff08;A~K&#xff09;&#xff0c;输出52张扑克牌&#xff08;除大小王&#xff09; ♥A、♥2...&#xff08;1&#xff09;Poker类&#xff1a; package swp.kaifamiao.cod…

03-Numpy基础-通用函数:快速的元素级数组函数

通用函数&#xff08;即ufunc&#xff09;是一种对ndarray中的数据执行元素级运算的函数。你可以将 其看做简单函数&#xff08;接受一个或多个标量值&#xff0c;并产生一个或多个标量值&#xff09;的矢量化包 装器。 通用函数&#xff08;ufunc&#xff09;有三种类型&…

【BUG】解决安装oracle11g或12C中无法访问临时位置的问题

项目场景&#xff1a; 安装oracle时&#xff0c;到第二步出现oracle11g或12C中无法访问临时位置的问题。 解决方案&#xff1a; 针对客户端安装&#xff0c;在cmd中执行命令&#xff1a;前面加实际路径setup.exe -ignorePrereq -J"-Doracle.install.client.validate.cli…

countDown+react+hook

道阻且长&#xff0c;行而不辍&#xff0c;未来可期 知识点一&#xff1a; new Date().getTime()可以得到得到1970年01月1日0点零分以来的毫秒数。单位是毫秒 new Date().getTime()/1000获取秒数1分钟60秒&#xff0c;1小时60分钟1hour:60*60>单位是秒 60*60*1000>单位…

远程办公中安全远程访问解决方案

什么是安全远程访问 安全的远程访问是一个至关重要的过程&#xff0c;可让您使用互联网从远处完全控制某人的设备。为了确保安全&#xff0c;为受保护的远程访问采取了额外的身份验证和加密措施。 为什么安全远程访问解决方案很重要 当 IT 技术人员从远处帮助人们解决计算机…

GWO-LSTM交通流量预测(python代码)

使用 GWO 优化 LSTM 模型的参数&#xff0c;从而实现交通流量的预测方法 代码运行版本要求 1.项目文件夹 data是数据文件夹&#xff0c;data.py是数据归一化等数据预处理脚本 images文件夹装的是不同模型结构打印图 model文件夹 GWO-LSTM测试集效果 效果视频&#xff1a;GWO…

NLNet论文总结和代码实现

Non-local Neural Networks&#xff08;非局部神经网络&#xff09;&#xff1a;使用自注意力机制捕获远程依赖。 论文&#xff1a; https://arxiv.org/pdf/1711.07971.pdf 源码&#xff1a; 长距离依赖关系&#xff0c;顾名思义&#xff0c;是要和远程建立关系&#xff0c;在l…