最近看到一个线程池,写的实在太好,于是想深入理解一下。原始代码出处:GitHub - Ahajha/CTPL: Modern and efficient C++ Thread Pool Library
由于平时的工程一般只支持到C++11,而拿到的代码应该是在C++20下才能编译通过,因此也做了一些修改,需要原始码的可去github上自行下载。
先出测试代码:
#include "ThreadPool.h"
void func4444()
{
printf("[%s]thread_id[%s]:func4444begin--------\n", current_time().c_str(),get_curr_thread_id().c_str());
::Sleep(2000);
printf("[%s]thread_id[%s]:func4444end..--------\n", current_time().c_str(), get_curr_thread_id().c_str());
}
void func5555(int v)
{
printf("[%s]thread_id[%s]:func5555begin----参数: %d----\n", current_time().c_str(), get_curr_thread_id().c_str(), v);
::Sleep(3000);
printf("[%s]thread_id[%s]:func5555end..----参数: %d----\n", current_time().c_str(), get_curr_thread_id().c_str(), v);
}
int main()
{
printf("[%s]main_thread_id:[%s]\n", current_time().c_str(), get_curr_thread_id().c_str());
ctpl::thread_pool p;
int intValue = 42;
p.add_func("func4444", func4444);
p.add_func("func5555", func5555, intValue);
while (true)
{
::Sleep(500);
}
}
测试结果如下:
可以看到,加入到线程池的过程是在主线程中进行的,实际运行的都是在工作线程中完成。
关键代码
1、加入到任务队列
template<typename F, typename... Args>
void add_func(const char* name, F && f, Args&&... args)
{
auto pck = std::make_shared<std::packaged_task<decltype(f(args...))()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
printf("[%s]thread_id[%s]:add_func: %s\n", current_time().c_str(), get_curr_thread_id().c_str(), name);
this->push_func(std::make_unique<std::function<void()>>(
[pck, name] {
if (strlen(name) > 0)
printf("[%s]thread_id[%s]:sche_trace_func %s enter ...\n", current_time().c_str(), get_curr_thread_id().c_str(), name);
(*pck)();
if (strlen(name) > 0)
printf("[%s]thraed_id[%s]:sche_trace_func %s exit .\n", current_time().c_str(), get_curr_thread_id().c_str(), name);
})
);
}
pck是一个包装指针,(*pck)()
会执行被包装任务中的函数 f
,并传入参数 args...,因此(*pck)();实际就是执行f(args)
由于(*pck)()是在匿名函数,当执行this->push_func时,相当于把这个匿名函数存于func_tasks中,func_tasks是一个线程安全的队列。此时并未真正执行函数,只是暂存起来。注意,detail::atomic_queue<std::unique_ptr<std::function<void()>>> func_tasks;看到队列存储的函数类型是固定的,即无输入且无输出的函数类型,即我们这儿的匿名函数。
void thread_pool::push_func(std::unique_ptr<std::function<void()>> &task)
{
this->func_tasks.push(std::move(task));
this->signal_cv.notify_one();
}
当通过push_func将匿名函数加入进队列func_tasks后,会给工作线程发送通知,激活空闲的工作线程以执行此匿名函数,并从此匿名函数中执行对应的函数f(args)
2、从任务队列中取任务并执行
void thread_pool::emplace_thread()
{
this->stop_flags.emplace_back(std::make_shared<std::atomic<bool>>(false));
// The main loop for the thread. Grabs a copy of the pointer
// to the stop flag.
const auto stop_flag = this->stop_flags.back();
this->threads.emplace_back([this, stop_flag]
{
printf("[%s]thread_id:[%s]\n", current_time().c_str(), get_curr_thread_id().c_str());
std::atomic<bool>& stop = *stop_flag;
// Used to store new tasks.
std::unique_ptr<std::function<void()>> task;
// True if 'task' currently has a runnable task in it.
auto has_new_task = this->func_tasks.pop(task);
while (true)
{
// If there is a task to run
while (has_new_task)
{
// Run the task
(*task)();
// Delete the task
task.reset();
// The thread is wanted to stop, return even
// if the queue is not empty yet
if (stop)
{
return;
}
// Get a new task
has_new_task = this->func_tasks.pop(task);
}
// At this point the queue has run out of tasks, wait here for more.
// Thread is now idle.
// If all threads are idle, notify any waiting in wait().
if (++this->_n_idle == this->size())
{
std::lock_guard<std::mutex> lock(this->waiter_mut);
this->waiter.notify_all();
}
std::unique_lock<std::mutex> lock(this->signal_mut);
// While the following evaluates to true, wait for a signal.
this->signal_cv.wait(lock, [this, &task, &has_new_task, &stop]()
{
// Try to get a new task. This will fail if the thread was
// woken up for another reason (stopping or resizing), or
// if another thread happened to grab the task before this
// one got to it.
has_new_task = this->func_tasks.pop(task);
// If there is a new task or the thread is being told to stop,
// stop waiting.
return has_new_task || this->done || stop;
});
// Thread is no longer idle.
--this->_n_idle;
// if the queue is empty and it was able to stop waiting, then
// that means the thread was told to stop, so stop.
if (!has_new_task)
{
return;
}
}
});
}
一般emplace_thread在主线程中被调用,通常调用一次就会多一个工作线程,this->threads.emplace_back([this, stop_flag]{}这一行代码完成了一个工作线程的开启,工作的内容即是大括号内的匿名函数。
这个函数内部有两层循环,最内部的循环总是判断是否有新的任务,有的话通过(*task)()执行之前包装的匿名函数,再由那个被包装的匿名函数调用真正需要执行的函数。执行完后取下一个新线程,队列中若没有新线程则跳出内层的while循环,在外层等待新任务的信号。
三、小结
前面两个是线程池最关键的代码段,有了这个基础,再顺着代码调试一下,基本上就清晰了。当然,这里面还有关于C++11的用法,模板相关的同样比较绕人,不过那些小知识点可以通过小的练习完成,不会对理解这个程序造成太大的困扰。
后面再找时间做一个扩展,看如何在这个基础上进行定时器的逻辑推演,其实原先拿到的程序是定时器相关的东西,只不过觉得那个太复杂,于是花了很大的精力将其去除,保留核心。