C++ 多线程(四):实现一个功能完整的线程池
今天我们来聊一聊异步编程的知识。在分布式系统中,一个功能完整的线程池类是一切代码的前提。
一个『合格』的线程池该具备哪些功能?
首先,很自然地想到『线程池类里该有个线程对象的集合,然后可以初始化线程对象的个数、创建线程对象、及启动线程主函数』。没错,这些是基本功能,但是,它更重要的功能是『这些线程对象该运行哪些任务,以及怎么运行这些任务』
解决方案就是任务队列,这个队列里装载了各类各样的任务(函数对象、函数指针、仿函数等)。接着,每个线程去循环遍历这个任务队队列,然后执行该任务。所以,这个类的成员组成大致就清晰了:
- 一个线程对象队列及初始化方法
- 一个任务队列及任务添加的方法
功能完整的线程池类实现
#include <iostream>
#include <cstring>
#include <stdexcept>
#include <vector>
#include <map>
#include <set>
#include <algorithm>
#include <mutex>
#include <deque>
#include <thread>
#include <pthread.h>
#include <unistd.h>
#include <memory>
#include <condition_variable>
#include <future>
#include <list>
// 线程安全队列
template <typename T>
class ThreadSafeQue {
public:
ThreadSafeQue() {};
~ThreadSafeQue() {};
bool empty()
{
std::unique_lock<std::mutex> mlock(mutex_);
return queue_.empty();
}
bool pop(T&& pData)
{
std::unique_lock<std::mutex> mlock(mutex_);
if (queue_.empty()) {
return false;
}
pData = std::move(queue_.front());
queue_.pop_front();
return true;
}
void push(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push_back(std::move(item));
mlock.unlock();
cond_.notify_all();
}
private:
std::deque<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};
// 线程池类
template<class T>
class ThreadPool {
public:
ThreadPool(uint32_t i, uint32_t j) : _thread_num(i), _task_num(j) {}
~ThreadPool() {}
void Run() {
for (std::thread &t : _threads) {
t.join(); // 等待子线程执行结束
}
}
void Init() {
for (int i = _thread_num; i > 0; --i) {
_threads.emplace_back([this, i]() { // 线程对象创建的同时就开始执行
std::packaged_task<T()> task;
while(task_queue.pop(std::move(task))) {
std::unique_lock<std::mutex> ul(mtx);
std::cout << "curr thread id: " << i << "\n";
task();
}
});
}
}
template <class Callable, class... Args>
void AddTask(Callable &&func, Args &&... args) {
std::packaged_task<T()> task(std::bind(std::forward<Callable>(func), std::forward<Args>(args)...));
ret_status.push_back(task.get_future());
task_queue.push(std::move(task));
}
public:
uint32_t _thread_num;
uint32_t _task_num;
ThreadSafeQue<std::packaged_task<T()>> task_queue; // 线程任务队列
std::vector<std::thread> _threads; // 线程对象池
std::mutex mtx;
std::vector<std::future<T>> ret_status;
};
class Func {
public:
void operator()(int k) {
std::cout << k << "\n";
usleep(1000);
}
};
template <class T>
void FillTaskQue(ThreadPool<T>* tp) {
for (int i = 0; i < tp->_task_num; i++) {
Func func;
tp->AddTask(func, i);
}
}
int main()
{
using T = void;
ThreadPool<T> threadPool(4, 10);
std::future<T> ret = std::async(FillTaskQue<T>, &threadPool);
//ret.wait();
threadPool.Init();
threadPool.Run();
return 0;
}
上述代码中,首先实现了一个简单的线程安全队列 ThreadSafeQue,用来存放任务。
线程池类 ThreadPool 中定义了 AddTask 模板方法用来添加线程任务(也就是函数对象 Func func)。ret_status 记录了任务执行的完成情况。
上述实现代码量虽然不多,但涉及的知识点还是挺多的:
- std::async 启动一个任务队列填充的线程
- std::forward 通常是用于完美转发的,它会将输入的参数原封不动地传递到下一个函数中,
这个“原封不动”指的是,如果输入的参数是左值,那么传递给下一个函数的参数的也是左值;如果输入的参数是右值,那么传递给下一个函数的参数的也是右值 - std::packaged_task 包装一个可调用的对象,并且允许异步获取该可调用对象产生的结果,从包装可调用对象意义上来讲,std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果)。
- 函数的返回结果用 std::future 接收,可以调用 future.get 等待异步任务执行完成,调用 future.wait 等待异步任务开始执行,future.wait_for 超时等待函数返回结果,但是它的模板类型只能是函数的返回值类型,局限性比较大,可以结合 std::promise 使用,更加灵活。std::promise 也是一个模板类,它的作用是在不同的线程中实现数据的同步,set_value()则直接将 future 的状态设置为ready,使用方法如下:
auto promise = std::,make_shared<std::promise<int32_t>>();
promise->set_value(value)
std::future<int> fut = promise->get_future();
最后,我们再来看下线程池类的测试结果吧。