【C++实现】线程池的设计与实现

news2024/11/24 7:43:45

文章目录

  • 前言
  • 正文
  • 线程池提供的两个重要方法
  • Any类的设计
  • Semaphore
  • Result的实现
  • Cache模式解释
  • 会遇到死锁问题
  • 第二个死锁问题,移植到Linux发生
  • 项目重构
  • 大致流程
  • 总结


前言


开发环境:

  • Linux,要求g++版本能够支持C++17以上;
  • vs2019下开发,需要将C++标准调制C++17以上。

涉及知识点:

  • 熟悉C++11多线程编程 thread、mutex、atomic、condition_variable、unique_lock等。
  • C++17和C++20标准的内容,C++17的any类型和C++20的信号量semaphore,项目上都我们自己用代码实现
  • 熟悉多线程理论:多线程基本知识、线程互斥、线程同步、原子操作、CAS等。

正文


项目链接,可以下载项目下来,边看边学效果更加!
码云,点击直达~

ThreadPool 线程池类型
线程池支持的类型:

  • MODE_FIXED标识固定线程的数量,线程不会中途扩容,适合长任务。
  • MODE_CACHED模式可以实现线程数量中途增加减少,适合小型任务,但任务海量到来的时候会进行适当的增加线程数,会有上限阈值。

如果是一开始设计,实际上线程池只需要
std::vector<std::shared_ptr<Thread>> threads_; // 线程列表,避免手动释放线程列表来实现,后面改成了std::unordered_map<int, std::unique_ptr<Thread>> threads_;// 线程列表是因为我们需要给每一个线程一个自定义的id,因为ThreadPool需要动态删除线程的时候,需要找到对应的线程,此时如果遍历数组的效率低,我们后面改成哈希表。
总结:
如果是MODE_FIXED模式,这里用vector存储线程是可以的。
但是如果是MODE_CACHED,则需要使用哈希表。

// 线程支持的模式
enum class PoolMode
{
	MODE_FIXED,// 固定数量的线程   -- 不需要考虑线程安全问题
	MODE_CACHED, // 线程数量可动态增长 -- 需要考虑线程安全问题
};

注意:

  • std::queue<std::shared_ptr<Task>> taskQue_; 如果存储任务,我们需要使用到多态,所以我们这里要用基类的指针,但是如果使用 std::queue<Task*> 是有问题的,因为用户若创建一个临时的任务对象,用户调用submitTask回来后,我们queue记录到任务,但是任务已经被销毁了。 但是如果用shared_ptr<Task> 后,其实这个任务的生命周期已经是threadFunc线程的函数里面执行完,就会将任务进行析构。
  • 任务的个数也是需要记录的,因为后续threadFunc不执行了,线程退出的时候,需要保证taskSize_ 已经没有任务了,我们才能够释放锁。因为使用到锁的时候都已经释放了。
    其他线程的字段:
    std::unordered_map<int, std::unique_ptr> threads_; 不需要shared_ptr,因为线程不会发生拷贝,所以用unique_ptr就可以了。
// std::vector<std::shared_ptr<Thread>> threads_; // 线程列表,避免手动释放线程列表
std::unordered_map<int, std::unique_ptr<Thread>> threads_;// 线程列表
int initThreadSize_;	 // 初始的线程数量
std::atomic_int curThreadSize_;// 记录当前线程池里面的线程总数量
int threadSizeThreshHold_; // 线程数量的上限阈值
// std::atomic_int idleThreadSize_;// 记录空闲线程的数量
int idleThreadSize_;// 记录空闲线程的数量

std::queue<std::shared_ptr<Task>> taskQue_;// 这里不用基类的裸指针,并且这里保证要基类,因为我们要用到多态
std::atomic_int taskSize_;			// 任务的数量
int taskQueMaxThreshHold_;		// 任务最大的上限,超过上线会阻塞生产者,直到队列有新的线程执行任务

std::mutex taskQueMtx_;		    // 保证任务队列的原子性
std::condition_variable notFull_; // 表示任务队列不满
std::condition_variable nonEmpty_; // 表示任务队列不空 
std::condition_variable exitCond_; // 等待线程资源全部回收

PoolMode poolMode_; // 线程模式
std::atomic_bool isPoolRunning_;// 线程是否在运行

线程池提供的两个重要方法

线程池需要提供的方法:
其实线程池就重要的就是submitTaskthreadFunc函数,一个是放任务,一个是取任务。其他的都是设置一下相关的成员变量,方便使用。

由于这两个函数关系紧密,我们放到一起讲,就不分开了。

public:
	// 线程池构造
	ThreadPool();
	// 线程池析构
	~ThreadPool();

	// 设置线程池的工作模式
	void setMode(PoolMode mode);
	
	// 设置线程池cached模式下线程阈值
	void setThreadSizeThreshHold(int threshhold);
	// 设置task任务队列上线阈值
	void setTaskQueMaxThreshHold(int threshhold);

	// 给线程池提交任务
	Result submitTask(std::shared_ptr<Task> sp);

	// 开启线程池,指定线程里面的数量的多少,初始化这里给一次就可以了
	void start(int initThreadSize = 4);

	// 不希望线程池对象进行拷贝构造和赋值
	ThreadPool(const ThreadPool&) = delete;
	ThreadPool& operator=(const ThreadPool&) = delete;

