从12月20号开始看Muduo网络库,到28号的时候弄懂了EventLoop, Poller, Channel是怎么一回事,一番琢磨之后觉得还是应该发到博客上跟大家分享,特此记录。
对照linyacool那个webserver的实现,再看了一遍muduo的EventLoop, Poller ,Channel, 突然开悟了。原来这个架构不管是主线程mainreactor(Acceptor)还是工作线程subreactor都是靠EventLoop驱动的,每个EventLoop都手持一个Poller, EventLoop内部循环调用Poller::poll()监听各自channel数组上的事件,监听到活跃事件后就将活跃的channel包装成数组返回给EventLoop(其实就是模拟epoll),然后EventLoop遍历这个数组逐个触发其上事件,用channel封装fd的意义在于不同的channel对于同样的事件类型要注册不同的事件回调,比如同样是读事件,监听socket封装成的channel注册的读回调是接收连接,而客户端socket注册的读回调是读取请求。
(为打字方便,以下mainreactor一律称作主线程,subreactor一律称作工作线程)
void EventLoop::loop() {
assert(!looping_);
assert(isInLoopThread());
looping_ = true;
quit_ = false;
// LOG_TRACE << "EventLoop " << this << " start looping";
std::vector<SP_Channel> activeChannels;
while (!quit_) {
// cout << "doing" << endl;
activeChannels.clear();
activeChannels = poller_->poll();
eventHandling_ = true;
for (auto& it : activeChannels) it->handleEvents();
eventHandling_ = false;
doPendingFunctors();
poller_->handleExpired();
}
looping_ = false;
}
Epoller::poll(){
int nevent = epoll_wait();
//..填充activeChannels
}
对于主线程只有一个监听socket包装成的acceptChannel, 这个channel在被EventLoop触发后接受连接,然后将连接socket按照round-robin算法分发给工作线程, 但是这里有个问题,就是主线程给工作线程分发连接的时候,工作线程很可能在忙于处理某个channel的请求,怎么办呢?
那只能延迟执行了,因此每个EventLoop都有一个pendingFunctors队列,pendingFunctors队列专门用于存放来自外部线程的"任务委托",EventLoop将活跃channels上事件都处理完了就会去处理这个队列,回到连接的例子上就是主线程分发连接时调用工作线程的queueInLoop()方法,将初始化连接的回调放入到队列中,等待着工作线程的EventLoop处理完请求后去清空这个队列。
void EventLoop::runInLoop(Functor&& cb) {
if (isInLoopThread())
cb();
else
queueInLoop(std::move(cb));
}
void EventLoop::queueInLoop(Functor&& cb) {
{
MutexLockGuard lock(mutex_);
pendingFunctors_.emplace_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_) wakeup();
}
此外还要考虑一种情况,工作线程的Poller只能监听已经连接的channel,对于尚在pendingFunctors队列中的新连接的channel是poll()不到的, 这样要处理新连接,工作线程就只能等到其上poller的channel数组出现活跃,处理活跃channels的时候顺便doPendingFunctors()去处理新连接。
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i) functors[i]();
callingPendingFunctors_ = false;
}
那么假设工作线程很长一段时间得不到新事件,阻塞在poll()中,但是pendingFunctor队列中还挤压着任务要处理那怎么办呢?
比如工作线程正在调用doPendingFunctors()函数处理队列内的事件,但这时候主线程恰好调用queueInLoop()放了新的连接入队列(这里解释一下,为了避免竞态条件,doPendingFunctors的实现是准备一个临时队列,直接将pendingFunctors队列上的委托swap到这个临时队列上, 这样只需要在swap的时候加锁就行,减少临界区长度,加快效率)
工作线程注意不到,之后工作线程的的Poller上再也监测不到新事件,那么工作线程就阻塞在poll中无法处理队列上的新连接了,而且以后主线程还可能再调用queueInLoop()放新的连接进去,工作线程无法响应,怎么办?
因此EventLoop上又引入了一个wakeChannel和wakeup()方法,主线程调用工作线程EventLoop的queueInLoop()方法的时候也会调用其wakeup方法,往其wakeupChannel上写入一字节,打破poll()的僵局。这样连接阻塞的问题就解决了。(当然, wakeupChannel要加入epoll的监视树中)
void EventLoop::wakeup() {
uint64_t one = 1;
ssize_t n = writen(wakeupFd_, (char*)(&one), sizeof one);
if (n != sizeof one) {
LOG << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
有了如下理解,回头再看原书的时序图就清晰多了: