io_context关系图
io_context
io_context::io_context()
: impl_(add_impl(new impl_type(*this,
ASIO_CONCURRENCY_HINT_DEFAULT, false)))
{
}
io_context::io_context(int concurrency_hint): impl_(add_impl(new impl_type(*this, concurrency_hint == 1 ? ASIO_CONCURRENCY_HINT_1 : concurrency_hint, false)))
{
}
io_context::impl_type& io_context::add_impl(io_context::impl_type* impl)
{
asio::detail::scoped_ptr<impl_type> scoped_impl(impl);
asio::add_service<impl_type>(*this, scoped_impl.get());
return *scoped_impl.release();
}
io_context继承自execution_context,execution_context内部维护了一个service的管理器services_register。 因为sheduler继承自service, 所以可以看到在io_context的构造中,会通过add_service 将impl_添加到 services_register中,之后strand、socket等都会通过UseService这个函数将自身添加到这个链表当中(后面具体介绍到strand的时候在具体说明)。
io_context 对外提供了一批的接口,但实际上都送hi由impl_这个成员变量来实现具体操作。在io_context.ipp中可以看到run,poll等主要行为都是impl的封装。
io_context::count_type io_context::run()
{
asio::error_code ec;
count_type s = impl_.run(ec);
asio::detail::throw_error(ec);
return s;
}
io_context::count_type io_context::poll()
{
asio::error_code ec;
count_type s = impl_.poll(ec);
asio::detail::throw_error(ec);
return s;
}
scheduler
scheduler 的实现是在scheduler.ipp 下。
构造函数
scheduler::scheduler(asio::execution_context& ctx,
int concurrency_hint, bool own_thread)
: asio::detail::execution_context_service_base<scheduler>(ctx),
one_thread_(concurrency_hint == 1
|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(
SCHEDULER, concurrency_hint)
|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(
REACTOR_IO, concurrency_hint)),
mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
SCHEDULER, concurrency_hint)),
task_(0),
task_interrupted_(true),
outstanding_work_(0),
stopped_(false),
shutdown_(false),
concurrency_hint_(concurrency_hint),
thread_(0)
{
ASIO_HANDLER_TRACKING_INIT;
if (own_thread)
{
++outstanding_work_;
asio::detail::signal_blocker sb;
thread_ = new asio::detail::thread(thread_function(this));
}
}
可以看到构造函数参数由3个,一个是context的引用,concurrency_hint_ 好像没在代码中看到由什么作用,而own_thread是用来标识schedule是否有自己的线程,不过io_context创建的schedule这个参数都是false。
run
std::size_t scheduler::run(asio::error_code& ec)
{
ec = asio::error_code();
if (outstanding_work_ == 0)
{
stop();
return 0;
}
thread_info this_thread;
this_thread.private_outstanding_work = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
}
调用io_context的run 实际上执行的是scheduler的run,可以看到run做的事情不多,一个是检查outstanding_work_ 是否为0,如果是0则说明没有任务需要执行;第二步是为执行run的线程单独设立一个栈,然后将scheduler 和 线程信息放入到栈内,这边使用方法会比较复杂,主要是使用线程特有存储的方式,因为这个部分没太大影响,所以细节先不深入。最后获取锁然后循环执行do_run_one。
do_run_one
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
scheduler::thread_info& this_thread,
const asio::error_code& ec)
{
while (!stopped_)
{
if (!op_queue_.empty()) // 判断队列是否为空
{
// 不为空则取出头部
// Prepare to execute first handler from queue.
operation* o = op_queue_.front();
op_queue_.pop();
bool more_handlers = (!op_queue_.empty()); // 判断队列中是否还有剩余的任务
if (o == &task_operation_) // 判断是否是epoll标识的任务
{
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_) // 如果还有其他任务,且不止一个线程
wakeup_event_.unlock_and_signal_one(lock); // 释放锁,并唤醒另一个线程
else
lock.unlock(); // 正常释放锁
task_cleanup on_exit = { this, &lock, &this_thread }; // 构造一个IO任务的清理结构体,RAII方式析构时执行清理动作
(void)on_exit;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue); // 执行epoll的run,实际上时epoll_wait的封装
}
else
{
std::size_t task_result = o->task_result_; // 获取临时数据,主要和 IO事件有关
if (more_handlers && !one_thread_) // 如果还有其他任务,且不止一个线程
wake_one_thread_and_unlock(lock); // 解锁并唤醒一条线程
else
lock.unlock(); // 解锁
// Ensure the count of outstanding work is decremented on block exit.
work_cleanup on_exit = { this, &lock, &this_thread }; // 构造一个普通任务的清理结构体,RAII方式析构时执行清理动作
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
o->complete(this, ec, task_result); // 执行任务
this_thread.rethrow_pending_exception();
return 1;
}
}
else
{
wakeup_event_.clear(lock); //释放锁
wakeup_event_.wait(lock); // 等待唤醒
}
}
return 0;
}
当执行do_run_one时,队列中没有任务的话,线程就会执行通过wakeup_event_来等待唤醒,wakeup_event_在linux下是由posix_event实现,posix_event内部维护了::pthread_cond_t条件变量,所以线程会通过条件变量来实现等待和唤醒。
当队列中有任务得时候,如果不是task_operation_,则获取其task_result,然后执行其complete。任务的类型是scheduler_operation,后面介绍Post的时候会再详细说scheduler_operation内部情况,这边认为是执行任务队列中相对应的任务即可。
epoll
当队列中任务是task_operation_时,会调用task->run,这个task_在linux下类型是epoll_reactor,run实际上是对epoll_wait的封装。
void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
……
// Calculate timeout. Check the timer queues only if timerfd is not in use.
int timeout; // 计算需要等待的时间
if (usec == 0)
timeout = 0;
else
{
timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
if (timer_fd_ == -1)
{
mutex::scoped_lock lock(mutex_);
timeout = get_timeout(timeout);
}
}
// Block on the epoll descriptor.
epoll_event events[128];
int num_events = epoll_wait(epoll_fd_, events, 128, timeout); // epoll_wait
……
// Dispatch the waiting events.
for (int i = 0; i < num_events; ++i)
{
void* ptr = events[i].data.ptr;
if (ptr == &interrupter_)
{
// 如果描述符是否打断标记的,则什么都不做
}
else
{
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
if (!ops.is_enqueued(descriptor_data)) // 判断任务是否已经在私有队列中了,
{
descriptor_data->set_ready_events(events[i].events); //设置对应的事件
ops.push(descriptor_data); 加入私有队列
}
else
{
descriptor_data->add_ready_events(events[i].events); //额外添加对应事件
}
}
}
}
}
class descriptor_state : operation
{
…………
void set_ready_events(uint32_t events) { task_result_ = events; }
void add_ready_events(uint32_t events) { task_result_ |= events; }
…………
}
调用task_->run的时候,传入的参数有两个,一个是等待的时间,另一个是线程的私有队列。当scheduler的公共队列中有其他任务时,则传入0,调用不会阻塞。不然执行的线程就会一直等待在epoll_wait上,直到有I/O任务完成或者被中断。如果是正常的任务完成,则将任务放入私有队列等待后续放回倒scheduler的公共队列中。
这边说下asio这里是怎么做到打断epoll_wait的,
epoll_reactor::epoll_reactor(...){
....
// Add the interrupter's descriptor to epoll.
epoll_event ev = { 0, { 0 } };
ev.events = EPOLLIN | EPOLLERR | EPOLLET;
ev.data.ptr = &interrupter_;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
interrupter_.interrupt();
....
}
void epoll_reactor::interrupt()
{
epoll_event ev = { 0, { 0 } };
ev.events = EPOLLIN | EPOLLERR | EPOLLET;
ev.data.ptr = &interrupter_;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}
在epoll_reactor的构造函数里,会通过pipe两个fd,一个用来读,一个用来写,保存在interrupter_中,并在构造时就将其塞入epoll中。然后在写的fd中写入8个字节。因为触发模式的ET的,所以epoll中不会一直触发。当有线程执行interrupt函数的时候,会刷新读fd的描述符,导致阻塞在epoll_wait的线程被打断返回。
task_cleanup, work_cleanup
struct scheduler::task_cleanup
{
~task_cleanup()
{
if (this_thread_->private_outstanding_work > 0)
{
asio::detail::increment(
scheduler_->outstanding_work_,
this_thread_->private_outstanding_work);
}
this_thread_->private_outstanding_work = 0;
lock_->lock();
scheduler_->task_interrupted_ = true;
scheduler_->op_queue_.push(this_thread_->private_op_queue);
scheduler_->op_queue_.push(&scheduler_->task_operation_);
}
scheduler* scheduler_;
mutex::scoped_lock* lock_;
thread_info* this_thread_;
};
struct scheduler::work_cleanup
{
~work_cleanup()
{
if (this_thread_->private_outstanding_work > 1)
{
asio::detail::increment(
scheduler_->outstanding_work_,
this_thread_->private_outstanding_work - 1);
}
else if (this_thread_->private_outstanding_work < 1)
{
scheduler_->work_finished();
}
this_thread_->private_outstanding_work = 0;
#if defined(ASIO_HAS_THREADS)
if (!this_thread_->private_op_queue.empty())
{
lock_->lock();
scheduler_->op_queue_.push(this_thread_->private_op_queue);
}
#endif // defined(ASIO_HAS_THREADS)
}
scheduler* scheduler_;
mutex::scoped_lock* lock_;
thread_info* this_thread_;
};
task_cleanup, work_cleanup 内容级别上差不多,只是一个是清理epoll任务的结构,一个是清理普通任务的结构。主要执行的逻辑是将自身的private_outstanding_work累加到scheduler的outstanding_work上,并将私有队列的任务都移动到scheduler的公共队列上去。
poll
poll 实际上和run 基本一致,区别只是poll调用的是do_poll_one, 而do_poll_one 里也只是将do_run_one里的while循环去掉,且在poll前判断队列里是否有任务,没有直接返回。可以认为poll是不会阻塞的。
post
看完上面的执行任务的过程,这边开始看一下投递任务的过程。投递任务可以使用post 和 dispatch,这边先看post
template <typename LegacyCompletionHandler>
ASIO_INITFN_AUTO_RESULT_TYPE(LegacyCompletionHandler, void ())
io_context::post(ASIO_MOVE_ARG(LegacyCompletionHandler) handler)
{
return async_initiate<LegacyCompletionHandler, void ()>(
initiate_post(), handler, this);
}
async_initiate 函数会对参数和返回值类型进行一些处理,然后对于进行调用。这边initiate_post是一个struct, 所以会调用其operator()。
struct io_context::initiate_post
{
template <typename LegacyCompletionHandler>
void operator()(ASIO_MOVE_ARG(LegacyCompletionHandler) handler,
io_context* self) const
{
// If you get an error on the following line it means that your handler does
// not meet the documented type requirements for a LegacyCompletionHandler.
ASIO_LEGACY_COMPLETION_HANDLER_CHECK(LegacyCompletionHandler, handler) type_check;
detail::non_const_lvalue<LegacyCompletionHandler> handler2(handler); // 非const
bool is_continuation = asio_handler_cont_helpers::is_continuation(handler2.value);
// 这边非常绕,实际上是ASIO的内存管理机制。
typedef detail::completion_handler<
typename decay<LegacyCompletionHandler>::type, executor_type> op;
typename op::ptr p = { detail::addressof(handler2.value),
op::ptr::allocate(handler2.value), 0 };
p.p = new (p.v) op(handler2.value, self->get_executor());
ASIO_HANDLER_CREATION((*self, *p.p, "io_context", self, 0, "post"));
self->impl_.post_immediate_completion(p.p, is_continuation); // 最终调用scheduler的post_immediate_completion
p.v = p.p = 0;
}
};
上面的代码非常复杂和繁琐,实际上是为了使用ASIO的内存管理机制生成一个completion_handler类型的对象。生成完对象后,通过post_immediate_completion丢入到scheduler的公共队列中。
scheduler_operation
scheduler_operation 是scheduler 执行任务的基本单位。scheduler_operation主要目得是为了将外部函数的类型进行擦除,对scheduler来说调度时为统一的类型。不过其各个子类中会对原始的函数进行二次擦除。
template <typename Handler, typename IoExecutor>
class completion_handler : public operation
{
public:
ASIO_DEFINE_HANDLER_PTR(completion_handler);
completion_handler(Handler& h, const IoExecutor& io_ex)
: operation(&completion_handler::do_complete), // 构造时传入的实际是这个类的do_complete函数
handler_(ASIO_MOVE_CAST(Handler)(h)),
work_(handler_, io_ex)
{
}
static void do_complete(void* owner, operation* base,
const asio::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
// 整个过程包装非常复杂,更具体的细节我也没深究
// Take ownership of the handler object.
completion_handler* h(static_cast<completion_handler*>(base));
ptr p = { asio::detail::addressof(h->handler_), h, h };
ASIO_HANDLER_COMPLETION((*h));
// Take ownership of the operation's outstanding work.
handler_work<Handler, IoExecutor> w( ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(h->work_));
Handler handler(ASIO_MOVE_CAST(Handler)(h->handler_));
p.h = asio::detail::addressof(handler);
p.reset();
// Make the upcall if required.
if (owner)
{
fenced_block b(fenced_block::half);
ASIO_HANDLER_INVOCATION_BEGIN(());
w.complete(handler, handler); // 执行对应函数
ASIO_HANDLER_INVOCATION_END;
}
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;
};
class scheduler_operation ASIO_INHERIT_TRACKED_HANDLER
{
public:
typedef scheduler_operation operation_type;
void complete(void* owner, const asio::error_code& ec,
std::size_t bytes_transferred)
{
func_(owner, this, ec, bytes_transferred); //执行构造函数时传入的函数
}
protected:
typedef void (*func_type)(void*,
scheduler_operation*,
const asio::error_code&, std::size_t);
scheduler_operation(func_type func)
: next_(0),
func_(func),
task_result_(0)
{
}
private:
friend class op_queue_access;
scheduler_operation* next_;
func_type func_;
protected:
friend class scheduler;
unsigned int task_result_; // Passed into bytes transferred.
};
scheduler_operation 的complete过程还是比较简单的, 执行传入的函数,并将scheduler、operation、errcode、bytes_transferred 4个参数传入。但因为字类实际上包装的时候,传入的时直接的do_complete函数,其内部包装非常复杂,目前我也还没特别搞清楚。
post_immediate_completion
void scheduler::post_immediate_completion(
scheduler::operation* op, bool is_continuation)
{
#if defined(ASIO_HAS_THREADS)
if (one_thread_ || is_continuation)
{
if (thread_info_base* this_thread = thread_call_stack::contains(this))
{
++static_cast<thread_info*>(this_thread)->private_outstanding_work;
static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
return;
}
}
#else // defined(ASIO_HAS_THREADS)
(void)is_continuation;
#endif // defined(ASIO_HAS_THREADS)
work_started();
mutex::scoped_lock lock(mutex_);
op_queue_.push(op);
wake_one_thread_and_unlock(lock);
}
执行到post_immediate_completion,可以看出传入的时operation 的指针, 宏定义内部是对单线程的优化,但schduler指定单线程运行的时候,这边就会使用无锁版本进行插入。多线程下,可以看到需要先获取到锁,然后插入到公共队列中。然后执行wake_one_thread_and_unlock 尝试唤醒一条线程。
wake_one_thread_and_unlock
void scheduler::wake_one_thread_and_unlock(
mutex::scoped_lock& lock)
{
if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
{
if (!task_interrupted_ && task_)
{
task_interrupted_ = true;
task_->interrupt();
}
lock.unlock();
}
}
唤醒线程的方式也很简单,先尝试使用信号量唤醒等待中的线程,如果没有线程等待在信号量上的话,且当前没有打断过epoll,那么就会尝试对阻塞在epoll上的线程进行打断。
总结
io_context中主要的代码流程还是简单的梳理了一下,如果不太看重各个位置实现的细节的话吗,io_context的代码还是稍微能看懂一点的,但如果要对细节也了如指掌的话,确实很有难度。尤其是关于operation申请内存和调用的位置确实没怎么懂;不过我这边整理得不是很好,都是看到哪里就整理到哪里,看起来会比较得乱。