private:
	// 定义线程函数
	void threadFunc(int threadid);
	bool checkRunningState() const;

线程池初始化批量线程,将线程绑定了线程函数,后续Thread::start就可以直接用到线程池当中的私有成员变量(如任务队列等等)
所以线程函数我们是放在线程池当中,而不是放到线程当中。

// 开启线程池
void ThreadPool::start(int initThreadSize)
{
	// 设置线程池的运行状态
	isPoolRunning_ = true; 
	// 记录初始线程的个数
	initThreadSize_ = initThreadSize;
	curThreadSize_ = initThreadSize;

	// 创建线程对象
	for (int i = 0; i < initThreadSize_ ;++ i)
	{
		// 这里把线程函数传递给线程
		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
		int threadId = ptr->getId();
		threads_.emplace(threadId, std::move(ptr));
		/*threads_.emplace_back(std::move(ptr));*/
	}

	// 启动所有线程
	for (int i = 0; i < initThreadSize_; ++i)
	{
		// 创建线程,启动线程,执行线程函数:即查看任务队列是否有任务,若有就拿走执行
		threads_[i]->start(); 
		idleThreadSize_++; // 记录初始空闲线程的数量
	}
}

线程里面就存储函数对象

// 线程类型
class Thread
{
public:
	using ThreadFunc = std::function<void(int)>;//void threadFunc();
	Thread(ThreadFunc func);
	~Thread();
	// 启动线程
	void start();

	// 获取线程的id
	int getId() const;
private:
	ThreadFunc func_;
	static int generateId_;
	int threadId_; // 保存线程的id
	
};

启动线程的时候就通过函数对象来跑就行了。
设置成分离线程,主线程退出的时候分离线程没执行完线程函数后也会退出

// 启动线程,线程函数要访问的锁,条件变量都在ThreadPool里面,所以线程函数要放到ThreadPool里面
void Thread::start()
{
	// 创建一个线程函数,这里的thread的方法就是在构造函数里面执行的
	std::thread t(func_,threadId_); // C++11 来说,线程对象t 和线程函数func_ 
	t.detach(); // 设置为分离线程,主线程和从线程func_ 
}

wait 就是阻塞的等待,wait_for 就是等待的时间长度会说,wait_util强调等待得时间结点会说。如果主线程放任务的过程超过1s,那么此时返回一个valid 为false 的Result,标识放任务失败。那么用户需要对返回值进行判断。

// 给线程池提交任务
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
	// 获取锁
	std::unique_lock<std::mutex> lock(taskQueMtx_);
	// 线程的通信   --判断是否队列中的任务到达了上限
	// 在 nonempty 条件变量进行等待,Pred条件不满足才会跳出wait
	// 用户提交任务,最长不能超过1s,否则判断任务失败,返回
	// wait(与时间无关) wait_for(持续等待的时间)  wait_until(等待的终点)
	// wait_for 返回值判断
	if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < taskQueMaxThreshHold_; }))
	{
		// 返回值为false,条件依旧没有满足,任务队列满的
		std::cerr << "task queue is full,submit task fail." << std::endl;
		return Result(sp,false); // Task  Result
	}
	//taskQue_.push(std::move(sp));// bug
	taskQue_.emplace(sp);
	taskSize_++;
	// 通知消费者消费
	nonEmpty_.notify_all();

	// cached模式 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程
	// 应用场景: 小而快的任务
	if (poolMode_ == PoolMode::MODE_CACHED
		&& taskSize_ > idleThreadSize_
		&& curThreadSize_< threadSizeThreshHold_)	
	{
		std::cout << ">>> create new thread" << std::endl;
		// 创建新的线程对象
		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
		int threadId = ptr->getId();
		threads_.emplace(threadId, std::move(ptr));
		threads_[threadId]->start();
		// 修改线程个数相关的变量
		curThreadSize_++;
		idleThreadSize_++;
	}
	return Result(sp);
}

exec函数:
exec 函数的执行,是执行程序,并且将返回值放到Task当中的Result*当中。
exec封装了setVal,实际上就是提前判断result_是否存在,存在才设置。

void Task::exec()
{
	if (result_ != nullptr)
	{
		result_->setVal(run()); // 这里发生多态调用
	}
}

设置结果在Result的构造函数中执行。

void Task::setResult(Result* res)
{
	result_ = res;
}

get函数
isValid_ 当任务没放到任务队列的时候会被设置为false,此时用户即使是调用get方法获取返回值也是会失败的。

if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < taskQueMaxThreshHold_; }))
	{
		// 返回值为false,条件依旧没有满足,任务队列满的
		std::cerr << "task queue is full,submit task fail." << std::endl;
		return Result(sp,false); // Task  Result
	}
// 用户调用
Any Result::get()
{
	if (!isValid_)
	{
		return nullptr;
	}
	sem_.wait(); // 任务如果没有执行完,这里会阻塞用户
	return std::move(any_);
}

Any类的设计


首先要构造函数要能接受任意类型,那么我们一定有一个构造函数是函数模板;
然后需要Any类是这个接受类型的父类,那么我们可以定义一个子类Derive,专门接受不同类型的,然后保存在子类当中;父类不需要模板,Any类放一个unique_ptr<Base>,然后构造函数 Any(T data) :base_(std::make_unique<Derive<T>>(data)) {} 这样base_ 就指向了任意类型,并且是父类。若要使用的时候可以通过转化为子类(dynamic_cast)然后cast_操作就会将保存的值进行返回。

