前言
之前的一篇文章讲述了future的优缺点,以及future的组合性,其中也讲述了构建任务DAG一些问题,同时给出了比较好的方案则是Executor。
Executor还未进入标准(C++23),Executor拥有惰性构建及良好的抽象模型来构建任务DAG,libunifex则给出了相当具有标准的实现,我们也借助libunifex的简短的代码来看下构建任务DAG的便利性。
Executor
在讲述例子之前,我们先来了解下Executor的一些概念
Executor有sender和receiver的概念,sender则是当任务构建的存放对象,使用sync_wait等函数去执行构建的任务(sender)时,会创建一个receiver来和sender进行连接,同时连接过程也是sender的拆解和receiver构建的过程。这里sender拆解是因为构建任务的过程中,一个任务就是一个sender,为了避免类型擦除,后一个任务就会拥有前一个任务创建的sender,举例来说,via() -> then -> then, 这样的一个流程时,构建的sender则为,then_sender<then_sender<just_sender>>,但是实际运行时开始则是运行最里边sender,那么就会进行拆解构建receiver,即为via_receiver<then_receiver<then_receiver>>,把这个receiver拿去一层一层的运行即可。
理解起来可能不那么明朗,使用libunifex的例子来看下。
libunifex的例子
#include <unifex/scheduler_concepts.hpp>
#include <unifex/single_thread_context.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/then.hpp>
#include <unifex/via.hpp>
#include <unifex/just.hpp>
#include <iostream>
using namespace unifex;
int main() {
single_thread_context context;
auto sender = via(context.get_scheduler(), just(1)) | then([](int i) {
return i + 45;
});
auto ret = sync_wait_r<int>(std::move(sender));
std::cout << "ret is: " << *ret << std::endl;
return 0;
}
sender构造
这是一个完整的例子来表述任务的依赖关系,首先看到我们初始化变量single_thread_context是一个调度器的context,libunifex是以conetxt内部来封装调度器,便于隐藏cpu或者gpu等调度运行细节。
我们就先来了解下single_thread_context的一些重要的部分:
namespace _single_thread {
class context {
manual_event_loop loop_;
std::thread thread_;
public:
context() : loop_(), thread_([this] { loop_.run(); }) {}
~context() {
loop_.stop();
thread_.join();
}
auto get_scheduler() noexcept {
return loop_.get_scheduler();
}
std::thread::id get_thread_id() const noexcept {
return thread_.get_id();
}
};
} // namespace _single_thread
using single_thread_context = _single_thread::context;
最后一行可以看到single_thread_context就是context这个类,包含一个线程成员和manual_event_loop成员,构造时就开启一个线程运行loop的run函数。调度器也就是loop的调度器。这么看来主要的核心代码还是在manual_event_loop中实现。但是可以知道context的主要作用是提供一个get_scheduler的抽象接口,便于能够使用真正的调度器。
继续来看下manual_event_loop类:
class context {
public:
class scheduler {
class schedule_task {
// ...
template <typename Receiver>
operation<Receiver> connect(Receiver&& receiver) const {
return operation<Receiver>{(Receiver &&) receiver, loop_};
}
explicit schedule_task(context* loop) noexcept
: loop_(loop)
{}
context* const loop_;
};
explicit scheduler(context* loop) noexcept : loop_(loop) {}
public:
schedule_task schedule() const noexcept {
return schedule_task{loop_};
}
private:
context* loop_;
};
scheduler get_scheduler() {
return scheduler{this};
}
void run();
void stop();
void enqueue(task_base* task);
};
using manual_event_loop = _manual_event_loop::context;
这里的最后一行代码也可以看出来manual_event_loop就是_manual_event_loop下的context。context中要表述的东西有点多,先大致讲解下流程,后边我们一步一步分析代码就明朗了。首先可以看到也是我们知道的有一个调度器scheduler类,而scheduler中又包含schedule_task,在调用get_scheduler时返回scheduler的对象,某一时刻需要执行调度任务时会调用scheduler类中schedule()函数返回schedule_task,并使用schedule_task调用connect函数得到operation这个对象,operation继承自task_base,那就能知道operation本身可以归属为一个task,然后会调用enqueue将operation放到manual_event_loo也就是_manual_event_loop下的context的任务队列中,run函数检查到有任务则会直接运行该任务。知道别处调用stop就会推出这个调度器的运行。
通过以上讲述,大家可能大概明白调度器的一个运行逻辑,我们也先回到开头的例子中进行分析。
auto sender = via(context.get_scheduler(), just(1)) | then([](int i) {
return i + 45;
});
来看这句代码,使用竖线来表述运行的前后关系,这也是c++20中一个调用的特性,写成我们熟知的形式就是:
then(via(context.get_scheduler(), just(1)), [](int i){
return i + 45;
});
如此便可以知道,首先调用context.get_scheduler()和just传递给via,via计算出结果再传递给then。那我们首先看你just(1)
是做了什么?
namespace _just_cpo {
inline const struct just_fn {
template <typename... Values>
constexpr auto operator()(Values&&... values) const {
return _just::sender<Values...>{std::in_place, (Values&&)values...};
}
} just{};
} // namespace _just_cpo
using _just_cpo::just;
这里使用了cpo的一种技术手段来实现just的函数调用,cpo这里我们不展开来讲,这里我们只需要知道just函数最终会调用到operator()中,然后可以知道该函数仅仅是构造了一个_just::sender并将我们的传递的参数1保存下来。看到这里我们已经学会了一个sender,也就是最简单just_sender,那就是明白这个just主要作用就是构造一个sender并保存参数,已便于给后边then的形参使用。
继续看下via的调用:
namespace _via_cpo {
inline const struct _fn {
template (typename Scheduler, typename Sender)
auto operator()(Scheduler&& sched, Sender&& send) const {
return _via::sender<Sender, schedule_result_t<Scheduler>>{
(Sender&&) send,
schedule(sched)};
}
} via{};
} // namespace _via_cpo
using _via_cpo::via;
代码中同理可知,调用operator()函数并传递Scheduler和Sender参数,同样也是会构造一个_via::sender,会将sender和schedule(sched)保存,这里看到会调用到schedule函数,同样使用cpo的手段调用到我们的manual_event_loop的Scheduler中的schedule函数。上文我们可以知道schedule函数会返回schedule_task对象。那也就是说这里_via::sender会保存sender和schedule_task,不过在_via::sender中名称有一点变化,传递进来的sender对象称为前驱Predecessor,schedule_task被称为后继Successor。此时我们构造出来的完整的类型就是:_via::sender<_just::sender<Values…>, _manual_event_loop::context::scheduler::schedule_task>。
然后继续看下then函数的调用:
template(typename Sender, typename Func)
auto operator()(Sender&& predecessor, Func&& func) const {
return _then::sender<Sender, Func>{(Sender &&) predecessor, (Func &&) func};
}
这里除了使用cpo,同时还使用了tag_invoke的技术手段,cpo或者tag_invoke可以帮助找到实际的调用的函数是哪个(候选的函数名字基本一致),同样我们也不展开tag_invoke.
我们仅仅是告知大家会调用到这个函数,相信大家也猜到了这里也是会构造一个sender出来(_then::sender),会保存我们刚刚生成_via::sender对象以及自己带的函数对象。
小结
到这里我们就完成sender的构造:_then::sender<_via::sender<_just::sender<Values…>, _manual_event_loop::context::scheduler::schedule_task>, Func>。哇,好长。要注意的是这个sender不仅保存了完整的类型,同时也会将相应的对象保存下来,一层一层进行了包装。就是这样就没有了所谓的类型擦除。
receiver构造
继续来看例子:
auto ret = sync_wait_r<int>(std::move(sender));
通过sync_wait_r来对sender开始进行任务的执行,使用sync_wait_r获取到最终执行完成的结果。看下sync_wait_r的实现细节。
namespace _sync_wait_r_cpo {
template <typename Result>
struct _fn {
template(typename Sender)
(requires sender<Sender>)
decltype(auto) operator()(Sender&& sender) const {
using Result2 = non_void_t<wrap_reference_t<decay_rvalue_t<Result>>>;
return _sync_wait::_impl<Result2>((Sender&&) sender);
}
};
} // namespace _sync_wait_r_cpo
代码很简单,仅仅是直接去调用_sync_wait::_impl函数:
template <typename Result, typename Sender>
std::optional<Result> _impl(Sender&& sender) {
using promise_t = _sync_wait::promise<Result>;
promise_t promise;
manual_event_loop ctx;
// Store state for the operation on the stack.
auto operation = connect(
(Sender&&)sender,
_sync_wait::receiver_t<Result>{promise, ctx});
start(operation);
ctx.run();
switch (promise.state_) {
case promise_t::state::done:
return std::nullopt;
case promise_t::state::value:
return std::move(promise.value_).get();
case promise_t::state::error:
std::rethrow_exception(promise.exception_.get());
default:
std::terminate();
}
}
impl函数通过connect函数对sender和receiver进行了绑定,返回了operation对象,对operation执行start,使用manual_event_loop对象的run函数对主线程进行阻塞直到所有的任务完成,最后会得到结果返回。那么先看关键的connect函数做了什么
首先要知道传递给connect的是_then::sender和临时构造了一个_sync_wait::receiver。
template(typename Sender, typename Receiver)
friend auto tag_invoke(tag_t<unifex::connect>, Sender&& s, Receiver&& r)
-> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> {
return unifex::connect(
static_cast<Sender&&>(s).pred_,
receiver_t<remove_cvref_t<Receiver>>{
static_cast<Sender&&>(s).func_,
static_cast<Receiver&&>(r)});
}
调用connect时会调用到_then::sender的tag_invoke中,函数中还是会继续调用connect函数,不过这一次的sender参数就是_then::sender的pred_成员,也就是_via::sender,同时还会构造一个_then中receiver_t,我们称他为then::receiver,这里then::receiver会保存_then::sender的func和传递进来的Receiver,也就是_sync_wait::receiver,那我们看下函数内部connect的参数的类型是什么:connect(_via::sender<…>, _then::receiver<Func, _sync_wait::receiver>).
接下来就会_via::sender的connect函数:
template <typename Receiver>
auto connect(Receiver&& receiver) && {
return unifex::connect(
static_cast<Predecessor&&>(pred_),
predecessor_receiver<Successor, Receiver>{
static_cast<Successor&&>(succ_),
static_cast<Receiver&&>(receiver)});
}
像前边一样,_via::sender会取出来自己的pred作为现在connect的sender,自己的succ和传递进来的receiver包装成一个全新的receiver,也就是:
connect(_just::sender<Values…>, _via::receiver<_manual_event_loop::context::scheduler::schedule_task, _then::receiver<Func, _sync_wait::receiver>>)
接下来就会调用到_just::sender的tag_invoke函数中:
template(typename This, typename Receiver)
auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)
-> operation<Receiver, Values...> {
return {static_cast<This&&>(that).values_, static_cast<Receiver&&>(r)};
}
这里会构造一个operation对象,使用自己的values和_via::receiver作为参数,那么这个operation就返回给最初的调用地方,同时receiver也构造完成。
然后我们回到_sync_wait::_impl函数:
std::optional<Result> _impl(Sender&& sender) {
// ...
// Store state for the operation on the stack.
auto operation = connect(
(Sender&&)sender,
_sync_wait::receiver_t<Result>{promise, ctx});
start(operation);
// ...
}
我们就知道了这里的operation就是_just::sender构造的_just::operation, 接下来就调用start函数并传入调用。
任务的执行
然后我们发现会调用到_just::operation的start函数:
template <typename Receiver, typename... Values>
using operation = typename _op<remove_cvref_t<Receiver>, Values...>::type;
template <typename Receiver, typename... Values>
struct _op<Receiver, Values...>::type {
std::tuple<Values...> values_;
Receiver receiver_;
void start() & noexcept {
std::apply(
[&](Values&&... values) {
unifex::set_value((Receiver &&) receiver_, (Values &&) values...);
},
std::move(values_));
}
};
可以看到start函数中就是使用apply函数针对receiver进行set_value。
接下来就会调用到_via::receiver的set_value函数
template <typename... Values>
void set_value(Values&&... values) && noexcept {
submit(
(Successor &&) successor_,
value_receiver<Receiver, Values...>{
{(Values &&) values...}, (Receiver &&) receiver_});
}
set_value会调用summit函数,同时会将_via::receiver的成员successor_和receiver_作为参数传递,successor_是_manual_event_loop::context::scheduler::schedule_task, receiver_则是_then::receiver<Func, _sync_wait::receiver>,同样也会将receiver_成员和values包装成一个value_receiver,value_receiver就变成了value_receiver<Values, _then::receiver<Func, sync_wait::receiver>>,然后就去调用submit函数。
submit比较特殊不是直接去调用schedule_task的submit函数,而是会先构造一个submit::operation<schedule_task, value_receiver<…>>,然后调用start函数。
在submit::operation的构造函数中会调用submit::operatio的sender(schedule_task)的connect函数赋值给inner
template <typename Sender, typename Receiver>
class _op<Sender, Receiver>::type {
template <typename Receiver2>
explicit type(Sender&& sender, Receiver2&& receiver)
: receiver_((Receiver2 &&) receiver)
, inner_(unifex::connect((Sender &&) sender, wrapped_receiver{this}))
{}
void start() & noexcept {
unifex::start(inner_);
}
};
先来看connect会调用schedule_task的connect函数,我们称这里wrapped_receiver为submit_receiver,那么传给connect函数的receiver就是submit_receiver<value_receiver<Values, _then::receiver<Func, _sync_wait::receiver>>>
template <typename Receiver>
operation<Receiver> connect(Receiver&& receiver) const {
return operation<Receiver>{(Receiver &&) receiver, loop_};
}
那么我们可以看到把调度器给从sender拆解下来了,并且剩余的部分又包装了一个receiver,其实也就是这里才算完整的receiver构建完成。
继续我们schedule_task的connect函数,会构造一个_manual_event_loop::operation对象,传入了loop_对象,也就是调度器context对象。
在然后就开始从submit的start函数调用到_manual_event_loop::operation的start函数:
template <typename Receiver>
inline void _op<Receiver>::type::start() noexcept {
loop_->enqueue(this);
}
终于到这里了,把_manual_event_loop::operation放到了调度器任务队列了。因为我们在构造conetext那里就会执行run函数,run函数就是从任务队列中取任务运行。
void context::run() {
std::unique_lock lock{mutex_};
while (true) {
while (head_ == nullptr) {
if (stop_) return;
cv_.wait(lock);
}
auto* task = head_;
head_ = task->next_;
if (head_ == nullptr) {
tail_ = nullptr;
}
lock.unlock();
task->execute();
lock.lock();
}
}
这里看到会执行_manual_event_loop::operation的execute,最终会调用到execute_impl函数:
void execute_impl(task_base* t) noexcept {
auto& self = *static_cast<type*>(t);
if constexpr (is_stop_never_possible_v<stop_token_type>) {
unifex::set_value(std::move(self.receiver_));
} else {
// ...
}
}
然后大家也可以猜到了,就是调用receiver的set_value函数了,我们又要回到_submit::receiver的中了。
template(typename... Values)
void set_value(Values&&... values) && noexcept {
auto allocator = get_allocator(get_receiver());
unifex::set_value(std::move(get_receiver()), (Values &&) values...);
destroy(std::move(allocator));
}
这里的Values就是没有值,然后会调用_submit::receiver的receiver_成员的set_value函数,也就是value_receiver<Values, _then::receiver<Func, _sync_wait::receiver>>的函数:
void set_value() noexcept {
std::apply(
[&](Values && ... values) noexcept {
unifex::set_value(
std::forward<Receiver>(receiver_), (Values &&) values...);
},
std::move(values_));
}
接着就调用_then::receiver的set_value函数:
template <typename... Values>
void set_value(Values&&... values) && noexcept {
using result_type = std::invoke_result_t<Func, Values...>;
unifex::set_value(
(Receiver &&) receiver_,
std::invoke((Func &&) func_, (Values &&) values...));
}
这里只留下了一些关键代码方便理解,首先调用func函数并将返回值传递给下一个receiver的set_value函数。
然后就是_sync_wait::receiver的set_value函数:
template <typename... Values>
void set_value(Values&&... values) && noexcept {
unifex::activate_union_member(promise_.value_, (Values&&)values...);
promise_.state_ = promise<T>::state::value;
signal_complete();
}
void signal_complete() noexcept {
ctx_.stop();
}
这里就是把then执行完的赋给promise_,并且更新完成的状态。这里的ctx就是最开始sync_wait_r函数中manual_event_loop对象,调用stop告知sync_wait_r函数中阻塞全部任务已完成可以结束程序了。
用一张图完整的来描述sender和receiver构建的整个过程:
总结
本篇文章通过阅读libunifex的源码,带着大家了解下c++的executor的任务构建流程及前后依赖的任务执行过程,尽管executor还尚未进入标准,libunifex已经算比较好的诠释了exeutor,当然除了我们上边讲解的简单的前后依赖的任务,还有并行的任务流,libunifex使用when_all来实现。除了单线程single_thread_context的context,libunifex还提供了线程池的context来供使用。
文中主要侧重了解大概的流程而忽略了一些实现细节,比如说cpo,tag_invoke,一些concept等等。只能大家自己去学习了,可以参考ref的链接
ref
https://zhuanlan.zhihu.com/p/395250667
https://mp.weixin.qq.com/s/oKCtKZq1R5PkVTSJxEhLfQ
https://blog.csdn.net/QcloudCommunity/article/details/125611481 系列
https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1341r0.pdf