接着上文[muduo网络库]——muduo库的Reactor模型(剖析muduo网络库核心部分、设计思想),接下来详细介绍一下这三大核心组件中的Channel类。
先回顾一下三大核心组件之间的关系。
接着我们进入正题。
Channel
Channel类封装了一个 fd 、fd感兴趣事件events、该fd实际发生的事件revents。同时Channel类还提供了设置该fd的感兴趣事件,以及相应的回调函数。
重要成员变量
EventLoop *loop_; //事件循环
const int fd_; //fd:poller监听的对象 epoll_ctl
int events_; //注册fd感兴趣的事件
int revents_; //poller返回的具体发生的事件
//因为channel可以获得fd最终发生的具体事件revent,所以他负责回调
//他们都属于std::function<>类型,保管着可调用的函数 ;
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
重要成员函数
- 设置回调函数对象
void setReadCallback(ReadEventCallback cb) { readCallback_=std::move(cb);}
void setWriteCallback(EventCallback cb) { writeCallback_=std::move(cb);}
void setCloseCallback(EventCallback cb) { closeCallback_=std::move(cb);}
void setErrorCallback(EventCallback cb) { errorCallback_=std::move(cb);}
- 设置fd相应的状态,update()相当于调用epoll_ctl
void enableReading() { events_ |= kReadEvent; update();} //相当于把读事件给events相应的位置位了
void disableReading() { events_ &= ~kReadEvent; update();}
void enableWriting() { events_ |= kWriteEvent; update();}
void disableWriting() { events_ &= ~kWriteEvent; update();}
void disableAll() { events_ = kNoneEvent; update();}
这里的update()
,实际上就是调用了EventLoop
里面的updateChannel()
,进一步调用EPollPoller::updateChannel
,在EventLoop
里面我们还会进一步看到。
- 同理还有
remove()
,最终也是调用了EPollPoller::removeChannel
。
void Channel::remove()
{
loop_->removeChannel(this);
}
- 封装fd,fd感兴趣的事件以及实际发生的事件
int fd() const {return fd_;}
int events() const {return events_;}
int set_revents(int revt) { revents_=revt; }
- fd得到poller通知以后,调用相应的方法来处理事件过程为handleEvent -> handleEventWithGuard -> 回调
read_callback_/write_callback_/close_callback_/error_callback_
。
void handleEvent(TimeStamp receiveTime);
- 还有一个
Channel::tie()
方法
我们知道,当客户端正常断开TCP连接,IO事件会触发Channel中的设置的CloseCallback回调,但是用户代码在onClose()中有可能析构Channel对象,导致回调执行到一半的时候,其所属的Channel对象本身被销毁了。这时程序会出现问题。muduo的解决办法是提供Channel::tie(const boost::shared_ptr<void>&)
这个函数,用于延长某些对象的生命期,使之长过Channel::handleEvent()
函数。所以muduo库中的 TcpConnection采用了shared_ptr管理对象生命期。单说的意义并不大,所以在之后的剖析中遇到tie
,会进一步介绍它的巧妙之处。 - 处理事件
void Channel::handleEvent(TimeStamp receiveTime)
{
if(tied_)
{
std::shared_ptr<void> guard;
guard = tie_.lock(); //提升
if(guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(TimeStamp receiveTime)
{
LOG_INFO("channel handleEvent revents:%d\n",revents_);
// 连接断开,并且fd上没有可读数据(默认水平触发)
if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
{
if(closeCallback_)
{
closeCallback_();
}
}
if(revents_ & EPOLLERR)
{
if (errorCallback_)
{
errorCallback_();
}
}
if(revents_ & (EPOLLIN | EPOLLPRI))
{
if(readCallback_)
{
readCallback_(receiveTime);
}
}
if(revents_ & EPOLLOUT)
{
if(writeCallback_)
{
writeCallback_();
}
}
}
在这里,我们可以看出handleEvent中tie
实际上这是一个弱指针,绑定到TcpConnection的共享指针 ,如果可以原来的弱指针,变成了强指针,这时候tie()
的作用就表明了,延长了TcpConnection的生命周期,使之长过Channel::handleEvent(),保证了TcpConnection不被销毁,因此Channel中存了一个TcpConnection的弱指针,在处理事件的时候,lock将引用计数加1,Channel::handleEvent()来关闭连接时,guard变量依然持有一份TcpConnection。也就是说Channel不会在执行完Channel::handleEvent()之前被析构。这一点在这里可能还是有点稀里糊涂,在我们剖析到TcpConnection时,我还会再来分析一遍。如果难以理解,这里可以先记住有这个指针即可。
handleEventWithGuard根据revents_不同的值调用不同的回调函数,revents_的值是在channel->set_revents(events_[i].events);
,由Poll检测是什么事件,给revents_赋相应的值,处理不同的事件。
本小节有关于Channel类核心部分就到此结束了,下一篇我们来看第二大核心部分 Poller/EpollPoller类。
本小节就到这里,下一篇我会对着三个核心组件进行一个详细的剖析介绍,希望有需要的小伙伴可以持续关注哦~
代码地址:https://github.com/Cheeron955/mymuduo/tree/master
最后附上Channel类源码
Channel.h
#pragma once
#include "noncopyable.h"
#include "TimeStamp.h"
#include <functional>
#include <memory>
/*
理清楚 EventLoop Channel,Poller之间的关系 他们在Reactor上面对应的Demultiplex
Channel 理解为通道,封装了sockfd和其感兴趣的event,如EPOLLIN EPOLLOUT事件
还绑定了poller返回的具体事件
*/
class EventLoop;
class Channel : noncopyable
{
public:
using EventCallback = std::function<void()> ;
using ReadEventCallback = std::function<void(TimeStamp)>;
Channel(EventLoop *loop,int fd);//只用类型对应的指针,大小是固定的四个字节,不影响编译,所以直接可以前置声明EventLoop就可以
~Channel();
//fd得到poller通知以后,调用相应的方法来处理事件
void handleEvent(TimeStamp receiveTime);//receiveTime变量,必须包含头文件
//设置回调函数对象
void setReadCallback(ReadEventCallback cb) { readCallback_=std::move(cb);}
void setWriteCallback(EventCallback cb) { writeCallback_=std::move(cb);}
void setCloseCallback(EventCallback cb) { closeCallback_=std::move(cb);}
void setErrorCallback(EventCallback cb) { errorCallback_=std::move(cb);}
//防止当channel被手动remove掉,channel还在执行回调操作
void tie(const std::shared_ptr<void>&);
int fd() const {return fd_;}
int events() const {return events_;}
int set_revents(int revt) { revents_=revt; }
//设置fd相应的状态 update()相当于调用epoll_ctl
void enableReading() { events_ |= kReadEvent; update();} //相当于把读事件给events相应的位置位了
void disableReading() { events_ &= ~kReadEvent; update();}
void enableWriting() { events_ |= kWriteEvent; update();}
void disableWriting() { events_ &= ~kWriteEvent; update();}
void disableAll() { events_ = kNoneEvent; update();}
//返回fd当前的事件状态
bool isNoneEvent() const {return events_ == kNoneEvent;}
bool isReading() const {return events_ & kReadEvent;}
bool isWriting() const {return events_ & kWriteEvent;}
int index() {return index_;}
void set_index(int idx) { index_ = idx;}
// one loop per thread
EventLoop* onwerLoop() {return loop_;}
void remove();
private:
void update();
void handleEventWithGuard(TimeStamp receiveTime);
//成员变量
static const int kNoneEvent; //fd的状态 没有感兴趣的
static const int kReadEvent; //读
static const int kWriteEvent; //写
EventLoop *loop_; //事件循环
const int fd_; //fd:poller监听的对象 epoll_ctl
int events_; //注册fd感兴趣的事件
int revents_; //poller返回的具体发生的事件
int index_;
std::weak_ptr<void> tie_;
bool tied_;
//因为channel可以获得fd最终发生的具体事件revent,所以他负责回调
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};
Channel.cc
#include "Channel.h"
#include "EventLoop.h"
#include "logger.h"
#include <sys/epoll.h>
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent =EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;
Channel::Channel(EventLoop *loop,int fd)
: loop_(loop)
, fd_(fd)
, events_(0)
, revents_(0)
, index_(-1)
, tied_(false)
{
}
Channel::~Channel()
{
}
//channel的tie方法 在一个TcpConnection新连接创建的时候 调用
//TcpConnection->channel
void Channel::tie(const std::shared_ptr<void> &obj)
{
tie_ = obj;
tied_ = true;
}
/*
当改变channel所表示的events事件后,update负责在poller里面更改fd相应的事件 epoll_ctl
EventLoop => ChannelList Poller
*/
void Channel::update()
{
//通过channel所属的EventLoop,调用Poller相应的方法,注册fd的events事件
loop_->updateChannel(this);
}
//在channel所属的EventLoop中 ,把当前的channel删除掉
void Channel::remove()
{
loop_->removeChannel(this);
}
void Channel::handleEvent(TimeStamp receiveTime)
{
if(tied_)
{
std::shared_ptr<void> guard;
guard = tie_.lock(); //提升
if(guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(TimeStamp receiveTime)
{
LOG_INFO("channel handleEvent revents:%d\n",revents_);
// 连接断开,并且fd上没有可读数据(默认水平触发)
if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
{
if(closeCallback_)
{
closeCallback_();
}
}
if(revents_ & EPOLLERR)
{
if (errorCallback_)
{
errorCallback_();
}
}
if(revents_ & (EPOLLIN | EPOLLPRI))
{
if(readCallback_)
{
readCallback_(receiveTime);
}
}
if(revents_ & EPOLLOUT)
{
if(writeCallback_)
{
writeCallback_();
}
}
}