// Any 类型:可以接受任意数据的类型
class Any
{
public:
	Any() = default;
	~Any() = default;
	Any(const Any&) = delete;// 成员禁止左值拷贝和赋值
	Any& operator=(const Any&) = delete;
	Any(Any&&) = default;
	Any& operator=(Any&&) = default;
	// 模板收任意类型的数据来构造一个对象
	template<typename T>
	Any(T data) :base_(std::make_unique<Derive<T>>(data))
	{}

	// 这个方法能把Any对象里面存储的data对象提取出来
	template<typename T>
	T cast_()
	{
		// 我们怎么从base_找到它所指向的Derive对象,从他里面取出data成员变量
		// 基类指针 =》 派生类指针
		// base.get_() 就是获取裸指针
		Derive<T>* pd = dynamic_cast<Derive<T>*>(base_.get());
		// 类型不匹配就会失败
		if (pd == nullptr)
		{
			throw "type is unmatch !";
		}

		return pd->data_;
	}
private:
	class Base
	{
	public:
		virtual ~Base() = default;
	};

	// 派生类类型
	template<typename T>
	class Derive : public Base
	{
	public:
		Derive(T data):data_(data)
		{}
	public:
		T data_; // 保存了任意其他类型
	};
private:
	std::unique_ptr<Base> base_;// 基类指针
};

Result 就是提供给外部的,封装了Any,定义Any成员变量作为返回值,外部通过Result简介可以通过Any的cast_方法拿到想要的返回值。
但是如果cast_ 的类型不匹配,会返回nullptr。即外部定义MyTask的返回值sum的类型需要确定。

// 实现接受任务提交到线程池的task任务执行完成后的返回值类型Result
class Result
{
public:
	Result(std::shared_ptr<Task> task, bool isValid = true);
	~Result() = default;
	
	// 问题一:setVal 方法,获取任务执行完的返回值的
	void setVal(Any any);
	// 问题二: get方法,用户调用这个方法获取task的返回值
	Any get();
private:
	Any any_; // 存储任务返回值
	Semaphore sem_; // 线程通信信号量 ,默认二元
	std::shared_ptr<Task> task_;// 指向获取返回值的任务对象
	std::atomic_bool isValid_; // 返回值是否有效,任务提交失败那肯定时无效的
};

使用范例,由于调用方肯定知道返回值的类型,那么就可以拿到Result当中的Any对象存储的值提取成相应的返回值。

Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));
pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));
ull sum1 = res1.get().cast_<ull>();

Semaphore


信号量和mutex的区别:
二元信号量其实已经有了互斥的语义,但是信号量的P,V可以在不同线程,而mutex必须在一个线程。

C++20 提供了,但是我们自己实现。
由于拿返回值的时候可能存在线程池中的线程暂时还没处理完任务,所以得阻塞。

实现信号量可以用条件变量,用锁搭配条件变量实现通知,用resLimit_说明资源的数量。

class Semaphore
{
public:
	Semaphore(int limit = 0)
		:resLimit_(limit)
	{}
	~Semaphore() = default;

	// 获取一个信号量资源
	void wait()
	{
		std::unique_lock<std::mutex> lock(mtx_);
		// 等待信号量有资源,没有资源的话,会阻塞当前线程
		cond_.wait(lock, [&]()->bool {return  resLimit_ > 0; }); // 该条件满足就退出
		resLimit_--;
	}

	// 增加一个信号量资源
	void post()
	{
		std::unique_lock<std::mutex> lock(mtx_);
		resLimit_++;
		cond_.notify_all();
	}
private:
	int resLimit_;				   // 资源的一个数量
	std::mutex mtx_; 			   // 需要互斥
	std::condition_variable cond_; // 需要通知
};

submitTask 返回任务的结果,不能够通过往Task对象添加成员方法来实现,即task->getResult();这种是不行的。但是Result(task)这种是没有问题的。 原因是:task的生命周期比Result的短。

Result的实现


Result的构造函数,会将对应的Task结构绑定起来。
其中std::shared_ptr<Task> task作为形参也是延续了Task的生命周期,至此task的生命周期由Result管控。
总结步骤

  • 1.Result的构造函数,在submitTask,失败或者成功都会调用,不同在于isValid_是否会被设置为false,默认设置true,标识放任务成功。Task和Result绑定成功。
  • 2.用户此时若访问Result获取返回值,会调用get方法然后在信号量下等待。
  • 3.线程池拿到任务,执行任务,将任务结果设置到Result的any_,唤醒用户线程。
    特殊情况:若是任务没有成功放到队列的话,就会返回nullptr给any_,从any_此时提取不出来返回值。因为base_ 里面存的是nullptr,dynamic_ptr的时候返回nullptr,我们内部做了抛异常处理。
// Result 方法的实现
Result::Result(std::shared_ptr<Task> task, bool isValid)
	:isValid_(isValid)
	,task_(task)
{
	task_->setResult(this);
}

// setResult 设置Task的返回结果
void Task::setResult(Result* res)
{
	result_ = res;
}

// 用户调用,若是isValid_ 是false就表示任务都没传进来,其他情况就等待信号量。
Any Result::get()
{
	if (!isValid_)
	{
		return nullptr;
	}
	sem_.wait(); // 任务如果没有执行完,这里会阻塞用户
	return std::move(any_);
}

