前言
有三个核心组件支撑一个reactor实现 [持续] 的 [监听] 一组fd,并根据每个fd上发生的事件 [调用] 相应的处理函数。这三个组件就是 EventLoop 、Channel 以及 Poller 三个类,其中 EventLoop 可以看作是对业务线程的封装,而 Channel 可以看作是对 每个已经建立连接 的封装(即accept(3) 返回的文件描述符)
EventLoop类
先明确一点,作为一个网络服务器,需要有 持续监听、持续获取监听结果、持续处理监听结果对应事件的能力,也就是 循环 去 调用Poller::poll方法获取实际发生事件的Channel集合,然后调用这些Channel里面保管的不同类型事件的处理函数,也就是Channel::HandlerEvent方法。绿色部分不懂没关系,继续看。
总之EventLoop就是负责实现 循环 ,负责驱动循环的主要模块,Channel和Poller都是他的手下,因为EventLoop整合封装了两者,并且向上提供了更方便的接口来使用。
在EventLoop的类定义中,除了⼀些状态量以外,每个 EventLoop 持有⼀个 Poller 的智能指针(对 epoll / poll 的封装),⼀个用于 EventLoop 之间通信的 Channel ,自己的线程 id,互斥锁以及装有等待处理函数的 vector 。很明显, EventLoop 并不直接管理各个连接的 Channel (文件描述符的封装),而是通过Poller 来进行的。 EventLoop 中最核心的函数就是 EventLoop::Loop() 。
EventLoop::loop(),代码如下:
void EventLoop::loop()
{
// 开始事件循环 调⽤该函数的线程必须是该EventLoop所在线程
assert(!is_looping_);
assert(is_in_loop_thread());
is_looping_ = true;
is_stop_ = false;
while(!is_stop_)
{
// 1、epoll_wait阻塞 等待就绪事件
auto ready_channels = poller_->Poll();
is_event_handling_ = true;
// 2、处理每个就绪事件(不同channel绑定了不同的callback)
for (auto& channel : ready_channels) {
channel->HandleEvents();
}
is_event_handling_ = false;
// 3、执⾏正在等待的函数(fd注册到epoll内核事件表)
PerformPendingFunctions();
// 4、处理超时事件 到期了就从定时器⼩根堆中删除(定时器析构会EpollDel掉fd)
poller_->HandleExpire();
}
is_looping_ = false;
}
每个EventLoop对象都唯一绑定了一个线程,这个线程其实就在一直执行这个函数里面的while循环,这个while循环的大致逻辑比较简单。就是调用Poller::poll方法获取事件监听器上的监听结果。接下来在loop里面就会调用监听结果中每一个Channel的处理函数HandlerEvent( )。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。比如一个Channel发生了可读事件,可写事件,则这个Channel的HandlerEvent( )就会调用提前注册在这个Channel的可读事件和可写事件处理函数,又比如另一个Channel只发生了可读事件,那么HandlerEvent( )就只会调用提前注册在这个Channel中的可读事件处理函数。
所以总结,每个EventLoop实际上就做了四件事
- epoll_wait阻塞 等待就绪事件(没有注册其他fd时,可以通过event_fd来异步唤醒)
- 处理每个就绪事件
- 执行正在等待的函数(fd注册到epoll内核事件表)
- 处理超时事件,到期了就从定时器小根堆中删除
Channel类
接下来解释EventLoop的两个手下之一Channel,Channel类其实相当于一个文件描述符的保姆。
想要通过 IO 多路复用(epoll / poll)监听某个文件描述符,就需要把这个 fd 和该 fd 感兴趣 的事件通过 epoll_ctl 注册到 IO 多路复用模块(事件监听器)上。当 IO 多路复用模块监听到该 fd 发生了某个事件。事件监听器返回发生事件的 fd 集合(有哪些 fd 发生了事件)以及每个 fd 的事件集合(每个 fd 具体发生了什么事件)。
Channel类则封装了一个 [fd] 和这个 [fd感兴趣事件] 以及事件监听器监听到 [该fd实际发生的事件]。同时Channel类还提供了设置该fd的感兴趣事件,以及将该fd及其感兴趣事件注册到事件监听器或从事件监听器上移除,以及保存了该fd的每种事件对应的处理函数。
Channel类重要的成员变量
- fd_:Channel对象照看的文件描述符
- events_:代表fd感兴趣的事件类型集合,或者说正在监听的事件
- revents_:代表事件监听器实际监听到该fd发生的事件类型集合,或者说是返回的就绪事件,当事件监听器监听到一个fd发生了什么事件,通过Channel::set_revents()函数来设置revents值
- last_events_:上一此事件(主要用于记录如果本次事件和上次事件⼀样 就没必要调用
- read_handler_,write_handler_,update_handler_,error_handler_:这些是std::function类型的各种回调函数,代表这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。
Channel类重要的成员方法
Channel::HandleEvents()方法
/ IO事件的回调函数 EventLoop中调⽤Loop开始事件循环 会调⽤Poll得到就绪事件
// 然后依次调⽤此函数处理就绪事件
void Channel::HandleEvents() {
events_ = 0;
// 触发挂起事件 并且没触发可读事件
if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) {
events_ = 0;
return;
}
// 触发错误事件
if (revents_ & EPOLLERR) {
HandleError();
events_ = 0;
return;
}
// 触发可读事件 | ⾼优先级可读 | 对端(客户端)关闭连接
if (revents_ & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)) {
HandleRead();
}
// 触发可写事件
if (revents_ & EPOLLOUT) {
HandleWrite();
}
//处理更新监听事件(EpollMod)
HandleUpdate();
}
每个Channel对象只属于一个EventLoop ,即只属于一个IO线程,只负责一个文件描述符fd的IO时间分发,但并不拥有这个fd,Channel把不同的IO事件分发为不同的回调,回调用C++11的特性function表示。
从Channel类的定义中可以看出,每个Channel持有一个文件描述符,正在监听的事件,已经发生的事件(由Poller返回),以及各个事件的回调函数的Function对象。
总的来说,Channel就是对fd事件的封装,包括注册它的事件以及回调。EventLoop通过调用Channel::handleEvent()来执行Channel的读写事件。Channel::handleEvent() 的实现也非常简单,就是比较已经发生的事件(由 Poller 返回),来调用对应的回调函数(读、写、错误)。
Poller类
Poller 类的作用就是负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件。所以一个Poller 对象对应⼀个 IO 多路复用模块。在 muduo 中,⼀个 EventLoop 对应一个Poller 。
Epoll代码如下 :
class Epoll {
public:
Epoll();
~Epoll();
void epoll_add(const sp_Channel& request);
void epoll_mod(const sp_Channel& request);
void epoll_del(const sp_Channel& request);
void poll(std::vector<sp_Channel>& req);
private:
int epollFd_;
std::vector<epoll_event> events_; // epoll_wait()返回的活动事件都放在这个数组⾥
std::unordered_map<int, sp_Channel> channelMap_;
};
Poller类的主要成员变量有三个:
- epollFd_:就是用epoll_create方法返回的epoll句柄
- events:存放epoll_wait()返回的活动事件,是一个结构体
- channelMap_ :这个变量是 std::unordered_map<int, std::shared_ptr<Channel>> 类型,负责记录 文件描述符fd -> Channel 的映射,也帮忙保管所有注册在你这个 Poller 上的 Channel 。
其他函数无非就是对Epoll_ctl(4)和 Epoll_wait(4)的封装
void Epoll::poll(std::vector<sp_Channel>& req) {
int event_count =
Epoll_wait(epollFd_, &*events_.begin(), events_.size(), EPOLLWAIT_TIME);
for(int i = 0; i < event_count; ++i) {
int fd = events_[i].data.fd;
sp_Channel temp = channelMap_[fd];
temp->setRevents(events_[i].events);
req.emplace_back(std::move(temp));
}
// LOG << "Epoll finished";
}
Epoll::poll(1) 这个函数可以说是 Poller 的核心了,当外部调用 poll 方法的时候,该方法底层其实是通过epoll_wait 获取这个事件监听器上发生事件的 fd 及其对应发生的事件,我们知道每个 fd 都是由⼀个Channel封 装的,通过哈希表 channelMap_ 可以根据 fd 找到封装这个 fd 的 Channel 。将 IO 多路复用模块监听到该 fd 发生 的事件写进这个 Channel 中的 revents 成员变量中。然后把这个 Channel 装进 req 中。这样,当外界调用完poll 之后就能拿到 IO 多路复用模块的监听结果( std::vector<sp_Channel>& req )。