在以往多线程的实现的时候,都是自己去亲自创建线程,采用特殊flag 及锁控制线程的运转状态。这无可厚非,但又似乎有重复造轮子的嫌疑。最近发现了一个线程池的轮子,很不错,ZZ一下。
C++多线程+线程池(全详解) - 知乎 (zhihu.com)
多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。在一个程序中,这些独立运行的程序片段叫作“线程”(Thread),利用它编程的概念就叫作“多线程处理”。
线程池
安全工作队列实现
#include<queue>
#include<mutex>
template<typename T>
class safequeue
{
private:
std::queue<T>m_queue;
std::mutex m_mutex;
public:
safequeue();
safequeue(safequeue &&other);
~safequeue();
bool empty()
{
std::unique_lock<std::mutex>lock(m_mutex);
return m_queue.empty();
}
int size()
{
std::unique_lock<std::mutex>lock(m_mutex);
return m_queue.size();
}
void enqueue(T &t)
{
std::unique_lock<std::mutex>lock(m_mutex);
m_queue.emplace(&t);
}
bool dequeue(T &t)
{
std::unique_lock<std::mutex>lock(m_mutex);
if(m_queue.empty())
return false;
t=std::move(m_queue.front());
m_queue.pop();
return true;
}
};
线程池
#include<functional>
#include<mutex>
#include<vector>
#include<D:\code\c++project\C++\src\project\safequeue.c++>
#include<thread>
#include<condition_variable>
#include<future>
class threadpool
{
private:
class threadworker
{
private:
int m_id;
threadpool *m_pool;
public:
threadworker(const int id,threadpool *pool):m_id(id),m_pool(pool){}
void operator()()//重载操作
{
std::function<void()> func;
bool dequeued;
if(!m_pool->m_shutdown)
{
std::unique_lock<std::mutex>lock(m_pool->m_conditional_mutex);
if(m_pool->m_queue.empty())
{
m_pool->m_conditional_lock.wait(lock);
}
dequeued=m_pool->m_queue.dequeue(func);
}
if(dequeued)
{
func();
}
}
};
bool m_shutdown;
std::vector<std::thread> m_threads;
safequeue<std::function<void()>> m_queue;
std::mutex m_conditional_mutex;
std::condition_variable m_conditional_lock;
public:
threadpool(const int n_threads=4):m_threads(std::vector<std::thread>(n_threads)),m_shutdown(false){}
threadpool(const threadpool &)=delete;
threadpool(threadpool &&)=delete;
threadpool &operator=(const threadpool &)=delete;
threadpool &&operator=(threadpool &&)=delete;
void init()//初始化分配线程
{
for(int i=0;i<m_threads.size();i++)
{
m_threads.at(i)=std::thread(threadworker(i,this));
}
}
void shutdown()//关闭线程
{
m_shutdown=true;
m_conditional_lock.notify_all();
for(int i=0;i<m_threads.size();i++)
{
if( m_threads.at(i).joinable())
{
m_threads.at(i).join();
}
}
}
template<typename F,typename... Args>
auto submit(F &&f,Args &&...args)->std::future<decltype(f(args...))>
{
//这段C++代码定义了一个名为`submit`的函数模板。它接受一个可调用对象`f`和一系列参数`args`。
//函数模板使用了右值引用和可变参数模板的特性。`F &&f`表示对可调用对象`f`进行右值引用,`Args &&...args`表示可变数量的参数`args`,它们都是右值引用。
//返回类型使用了`std::future<decltype(f(args...))>`,表示返回一个`std::future`对象,该对象的类型是通过调用`f(args...)`来推断的。
//函数体中没有具体的实现,因此代码块中的大括号是空的。这意味着在使用这个函数模板时,需要根据具体的需求来提供实现。
//这段代码的目的是定义一个通用的函数模板,用于提交任务并返回一个`std::future`对象,以便在将来某个时刻获取任务的结果。通过使用右值引用和可变参数模板,可以接受不同类型的可调用对象和参数,并返回相应的`std::future`对象。
//总而言之,这段代码定义了一个通用的函数模板`submit`,用于提交任务并返回一个`std::future`对象,以便在将来获取任务的结果。具体的实现需要根据具体的需求来提供。
std::function<decltype(f(args...))()> func=std::bind(std::forward<F>(f),std::forward<Args>(args)...);//forward为完美转发
auto task_ptr=std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()>task=[task_ptr]()
{
(*task_ptr)();
};
//这段C++代码创建了一个`std::function`对象`wrapper_func`,并使用Lambda表达式作为其可调用对象。
//Lambda表达式`[task_ptr]() { (*task_ptr)(); }`定义了一个匿名函数,没有参数,返回类型为`void`。在函数体中,通过解引用`task_ptr`,调用了指针所指向的可调用对象。
//这个Lambda表达式被用作初始化`std::function<void()>`对象`wrapper_func`,因此`wrapper_func`成为了一个可调用对象,可以像函数一样被调用。
//这段代码的作用是将一个指针`task_ptr`所指向的可调用对象进行封装,并通过`std::function`对象`wrapper_func`来调用该可调用对象。通过这种方式,可以将一个具体的任务对象包装成一个可调用对象,并进行进一步的处理和调用。
//总而言之,这段代码创建了一个`std::function`对象,并使用Lambda表达式将一个指针所指向的可调用对象进行封装。这样可以通过`std::function`对象来调用该可调用对象,并进行进一步的处理。
m_queue.enqueue(task);
m_conditional_lock.notify_one();
return task_ptr->get_future();
}
};
线程池测试
#include<iostream>
#include<D:\code\c++project\C++\src\project\threadpool.c++>
#include <random>
std::random_device rd; // 真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
auto rnd = std::bind(dist, mt);
// 设置线程睡眠时间
void simulate_hard_computation()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
// 结果返回
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
void example()
{
// 创建3个线程的线程池
threadpool pool(3);
// 初始化线程池
pool.init();
// 提交乘法操作,总共30个
for (int i = 1; i <= 3; ++i)
for (int j = 1; j <= 10; ++j)
{
pool.submit(multiply, i, j);
}
// 使用ref传递的输出参数提交函数
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
// 等待乘法输出完成
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
// 使用return参数提交函数
auto future2 = pool.submit(multiply_return, 5, 3);
// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
// 关闭线程池
pool.shutdown();
}
int main()
{
example();
return 0;
}
该轮子通过
std::thread(threadworker(i,this))
来创建线程,work后的()遭到重载,进行等待信号及dequeue动作,来调用queue内进入的func。而submit的作用是将func们压入queue。并提示线程们有新压入进来的func。
线程池内有处理函数,有处理结果查询函数,二者怎么协调动作以达到最佳整体运转性能是需要考虑的地方。这里的线程池轮子是将每个功能块的线程开辟统一到了一个模板下,每个功能块只需要注册自己的操作函数即可,具有一定的简洁性。不足之处是每个功能块需要共享模板的实现和声明。总之是具有一定的进步性的。
使用这个轮子的另一个难度地方是,如果func内需要调用功能块里的作用域的函数时候,是需要一些额外传值的。这在设计的时候是需要考虑到的。