// 线程池内部线程调用
/// 设置返回值的调用
void Result::setVal(Any any)
{
	// 存储task的返回值
	this->any_ = std::move(any);
	sem_.post(); // 已经获取了任务的返回值
}

/// 线程池内部调用exec函数会将用户传进来的run()方法跑了,返回值给到setVal方法,进行保存并且通知用户。
void Task::exec()
{
	if (result_ != nullptr)
	{
		result_->setVal(run()); // 这里发生多态调用
	}
}

Result的生命周期什么时候结束。出了用户的生命周期才会析构。

所有的set方法都要在进程没被run起来之前被设置。

// 设置线程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{
	// 启动之后不允许设置线程池状态
	if (checkRunningState())
		return;
	poolMode_ = mode;
}

Cache模式解释


若是处于Cache模式:
Thread的成员变量threadid_ 就是为这里考虑的。因为删除线程的时候需要找到通过id来找,这样会比较优雅。
由于需要判断是否需要增加或者删除线程,用idleThreadSize_来记录空闲线程的数量,curThreadSize_ 来记录已经创建了的线程总数(空闲+非空闲),taskSize_记录任务的任务队列中数量,后续可以通过空闲的线程数量和taskSize_进行比较,从而来增加/删除线程。
submitTask:
负责提交,提交的时候需要考虑已有的任务若是比线程多,那么就可以动态的创建适量线程来执行任务。因为提交任务可以对任务进行初步的判断,考虑是否需要增加线程处理。
增加的时间点,只要是cache,并且当前的线程数没有达到上限值,任务数量大于空闲线程数,就可以进行创建。

// cached模式 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程
// 应用场景: 小而快的任务
if (poolMode_ == PoolMode::MODE_CACHED
	&& taskSize_ > idleThreadSize_
	&& curThreadSize_< threadSizeThreshHold_)	
{
	std::cout << ">>> create new thread" << std::endl;
	// 创建新的线程对象
	auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));
	int threadId = ptr->getId();
	threads_.emplace(threadId, std::move(ptr));
	threads_[threadId]->start();
	// 修改线程个数相关的变量
	curThreadSize_++;
	idleThreadSize_++;
}

threadFunc:
线程池中的线程在wait_for进行等待,当被唤醒的时候判断如果是timeout了,表示一定时间内没有任务了,此时若是线程的数量超过初始的,就可以让线程销毁了。
我们这里规定,就是从等待任务的时候,每1s timeout一次,直到时间已经超过60s,那么线程就不能留着了。

if (poolMode_ == PoolMode::MODE_CACHED)
{
	if (std::cv_status::timeout == nonEmpty_.wait_for(lock, std::chrono::seconds(1)))
	{
		auto now = std::chrono::high_resolution_clock().now();
		auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
		if (dur.count() >= THREAD_MAX_IDLE_TIME
			&& curThreadSize_ > initThreadSize_)
		{
			// 开始回收当前线程
			// 记录线程数量的相关变量的值修改
			// 把线程对象从线程列表容器中删除 到那时没有办法直到对应的thread的线程对象
			// threadid => thread对象 
			threads_.erase(threadid);// 删除id,不能用this_thread里面的id
			curThreadSize_--;
			idleThreadSize_--;
			std::cout << "threadid:" << std::this_thread::get_id() <<
				"exit !" << std::endl;
			return;
		}
	}
}
				

setThreadSizeThreshHold:
若是要设置线程为Cache模式,需要判断线程是否有启动。

会遇到死锁问题


  1. 当ThreadPool的析构函数不做处理的时候,线程池已有的线程由于会访问到ThreadPool的数据结构,而线程池的线程都是detach状态,只有主线程退出才能够进行释放,所以我们需要在~ThreadPool() 当中把所有等待在nonEmpty条件变量下的线程唤醒,并且设置isPoolRunning_状态。
// 线程池析构
ThreadPool::~ThreadPool()
{
	isPoolRunning_ = false;
	//nonEmpty_.notify_all();// 唤醒所有的线程  -- 这里唤醒的话,会有一种情况唤醒不了
	// 等待线程池里面所有的线程返回,有两种状态: 阻塞 & 正在执行的任务
	std::unique_lock<std::mutex> lock{ taskQueMtx_ };
	nonEmpty_.notify_all();// 唤醒所有的线程
	// 所有线程都释放,这里才会彻底退出
	exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}

若是nonEmpty的唤醒放在notify前面,有可能还会造成死锁,这个问题是非闭环的,一定要逻辑清晰。
在这里插入图片描述
即下面这种特殊情况。那为什么nonEmpty的唤醒放下面就没有问题呢? 因为即使获取锁后唤醒,那么我们可以通过双重判断解决。
~ThreadPool() 如果是先唤醒,在获取锁,(此时可能)就有可能主线程和线程池的线程都在等待唤醒的情况。但是如果是先获取锁,那么线程池的线程在第二次判断isPoolRunning这里,就一定会被判断失败,然后去执行任务。 其实单纯的只是析构函数唤醒的时候线程切换,就有可能发生部分线程执行任务的时候进入了临界区执行任务,但是没任务又wait了。

下图的说法是错误的,是一开始总结的时候想的:其实这里不用考虑切换,都上锁了,这里肯定是串行的。愚蠢。
在这里插入图片描述

