利用C++11的异步操作实现一个线程池
- 利用C++11的异步操作实现一个线程池
- 介绍
- 关于一些代码细节的解释
- 测试
介绍
基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择std::packaged_task
加上std::future
的组合来实现。
具体使用可以见我上一篇博客:C++11中的异步操作|std::future|std::promise|std::packaged_task
线程池的工作思想:
- 用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的工作线程来执行函数完成任务;
完整代码可以见博客最后,也可以见项目地址: HareMQ/demo/thread_pool
关于一些代码细节的解释
push
函数参数的解释
template <typename func, typename... Args>
auto push(func&& f, Args&&... args) -> std::future<decltype(f(args...))>;
push 进来的首先是一个函数(用户要执行的函数),接下来是不定参数,表示要处理的数据,也就是要穿入的参数,然后在push内部,把这个函数封装成一个异步操作(packaged_task),丢给线程执行!
返回值的解释
因为我们不知道用户的线程给我们返回的是什么类型的值(因为我们是不知道用户要执行的函数要长什么样的),所以只能用auto
表示类型,但是直接用auto
肯定是不行的,因为系统不知道要开辟多少空间压栈,所以C++中提供的一个类型推导: decltype(func(args...))
表示返回类型就是 func(args...)
的返回类型。
push
函数内部的一些解释
auto push(func&& f, Args&&... args) -> std::future<decltype(f(args...))> {
// 1. 对传入的函数封装成 packaged_task 任务包
using return_type = decltype(f(args...)); // 返回值类型
auto bind_func = std::bind(std::forward<func>(f), std::forward<Args>(args)...); // 函数+参数类型
auto task = std::make_shared<std::packaged_task<return_type()>>(bind_func);
std::future<return_type> fu = task->get_future();
// 2. 构造 lambda 匿名函数(捕获任务对象,函数内执行任务对象)
{
std::unique_lock<std::mutex> lock(__mtx_lock);
// 3. 将构造出来的匿名函数对象,抛入到任务队列中
__task_queue.push_back([task]() {
(*task)();
});
// 4. 唤醒消费者
__cond.notify_one();
}
return fu;
}
首先,因为每个用户传递进来的函数可能是不一样的,参数返回值都不一样,在push
里一定要做一个统一。
首先是返回值类型,我们要做推导:
using return_type = decltype(f(args...)); // 返回值类型
然后我们要把函数和函数参数绑定在一起,所以要用bind
,因为是一个可变参数,所以要...
展开,因为为了保持参数的性质,需要用一个完美转发std::forward
。
auto bind_func = std::bind(std::forward<func>(f), std::forward<Args>(args)...); // 函数+参数类型
然后后面就和 asynchronous.md 里面的 demo 一样了,要用一个指针,来封装 std::packaged_task
。
auto task = std::make_shared<std::packaged_task<return_type()>>(bind_func);
std::future<return_type> fu = task->get_future();
测试
#include "thread_pool.hpp"
int add(int a, int b) { return a + b; }
int main() {
thread_pool pool;
for (int i = 0; i < 11; i++) {
std::future<int> fu = pool.push(add, i, 22);
LOG(INFO) << "add res: " << fu.get() << std::endl;
}
pool.stop();
return 0;
}
完整代码
thread_pool.hpp
/*
* Write by Yufc
* See https://github.com/ffengc/HareMQ
* please cite my project link: https://github.com/ffengc/HareMQ when you use this code
*/
#ifndef __YUFC_DEMO_THREAD_POOL__
#define __YUFC_DEMO_THREAD_POOL__
#include "../log.hpp"
#include <assert.h>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
class thread_pool {
public:
using func_t = std::function<void(void)>;
private:
std::atomic<bool> __stop_signal; // 线程池停止信号
std::mutex __mtx_lock; // 互斥锁
std::condition_variable __cond; // 同步变量
std::vector<std::thread> __threads; // 线程池的线程
std::deque<func_t> __task_queue; // 任务队列
public:
thread_pool(int thread_count = 1)
: __stop_signal(false) {
// 创建线程
for (int i = 0; i < thread_count; i++) {
__threads.emplace_back(&thread_pool::entry, this);
}
}
~thread_pool() {
stop();
}
void stop() {
if (__stop_signal == true)
return; // 如果已经退出了就不能重复退出了
__stop_signal = true;
__cond.notify_all();
for (auto& e : __threads) // 等待所有线程退出
e.join();
}
template <typename func, typename... Args>
auto push(func&& f, Args&&... args) -> std::future<decltype(f(args...))> {
// 1. 对传入的函数封装成 packaged_task 任务包
using return_type = decltype(f(args...)); // 返回值类型
auto bind_func = std::bind(std::forward<func>(f), std::forward<Args>(args)...); // 函数+参数类型
auto task = std::make_shared<std::packaged_task<return_type()>>(bind_func);
std::future<return_type> fu = task->get_future();
// 2. 构造 lambda 匿名函数(捕获任务对象,函数内执行任务对象)
{
std::unique_lock<std::mutex> lock(__mtx_lock);
// 3. 将构造出来的匿名函数对象,抛入到任务队列中
__task_queue.push_back([task]() {
(*task)();
});
// 4. 唤醒消费者
__cond.notify_one();
}
return fu;
}
private:
// 线程入口函数 -- 不断从任务队列中取出任务进行执行
void entry() {
while (!__stop_signal) {
std::deque<func_t> tmp;
{
// 1. 加锁
std::unique_lock<std::mutex> lock(__mtx_lock);
// 2. 任务队列不为空,或者 __stop_signal 被置位, 否则就一直等着
__cond.wait(lock, [this]() { return __stop_signal || !__task_queue.empty(); });
// 3. 因为现在是加锁状态,一次取一个太亏了,如果就绪多个,可以一次取出来的}
tmp.swap(__task_queue);
assert(__task_queue.size() == 0); // 此时任务队列应该为空了
}
for (auto& t : tmp) // 逐个执行即可(无锁状态)
t();
}
}
};
#endif
test.cc
/*
* Write by Yufc
* See https://github.com/ffengc/HareMQ
* please cite my project link: https://github.com/ffengc/HareMQ when you use this code
*/
#include "thread_pool.hpp"
int add(int a, int b) { return a + b; }
int main() {
thread_pool pool;
for (int i = 0; i < 11; i++) {
std::future<int> fu = pool.push(add, i, 22);
LOG(INFO) << "add res: " << fu.get() << std::endl;
}
pool.stop();
return 0;
}