One Loop Per Thread的含义就是,一个EventLoop和一个线程唯一绑定,和这个EventLoop有关的,被这个EventLoop管辖的一切操作都必须在这个EventLoop绑定线程中执行
1.在MainEventLoop中,负责新连接建立的操作都要在MainEventLoop线程中运行。
2.已建立的连接分发到某个SubEventLoop上,这个已建立连接的任何操作,比如接收数据发送数据,连接断开等事件处理都必须在这个SubEventLoop线程上运行,还不准跑到别的SubEventLoop线程上运行。
一.主要成员
二.wakeupFd_
调用函数eventfd()会创建一个eventfd对象,或者也可以理解打开一个eventfd类型的文件,类似普通文件的open操作。
eventfd的在内核空间维护一个无符号64位整型计数器, 初始化为initval的值。
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
flags是以下三个标志位OR结果
- EFD_CLOEXEC(2.6.27~) : eventfd()返回一个文件描述符,如果该进程被fork的时候,这个文件描述符也会被复制过去,这个时候就会有多个描述符指向同一个eventfd对象,如果设置了这个标志,则子进程在执行exec的时候,会自动清除掉父进程的这个文件描述符。
- EFD_NONBLOCK(2.6.27~):文件描述符会被设置为O_NONBLOCK,如果没有设置这个标志位,read操作的时候将会阻塞直到计数器中有值,如果设置了这个这个标志位,计数器没有值得时候也会立刻返回-1。
- EFD_SEMAPHORE(2.6.30~): 这个标志位会影响read操作。
三.EventLoop对象和一个线程唯一绑定
/***** EventLoop.cc *****/
__thread EventLoop *t_loopInThisThread = nullptr;
EventLoop::EventLoop() :
wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if(t_loopInThisThread) //如果当前线程已经绑定了某个EventLoop对象了,那么该线程就无法创建新的EventLoop对象了
LOG_FATAL("Another EventLoop %p exits in this thread %d \n", t_loopInThisThread, threadId_);
else
t_loopInThisThread = this;
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}
介绍一下这个__thread,这个__thread是一个关键字,被这个关键字修饰的全局变量 t_loopInThisThread会具备一个属性,每个线程私有一份
在EventLoop对象的构造函数中,如果当前线程没有绑定EventLoop对象,那么t_loopInThisThread为nullptr,然后就让该指针变量指向EventLoop对象的地址。如果t_loopInThisThread不为nullptr,说明当前线程已经绑定了一个EventLoop对象了,这时候EventLoop对象构造失败!
四. EventLoop的线程只运行该Loop的操作
我们来描绘一个情景,我们知道每个EventLoop线程主要就是在执行其EventLoop对象的loop函数(该函数就是一个while循环,循环的获取事件监听器的结果以及调用每一个发生事件的Channel的事件处理函数)。此时SubEventLoop上注册的Tcp连接都没有任何动静,整个SubEventLoop线程就阻塞在epoll_wait()上。
EventLoop* ioLoop = threadPool_->getNextLoop();
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
此时MainEventLoop接受了一个新连接请求,并把这个新连接封装成一个TcpConnection对象,并且希望在SubEventLoop线程中执行TcpConnection::connectEstablished()函数,因为该函数的目的是将TcpConnection注册到SubEventLoop的事件监听器上,并且调用用户自定义的连接建立后的处理函数。当该TcpConnection对象注册到SubEventLoop之后,这个TcpConnection对象的任何操作(包括调用用户自定义的连接建立后的处理函数。)都必须要在这个SubEventLoop线程中运行,所以TcpConnection::connectEstablished()函数必须要在SubEventLoop线程中运行。
那么我们怎么在MainEventLoop线程中通知SubEventLoop线程起来执行TcpConnection::connectEstablished()函数呢?这里就要好好研究一下EventLoop::runInLoop()函数了。
void EventLoop::runInLoop(Functor cb)
{//该函数保证了cb这个函数对象一定是在其EventLoop线程中被调用。
if(isInLoopThread())//如果当前调用runInLoop的线程正好是EventLoop的运行线程,则直接执行此函数
cb();
else//否则调用 queueInLoop 函数
queueInLoop(cb);
}
void EventLoop::queueInLoop(Functor cb)
{
{
unique_lock<mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
if(!isInLoopThread() || callingPendingFunctors_)
wakeup();
}
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if(n != sizeof(n))
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}
EventLoop* ioLoop = threadPool_->getNextLoop();
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
此时的Loop为线程池EventLoopThreadPool中取出的一个从Reactor
ioLoop调用runInLoop,将cb放入queue(pendingFunctors)中,调用wakeup唤醒ioLoop线程
void EventLoop::loop()
{ //EventLoop 所属线程执行
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
for(Channel *channel : activeChannels_)
channel->HandlerEvent(pollReturnTime_);
doPendingFunctors(); //执行当前EventLoop事件循环需要处理的回调操作。
}
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
unique_lock<mutex> lock(mutex_);
functors.swap(pendingFunctors_); //这里的swap其实只是交换的vector对象指向的内存空间的指针而已。
}
for(const Functor &functor:functors)
{
functor();
}
callingPendingFunctors_ = false;
}
从Reactor的loop过程中,处理activeChannels_时,会包含wakeup_channel,此时可以接管TcpConnection,(已经建立且就绪的事件,就调用相应的回调函数即可)