析构函数代码如上。

// 线程池析构
ThreadPool::~ThreadPool()
{
	isPoolRunning_ = false;
	//nonEmpty_.notify_all();// 唤醒所有的线程  -- 这里唤醒的话,会有一种情况唤醒不了
	// 等待线程池里面所有的线程返回,有两种状态: 阻塞 & 正在执行的任务
	std::unique_lock<std::mutex> lock{ taskQueMtx_ };
	nonEmpty_.notify_all();// 唤醒所有的线程
	// 所有线程都释放,这里才会彻底退出
	exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}

配合线程函数,可以彻底解决死锁问题。
这种线程执行可以保证,已经在任务队列的任务都能执行完再退出。

// 定义线程函数
void ThreadPool::threadFunc(int threadid) // 线程函数返回,相应的线程也就结束了
{
	// 获取一个高精度时间
	auto lastTime = std::chrono::high_resolution_clock().now();
	while (isPoolRunning_)
	{
		std::shared_ptr<Task> task;
		{
			// 先获取锁
			std::unique_lock<std::mutex> lock(taskQueMtx_);
			cout << "tid: " << std::this_thread::get_id() << " 尝试获取任务..." << endl;

			// cache模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程给取消了
			// 当前时间 - 上一次线程执行时间 > 60s
			// 等待nonEmpty
			// 总结:也就是线程获取任务的过程中等待过长时间就可以进行删除
			
			// 每1s返回一次,区分 超时返回?有任务待执行
			while (isPoolRunning_ && taskSize_ <= 0)
			{
				// 条件变量,超时返回
				if (poolMode_ == PoolMode::MODE_CACHED)
				{
					if (std::cv_status::timeout == nonEmpty_.wait_for(lock, std::chrono::seconds(1)))
					{
						auto now = std::chrono::high_resolution_clock().now();
						auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
						if (dur.count() >= THREAD_MAX_IDLE_TIME
							&& curThreadSize_ > initThreadSize_)
						{
							// 开始回收当前线程
							// 记录线程数量的相关变量的值修改
							// 把线程对象从线程列表容器中删除 到那时没有办法直到对应的thread的线程对象
							// threadid => thread对象 
							threads_.erase(threadid);// 删除id,不能用this_thread里面的id
							curThreadSize_--;
							idleThreadSize_--;
							std::cout << "threadid:" << std::this_thread::get_id() <<
								"exit !" << std::endl;
							return;
						}
					}
				}
				else
				{
					nonEmpty_.wait(lock);
				}
				// 线程等待被唤醒后,两种情况都检查是被谁唤醒
				// 线程池要结束,回收线程资源
				if (!isPoolRunning_)
				{
					// erase就是调用了unique_ptr的析构函数,会把Thread删除,这里就已经删除线程了
					threads_.erase(threadid);// 删除id,不能用this_thread里面的id
					std::cout << "threadid:" << std::this_thread::get_id() <<
						"exit !" << std::endl;
					exitCond_.notify_all();
					return;
				}
			}
			taskSize_--;
		}

		idleThreadSize_--;
		cout << "tid: " << std::this_thread::get_id() << " 获取任务成功" << endl;

		// 取任务
		task = taskQue_.front();
		taskQue_.pop();

		// 如果依然有剩余的任务,继续通知其他线程执行任务
		if (taskQue_.size() > 0)
		{// 第一个吃螃蟹的通知其他人吃螃蟹
			nonEmpty_.notify_all();
		}
		// 取出一个任务,进行通知,通知可以继续提交生产任务
		notFull_.notify_all();
		// 释放锁,执行任务
		if (task != nullptr)
		{
			// task->run();
			// 执行任务:把任务的返回值通过setVal方法给到Result
			task->exec();
		}
		// 返回值处理
		idleThreadSize_++;// 线程执行完了任务,空闲线程++

		// 更新线程调度执行完任务的时间
		lastTime = std::chrono::high_resolution_clock().now();
	}
	// isRunLooping_表示部分线程在线程池要退出的时候刚好在执行任务,执行后到这里
	threads_.erase(threadid);// 删除id,不能用this_thread里面的id
	std::cout << "threadid:" << std::this_thread::get_id() <<
		"exit !" << std::endl;
	exitCond_.notify_all();
}

第二个死锁问题,移植到Linux发生


若项目移植到Linux下会发生错误,在Semaphore 提供一个变量isExit_可以解决。

具体调试步骤,发生死锁,查看进程id
在这里插入图片描述
1号是main线程,我们看看2号线程是在哪里阻塞了;
很明显,线程执行notify_all的时候进入睡眠了。
原因:此时的条件变量已经析构了。vs下没出问题是因为对条件变量的资源进行了删除,而Linux下没有对条件变量析构做处理。

在这里插入图片描述

那为啥Semaphore会被析构呢?
Semaphore 是在Result中被使用,是用来用户进行获取返回值的时候进行使用的。但是有一种场景:
如下图,解释也在图中。
在这里插入图片描述

解决办法,每次wait,post操作对isExit_进行判断。

// 实现一个信号量类
class Semaphore
{
public:
	Semaphore(int limit = 0)
		:resLimit_(limit)
		,isExit_(false)
	{}
	~Semaphore()
	{
		isExit_ = true;
	}

	// 获取一个信号量资源
	void wait()
	{
		if(isExit_)
		{
			return ;
		}
		std::unique_lock<std::mutex> lock(mtx_);
		// 等待信号量有资源,没有资源的话,会阻塞当前线程
		cond_.wait(lock, [&]()->bool {return  resLimit_ > 0; }); // 该条件满足就退出
		resLimit_--;
	}

	// 增加一个信号量资源
	void post()
	{
		// 如果已经不存在了,那么可以不做任何事情了
		if(isExit_)
		{
			return ;
		}
		std::unique_lock<std::mutex> lock(mtx_);
		resLimit_++;
		// 这里会有问题,因为Linuxcond_不释放资源,会发生无故阻塞
		cond_.notify_all();
	}
private:
	int resLimit_;// 资源的一个数量
	std::mutex mtx_;
	std::condition_variable cond_;
	std::atomic_bool isExit_;
};

项目重构

用C++11,14 来进行代码重构,摒弃自己的Result,Task,Semaphore等等,用packaged_task 和 future类型来实现。

类似thread(func,12,3),我们希望给submitTask传参的时候也可以使用这种类似的方式,而不是自己手动创建任务,然后将任务传参。
简单的一些前置知识铺垫:

  • packaged_task类似上述Task,不过支持了传函数。packaged_task重载的operator()就是对任务进行执行,并且设置了返回值(和Task的exec方法类似)。
  • packaged_task与 functional一样,都是函数对象,就是将任务进行打包。但是packaged_task能够帮助获得返回值,返回值是future<返回值类型>。
  • packaged_task的get_future方法返回一个future<返回值类型>的对象, 相当于先前我们写的submitTask的时候返回的Result(sp);
    总结:
    暂时可以将packaged_task 看作上面的Task方法,future 看作上面的Result方法。

注意:packaged_task的operator()重载中若是执行失败,会返回future_error的异常,通常若是发生这个异常,可能是任务没有被执行。
在这里插入图片描述

submitTask改造,其中返回值类型用了auto,这是C++14提供的,std::future<decltype(func(args…))>用来推导返回值类型是没有问题的,和sizeof类似,func(args…)并不会真正执行,只进行推导。

  • 若是任务提交成功,则保存函数对象,注意Task是以值引用方式传递给了taskQue_里面,线程会拿到taskQue_ 里面的函数对象直接执行。
// 给线程池提交任务
// 使用可变参模板变成,让submitTask接受任意任务函数和任意数量的参数
// pool.submitTask(sum1,10,20)
template<typename Func,typename ...Args>
auto submitTask(Func&& func, Args&& ...args) -> std::future<decltype(func(args...))>
{
	// 打包任务,放入任务队列
	// 返回值类型重命名
	using RType = decltype(func(args ...));
	auto task = std::make_shared<std::packaged_task<RType()>>(
		std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
	);
	std::unique_lock<std::mutex> lock(taskQueMtx_);
	// 线程的通信   --判断是否队列中的任务到达了上限
	// 在 nonempty 条件变量进行等待,Pred条件不满足才会跳出wait
	// 用户提交任务,最长不能超过1s,否则判断任务失败,返回
	// wait(与时间无关) wait_for(持续等待的时间)  wait_until(等待的终点)
	// wait_for 返回值判断
	if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < taskQueMaxThreshHold_; }))
	{
		// 返回值为false,条件依旧没有满足,任务队列满的
		std::cerr << "task queue is full,submit task fail." << std::endl;
		auto task = std::make_shared<std::packaged_task<RType()>>(
			[]()->RType {return RType(); } // 返回一个空值的任务
			);
		(*task)(); // 相当于主线程来执行这个不成功的例子了
		return task->get_future(); // Task  Result
	}
	//taskQue_.push(std::move(sp));// bug
	//taskQue_.emplace(sp);
	// using Task = std::function<void()>;
	taskQue_.emplace([task]() { // 以值形式拷贝,相当于shared_ptr的拷贝了一份,疑问,这个函数体返回值不是void吗
		// 去执行下面的任务
		(*task)(); // 执行任务,以及设置任务的返回值
		});
	taskSize_++;
	// 通知消费者消费
	nonEmpty_.notify_all();

	// cached模式 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程
	// 应用场景: 小而快的任务
	if (poolMode_ == PoolMode::MODE_CACHED
		&& taskSize_ > idleThreadSize_
		&& curThreadSize_ < threadSizeThreshHold_)
	{
		std::cout << ">>> create new thread" << std::endl;
		// 创建新的线程对象
		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
		int threadId = ptr->getId();
		threads_.emplace(threadId, std::move(ptr));
		threads_[threadId]->start();
		// 修改线程个数相关的变量
		curThreadSize_++;
		idleThreadSize_++;
	}
	return task->get_future();
}

线程执行的函数基本上不用变,执行函数从task->exec() 换成 task() 即可,相当于执行了packaged_task的operator() 函数。

// 定义线程函数
void threadFunc(int threadid)
{
	// 获取一个高精度时间
	auto lastTime = std::chrono::high_resolution_clock().now();
	while (isPoolRunning_)
	{
		Task task;
		{
			// 先获取锁
			std::unique_lock<std::mutex> lock(taskQueMtx_);
			cout << "tid: " << std::this_thread::get_id() << " 尝试获取任务..." << endl;

			// cache模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程给取消了
			// 当前时间 - 上一次线程执行时间 > 60s
			// 等待nonEmpty
			// 总结:也就是线程获取任务的过程中等待过长时间就可以进行删除
			

			// 每1s返回一次,区分 超时返回?有任务待执行
			while (taskSize_ <= 0)
			{
				// 条件变量,超时返回
				if (poolMode_ == PoolMode::MODE_CACHED)
				{
					if (std::cv_status::timeout == nonEmpty_.wait_for(lock, std::chrono::seconds(1)))
					{
						auto now = std::chrono::high_resolution_clock().now();
						auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
						if (dur.count() >= THREAD_MAX_IDLE_TIME
							&& curThreadSize_ > initThreadSize_)
						{
							// 开始回收当前线程
							// 记录线程数量的相关变量的值修改
							// 把线程对象从线程列表容器中删除 到那时没有办法直到对应的thread的线程对象
							// threadid => thread对象 
							threads_.erase(threadid);// 删除id,不能用this_thread里面的id
							curThreadSize_--;
							idleThreadSize_--;
							std::cout << "threadid:" << std::this_thread::get_id() <<
								"exit !" << std::endl;
							return;
						}
					}
				}
				else
				{
					nonEmpty_.wait(lock);
				}
				// 线程等待被唤醒后,两种情况都检查是被谁唤醒
				// 线程池要结束,回收线程资源
				if (!isPoolRunning_)
				{
					// erase就是调用了unique_ptr的析构函数,会把Thread删除,这里就已经删除线程了
					threads_.erase(threadid);// 删除id,不能用this_thread里面的id
					std::cout << "threadid:" << std::this_thread::get_id() <<
						"exit !" << std::endl;
					exitCond_.notify_all();
					return;
				}
			}
			taskSize_--;
		}

		idleThreadSize_--;
		cout << "tid: " << std::this_thread::get_id() << " 获取任务成功" << endl;

		// 取任务
		task = taskQue_.front();
		taskQue_.pop();

		// 如果依然有剩余的任务,继续通知其他线程执行任务
		if (taskQue_.size() > 0)
		{// 第一个吃螃蟹的通知其他人吃螃蟹
			nonEmpty_.notify_all();
		}
		// 取出一个任务,进行通知,通知可以继续提交生产任务
		notFull_.notify_all();
		// 释放锁,执行任务
		if (task != nullptr)
		{
			// task->run();
			// 执行任务:把任务的返回值通过setVal方法给到Result
			//task->exec();
			task(); // 执行package_task封装的函数,执行完会post
		}
		// 返回值处理
		idleThreadSize_++;// 线程执行完了任务,空闲线程++

		// 更新线程调度执行完任务的时间
		lastTime = std::chrono::high_resolution_clock().now();
	}
	// isRunLooping_表示部分线程在线程池要退出的时候刚好在执行任务,执行后到这里
	threads_.erase(threadid);// 删除id,不能用this_thread里面的id
	std::cout << "threadid:" << std::this_thread::get_id() <<
		"exit !" << std::endl;
	exitCond_.notify_all();
}

大致流程

在这里插入图片描述

总结

该项目可能会碰到两次死锁,并且会碰到Linux平台和windows平台运行结果不一致的情况,均解决了。
项目链接:
码云,点击直达~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/28308.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实验四 数码管实验【Verilog】

实验四 数码管实验【Verilog】前言推荐实验四 数码管实验【Verilog】一、实验目的&#xff1a;二、实验设备&#xff1a;三、实验任务:四、实验原理:五、实验步骤&#xff1a;六、实验结果&#xff1a;七、心得体会&#xff1a;最后前言 以下内容源自Verilog实验 仅供学习交流…

[附源码]java毕业设计校园期刊网络投稿系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Git的基础操作及使用

目录 1.git工作原理示意 2.git拉取服务器上的代码 3.git往服务器上提交新code 4.git 查看提交记录&#xff0c; 5.git删除旧代码仓库&#xff0c;提交新的代码仓库 6.如何修改自己提交代码的用户名和邮箱 6.1.查看现用邮箱和用户名 6.2.使用命令修改git的用户名和提交的…

将一段文本映射到低纬向量空间

文本表示学习就是将一段文本映射到低纬向量空间&#xff0c;获取句子的语义表示&#xff0c;大致经历过四个阶段&#xff1a; 阶段 1&#xff1a;统计类型&#xff0c;此阶段比较典型的是利用 TD-IDF 抽取关键词&#xff0c;用关键词表示表征整个句子。 阶段 2&#xff1a;深度…

MySQL数据库增删改查进阶 — 聚合查询、分组查询、联合查询

文章目录1.聚合查询1.1 count 函数1.2 sum 函数1.3 avg 函数1.4 max 和 min 函数2.分组查询2.1 group by 子句2.2 分组查询可以指定条件2.2.1 分组之前&#xff0c;指定条件2.2.2 分组之后&#xff0c;指定条件2.2.3 分组前后都指定条件3.联合查询3.1 笛卡尔积3.1.1 笛卡尔积中…

YourKit Java Profiler 2022.9.X Crack

YourKit Java Profiler 2022.9.X Crack 从 CPU 和内存利用率的角度分析您的程序非常重要。它允许您最大限度地提高自身性能并限制其对服务器系统工具的影响&#xff0c;这将始终受到最终用户的重视。 在 YourKit Java Profiler 的支持下&#xff0c;可以很容易地运行基于 Java…

传奇单机架设教程及游戏GM设置方法

传奇技术教学 第二课:传奇单机架设教程及游戏GM设置方法 架设前关杀毒 确保自己的热血传奇客户端是13周年以后的 最好用最新的. 不要使用已经淘汰的10周年客户端和微端客户端 否则会出现显示不全情况. 注意HERO引擎版本在登录器方面不支持WIN8及WIN10系统的. 若你是以上系统…

Linux资源限制命令—ulimit

ulimit功能简述 假设有这样一种情况&#xff0c;当一台 Linux 主机上同时登陆了 10 个人&#xff0c;在系统资源无限制的情况下&#xff0c;这 10 个用户同时打开了 500 个文档&#xff0c;而假设每个文档的大小有 10M&#xff0c;这时系统的内存资源就会受到巨大的挑战。而实…

市面上主流源表软件全面对比,总有一款适合你!

在电测行业中&#xff0c;在对高精度的电压、电流或电流电压源进行测量扫描时就要请出我们的小伙伴“源表“。 它精确的采集能力以及为各种低电平测量应用提供额外的灵活性让它成为了电测行业中不和或缺的一员。而作为和它配合的搭档源表软件也在电测行业中有着重要的作用。 …

【排序专题】不会吧,不会吧居然还有人不懂排序算法?有彩蛋哦

文章目录1. 冒泡排序2. 选择排序3. 简单插入排序4. 希尔排序-->简单插入排序演变5. 归并排序(递归版本)6. 归并排序(非递归版本)7. 荷兰国旗问题8.由荷兰国旗问题进而引出快速排序 and 快速排序1.0版本9.快速排序2.0版本(挖坑法)10.快速排序 3.0版本(随机取数法)11.堆排序12…

AOP结合注解实现项目中接口调用情况监控

一、概述 项目中经常会遇到这样一个需求&#xff0c;需要监控每个controller中接口被调用的情况。 比如某个接口被调用的时间&#xff0c;哪个用户调用的&#xff0c;请求参数是什么&#xff0c;返回值是什么等等。 并且调用情况需要存储到数据库中&#xff0c;此时就可以AO…

三十二、Java LinkedList

Java LinkedList 链表&#xff08;Linked list&#xff09;是一种常见的基础数据结构&#xff0c;是一种线性表&#xff0c;但是并不会按线性的顺序存储数据&#xff0c;而是在每一个节点里存到下一个节点的地址。 链表可分为单向链表和双向链表。 一个单向链表包含两个值: 当…

JavaEE——Servlet中的session

之前的博客中提到&#xff0c;cookie是为了浏览器能够在本地保存数据而产生的机制&#xff0c;是在浏览器工作的。而session则是与之对应的&#xff0c;在客户端工作的。一个服务器对应多个客户端&#xff0c;每个客户端都有自己的session&#xff0c;以sessionId为key&#xf…

python进阶(28)import导入机制原理

前言 在Python中&#xff0c;一个.py文件代表一个Module。在Module中可以是任何的符合Python文件格式的Python脚本。了解Module导入机制大有用处。 1. Module组成 一个.py文件就是一个module。Module中包括attribute, function等。 这里说的attribute其实是module的global vari…

[ros2实操]2-ros2的消息和ros1的消息转换

记录一下ros2和ros1消息互传的操作 !!!需要注意的是,只有ros1订阅了相关ros2的消息时,ros1_bridge节点才会转发相关消息. 参考链接 1,docker镜像链接:待定 2,github教程:Bridge communication between ROS 1 and ROS 2 一.准备步骤 为在自己原来的系统ubuntu1804上同时安装…

[附源码]SSM计算机毕业设计基于ssm的电子网上商城JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

通过五个维度比较分析关键容灾技术

【摘要】每一种容灾技术方案,可以从实现的技术复杂度、需要投入的成本、需要承担的风险、技术的先进性、技术的成熟度等几个方面来综合评估,寻求适合企业的最佳技术组合方案。本文对几类关键容灾技术进行了比较分析。(文中涉及相关技术产品最新参数请以官网最新发布为准) …

【SpringBoot】SpringBoot开启MyBatis缓存+ehcache(一二级缓存和myBatis的差不多,第三方缓存是jar包的不一样)

文章目录第三方缓存1、导jar包2、配置文件ehcache.xml3、EnableCaching开启缓存4、application.yml读取配置文件5、使用缓存&#xff08;注解Cacheable&#xff09;Cacheable的三个属性&#xff1a;value、key和condition▶测试&#xff1a;root对象可以用来生成keyCachePutCac…

LeetCode HOT 100 —— 4.寻找两个正序数组的中位数

题目 给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 思路 正序数组&#xff0c;立即推—>二分查找 如果本题不要求时间复杂度O&#xff08;log…

光点数据可视化解决方案,助力新型智慧城市打造_光点科技

随着城市化进程的快速发展&#xff0c;智慧城市逐渐从理论理念演变为实践。智慧城市作为一个极其复杂的城市数字化建设和运营系统&#xff0c;涵盖了大量的智能交通、智能物流、智能公园等子系统。对于智慧城市解决方案提供商和集成商来说&#xff0c;其数据可视化产品的统一监…