目录
1 设计思想
eventfd
创建eventfd
2 实现
3 联合调试
4 整合定时器模块
5 联合超时模块调试
1 设计思想
EventLoop 模块是和线程一一绑定的,每一个EventLoop模块内部都管理了一个Poller对象进行事件监控,同时管理着多个Connection对象,每一个连接所要完成的操作最终都会在他所绑定的EventLoop对象中进行。 EventLoop 模块是对事件进行整体管理和操作的,它内部包含的Channel和Poller分别负责事件管理和事件监控,但是最终执行各个事件都是要在EventLoop中执行,以此来保证线程安全。
eventfd
但是,在EventLoop内部的 Connection 有事件到来时,EventLoop 模块怎么知道呢?内部的Connection 有事件到来之后,我们要有办法能够通知 EventLoop 模块 。
而事件通知我们以前学过信号,但是在我们的主从Reactor 服务器中,是无法使用信号来通知具体的某一个EventLoop线程的,因为信号是针对进程的,但是事件具体分配给进程内的哪一个线程去执行我们是不确定的。
所以,我们需要一个新的事件通知机制。 最简单的就是文件描述符,我们只需要给每一个 EventLoop线程专门分配一个 用于事件通知的文件描述符,每一次EventLoop内部的Connection有事件到来时,我们就可以调用EventLoop提供的方法向该文件描述符中写入数据,那么我们就可以通过EventLoop的读事件就绪,来唤醒EventLoop线程进行事件的处理了。
而 eventfd 就是一个专门用于事件通知的文件描述符,他在内核中相当于管理的数据结构中,存在一个计数器,这个计数器中的数字就是事件通知的次数。 每次我们向该文件描述符中写入一个数值,就是会将该计数器的数值加一。
而我们可以使用 read 读取事件通知次数,读取到的数据就是事件的通知次数,读取完之后,计数就清零了。
那么eventfd 怎么使用呢?
他需要包含头文件 <sys/eventfd.h>
创建eventfd
使用 eventfd 接口
第一个参数就是设置计数器的初始值,而第二个参数 flags 是用于设置该fd的一些属性,他有以下选项:
EFD_CLOEXEC 就是禁止被子进程拷贝。
EFD_NONBLOCK 设置该描述符为非阻塞。
EFD_SEMAPHORE 其实就是用这个文件描述符实现信号量类似的功能,我们不是用这个属性。
未来我们使用的时候其实就是设置 EFD_CLOEXEC | EFD_NONBLOCK 这两个属性。
而他的返回值自然就是创建好的事件通知文件描述符,也就是操作句柄。
未来我们在对 eventfd 进行读写的时候,必须是一个 八字节的数据 。因为它本身就是一个计数器的功能,未来我们写入的数值会累加到内部的计数器上,同时读取的时候也是吧内部计数器的数值读取出去并清零。
正常的读写或者IO操作就是调用 read 和 write 接口,就是要注意读和写都使用一个八字节数据来进行。
未来我们需要一个任务队列,为什么呢?
要知道,虽然我们将每一个线程都绑定了一个 EventLoop 模块,一个EventLoop模块除了对Coonection的就绪的IO事件做管理,对连接就绪事件的管理,由于我们只在对应的EventLoop线程的Poller中进行监控,那么注定只会在对应的EventLoop线程中进行,不会有线程安全的问题,我们还需要对连接本身做管理,未来我们对连接本身做管理的时候,虽然会调用绑定的 EventLoop 模块的函数,但是调用连接管理函数的时候不一定是在绑定的 EventLoop 模块绑定的线程中执行的,那么这些操作在执行的时候,需要进行一次判断,也就是判断当前调度的线程是不是对应的连接绑定大的EventLoop所绑定的线程,其实判断也很简单,用线程ID判断就行了。
所以未来我们的EventLoop中需要提供一个接口,提供给其他的模块进行连接的管理,而这些管理操作需要封装成一个任务,如果调用的时候不是在绑定的线程中,就需要将任务压入到对应EventLoop的任务队列中,等待绑定的EventLoop线程进行处理。
这样一来,对一个具体的连接的IO事件处理以及他的管理操作都在一个线程内部执行了,那么就必然是串行的,不会出现线程安全问题。但是连接无法和线程绑定,所以我们的策略就是让连接和一个EventLoop进行绑定,而EventLoop再想办法和线程进行绑定,来达成我们的目的。
那么一个 EventLoop 模块的操作的流程就是: 首先对所管理的描述符的事件进行监控,有事件就绪之后对就绪的事件进行处理,然后再处理任务队列中对连接的管理操作。 这是一个循环的过程。
但是任务队列本身就不是线程安全的了,因为未来可能会有多个线程对这个任务队列进行添加任务的操作,所以我们需要为EventLoop中的任务队列配一把互斥锁,用于实现对任务队列的互斥操作。
简而言之,就是在EventLoop内部对线程进行操作是线程安全的,但是在其他模块中通过调用EventLoop内的接口进行操作,就不一定是线程安全的了,所以需要进行判断。
那么EventLoop模块要如何设计呢?
首先我们需要记录一个线程id ,记录EventLoop所绑定的线程的唯一标识。
其实我们需要一个任务队列和一个互斥锁,保证任务队列的安全。
需要一个Poller对象,用来完成事件的监控。
同时需要一个 eventfd ,用于事件的通知。
为什么需要这个 eventfd 呢? 我们说过,EventLoop 的执行流程是 监控文件描述符的事件,而这个监控我们肯定是需要阻塞的监控的,那么就会出现一种情况,我们的连接一直没有IO事件就绪,但是我们的任务队列中却已经放了很多任务了,这些对连接的管理操作就会一直得不到执行,那么未来就会出问题。 所以不仅是IO事件到来的时候需要结束这个阻塞状态,我们的任务队列中有新任务的时候也需要结束这个阻塞状态, 那么就需要一个事件的通知机制,也就是EventLoop所管理的Poller中也需要监控一个 eventfd 的读事件,每次我们向任务队列中放入任务的时候,我们需要向eventfd中写入一个数据,那么Poller 得Poll 操作就会返回,我们的 EventLoop 就能及时去执行这些任务队列的任务。
同时,我们也需要为eventfd进行事件的管理,为其创建一个 Channel 对象
那么现在的问题就是,任务队列如何设计?
首先,任务队列在压入任务的时候要加锁,这是毋庸置疑的。而我们在取出任务的时候,其实也是需要加锁的,因为我们无法保证 EventLoop线程在取任务的时候不会有其他线程进行压入任务的操作,而导致的线程安全问题。那么取任务或者执行任务的时候,如何进行加锁呢? 难道把执行任务队列中的任务的代码放在临界区内吗? 这样做会导致我们执行任务的过程中,别的线程也无法进行压入任务的操作,会阻塞在加锁上,效率很低,尤其是当里面有一些耗时的任务的时候。而其实我们只是在取出任务的时候需要加锁,将任务从任务队列中取出之后,这些任务对象其实就已经是在线程的私有栈中了,这时候是不会由线程安全的问题了,并不需要执行的时候也加锁。 那么我们就有一种思路:我们在取出任务的时候,进行加锁,加锁之后,直接将任务队列中的所有任务全部置换到我们的线程的私有栈中,然后清空任务队列,再解锁,之后再进行任务的执行。
所以我们的任务队列可以设计成一个 vector ,而每一次执行任务队列的时候,加锁直接使用vector的swap操作,将任务转移到线程私有栈中。
那么EventLoop的成员就是这几个:
class EventLoop
{
private:
using Task = std::function<void()>;
std::thread::id _thread_id; //绑定的线程的id
std::vector<Task> _queue; //任务队列
std::mutex _mutex; //保证任务队列的安全
Poller _poller; //用于事件监控
int _eventfd; //用于事件通知
后续EventLoop需要的接口:
RunInLoop: 进行对连接的操作,需要将操作封装成一个 Task ,然后传给 RunInLoop 来执行,RunInLoop 中会判断当前线程是否是EventLoop绑定的线程来决定这个操作是立即执行还是压入任务队列。
PushTask: 将任务压入任务队列
IsInLoop:用来判断当前线程是否是当前EventLoop绑定的线程
UpdateEvent:更新/添加/修改事件监控
RemoveEvent:移除事件监控
Start:启动EventLoop模块,也就是开启EventLoop的死循环流程
RunAllTask : 用于执行任务队列的所有任务
目前的接口就这些,后续有需要的话还会再添加接口。
class EventLoop
{
private:
using Task = std::function<void()>;
std::thread::id _thread_id; //绑定的线程的id
std::vector<Task> _queue; //任务队列
std::mutex _mutex; //保证任务队列的安全
Poller _poller; //用于事件监控
int _eventfd; //用于事件通知
Channel* _eventfd_channel; //管理eventfd的事件
private:
void RunAllTask();
bool IsInLoop()const;
bool PushTask();
public: //对外提供的功能接口
void UpdateEvent();
void RemoveEvent();
void RunInLoop();
void Start();
void WakeUp(); //进行事件通知,也就是向eventfd中写入数据
};
2 实现
首先实现构造函数
构造函数的时候,最重要的就是绑定一个线程以及创建一个 eventfd,并初始化_event_channel,同时启动eventfd的读事件监听,设置回调方法,将其放入poller的事件监听中
static int CreateEventfd() //用于创建一个 eventfd
{
int fd = eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);
if(fd==-1)
{
ERROR_LOG("create eventfd failed");
abort();
}
return fd;
}
void EventReadCallBack()const
{
uint64_t cnt = 0;
int ret = read(_eventfd,&cnt,sizeof cnt);
if(ret<0)
{
if(errno ==EAGAIN ||errno==EWOULDBLOCK ||errno ==EINTR) return;
ERROR_LOG("read eventfd failed");
}
return;
}
public: //对外提供的功能接口
EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd))
{
_eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this)); //设置读回调函数
_eventfd_channel->EnableRead(); //启动读事件监听
}
RunAllTask需要先加锁,取出所有任务,然后然后在临界区外执行任务。
void RunAllTask()
{
std::vector<Task> tasks;
{
std::unique_lock<std::mutex> lock(_mutex); //加锁
tasks.swap(_queue); //取任务
}
for(auto&f:tasks)
f(); //执行任务
}
IsInLoop接口只需要判断当前线程id是否和EventLoop绑定的线程id相等。
bool IsInLoop()const {return std::this_thread::get_id() == _thread_id ;}
PushTask需要加锁然后push_back,之后就调用WakeUp唤醒EventLoop线程就行了。
void PushTask(const Task& f)
{
{
std::unique_lock<std::mutex> lock(_mutex);
_queue.push_back(f);
}
WakeUp();
}
接下来就是对内部的连接所监控的事件的操作,对事件的操作其实并不会涉及到线程安全的问题,所以我们可以直接执行。
void UpdateEvent(Channel* channel)
{
_poller.UpdateEvents(channel);
}
void RemoveEvent(Channel* channel)
{
_poller.Remove(channel);
}
然后就是RunInLoop函数,无非就是判断加执行或者压入任务队列
void RunInLoop(const Task& f)
{
if(IsInLoop()) f(); //如果是绑定的线程就直接执行
else PushTask(f);
}
然后就是Start函数,也就是EventLoop的主流程
void Start()
{
while(1)
{
//1 监听事件
std::vector<Channel*> actives;
int ret = _poller.Poll(&actives);
//2 执行IO回调
for(auto channel:actives) channel->HandlerEvents();
//3 执行任务队列的任务
RunAllTask();
}
}
最后就是唤醒EventLoop线程的函数WakeUp,很简单,向eventfd中写入一个1就行了。
void WakeUp() //唤醒EventLoop线程
{
uint64_t val = 1;
int ret = write(_eventfd,&val,sizeof val);
if(ret < 0)
{
if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno==EINTR) return;
ERROR_LOG("WakeUp failed");
abort();
}
return;
}
最后就是析构函数,释放eventfd 的channel就行了。 不过由于我们这是一个服务器,这个析构函数写不写都无所谓,因为当EventLoop对象释放的时候,说明服务器进程也结束了,这时候会自动释放资源。
~EventLoop(){delete _eventfd_channel;}
3 联合调试
我们需要将 Channel 模块以及 Poller 模块和EventLoop 做一个整合。
首先我们要修改一些Channel模块中的函数,将EventLoop的一些操作加进去。
首先需要在 Channel 中加上一个_loop 成员,绑定一个EventLoop 。由于EventLoop的定义在Channel后面,所以我们需要在Channel前对EventLoop进行声明。
其次Channel的构造函数需要传入一个 EventLoop的指针。
然后就是我们Channel 中的Update和Remove函数中,需要调用 _loop 提供的接口。
这两个函数由于调用了EventLoop的函数,所以我们需要将其定义写到EventLoop后面
void Channel::UpdateEvents() //op 就是未来传递给 epoll_ctl 的op参数
{
//后续调用EventLoop提供的接口
_loop->UpdateEvent(this);
}
//移除监控
void Channel::Remove()
{
DisableAll();
//调用_loop提供的Remove接口
_loop->RemoveEvent(this);
}
那么我们在日志打印中再加一个内容,打印线程id,这个信息后续会用到。
fprintf(stdout,"%p %s [%s:%d] " format"\n",(void*)pthread_self(),str,__FILE__,__LINE__,##__VA_ARGS__);\
然后就是调试了,调试要怎么进行呢?
我们需要写一个通信套接字的读回调函数,未来设置进通信套接字的Channel中,当然,这个工作最终不是我们做的,后续是Connection来完成。后续EventLoop管理的也是Connection。
void ReadHandler(int fd)
{
char buf[1024] = {0};
int ret = read(fd,buf,sizeof buf -1);
if(ret<0)
{
if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
ERROR_LOG("read failed");
}
buf[ret]=0;
NORMAL_LOG("read a message : %s",buf);
ret = write(fd,buf,ret);
if(ret<0)
{
if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
ERROR_LOG("write failed");
}
return ;
}
其次,我们目前需要手动为监听套接字创建一个Channel对象以及为其设置读回调方法
void Acceptor(int lstfd)
{
int newfd = accept(lstfd,nullptr,nullptr);
if(newfd == -1)
{
if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
ERROR_LOG("acceptor failed");
}
//获取到新连接之后,把新连接绑定到 EventLoop 中 ,并启动事件监听
Channel* channel = new Channel(newfd,&loop);
channel->SetReadCallBack(std::bind(ReadHandler,newfd));
channel->EnableRead();
}
同时,写到这里,我发现了一个重大错误,就是前面我们实现Poller的时候,没有实现构造函数。
这里我补一下:
static int CreateEpfd()
{
#define EPOLL_SIZE 1024 //这个值大于 0 就行,无需关心
int fd = epoll_create(EPOLL_SIZE);
}
Poller():_epfd(CreateEpfd()){}
而client 的逻辑还是用之前的测试代码,我们只需要修改 server 的逻辑。
int main(){
Socket sock;
sock.CreatServerSocket(8080);
int lstfd = sock.Fd();
Channel lstchannel(lstfd,&loop);
lstchannel.SetReadCallBack(std::bind(Acceptor,lstfd));
lstchannel.EnableRead();
loop.Start();
return 0;
}
那么我们的测试的结果就是:
4 整合定时器模块
EventLoop模块的总体框架其实已经差不多了,那么我们可以把定时器或者说超时管理模块加进去。
首先,超时管理模块也是与 EventLoop 模块一一对应的,他负责管理他所在 EventLoop 模块所管理的连接的超时释放机制。
而超时模块我们前面已经设计了一个简单的时间轮了,我们可以修改时间轮的代码来设计一个超时模块。
原始的时间轮代码:
class TimerTask
{
using task = std::function<void()>; //无参的回调,如果需要参数,有上层进行使用std::bind() 进行参数的绑定
using releasetask = std::function<void(uint64_t)>;
private:
uint64_t _id;
uint64_t _delay;
task _cb;
releasetask _release;
bool _is_canceled; //表示是否被取消
public:
TimerTask(uint64_t id,uint64_t delay,task cb,releasetask rcb):_id(id),_delay(delay),_cb(cb),_release(rcb),_is_canceled(false){
std::cout<<"构造,id:"<<id<<"----addr:"<<this<<std::endl;
}
~TimerTask(){if(!_is_canceled)_cb();_release(_id);
std::cout<<"析构,id:"<<_id<<"----addr:"<<this<<std::endl;
} //_iscanceled 为false表示该任务未被取消,这时候执行任务回调
uint64_t GetDelay(){return _delay;} //获取定时任务设置的延时
void CancelTask(){_is_canceled = true;}
};
class TimerWheel
{
using task = std::function<void()>;
private:
std::vector<std::vector<std::shared_ptr<TimerTask>>> _wheel;
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>> _tasks;
int _timer_idx;
#define MAXTIME 60
public:
TimerWheel():_wheel(MAXTIME),_timer_idx(0){} //我们默认时间轮的最大刻度就是 60
//添加定时任务
bool AddTimerTask(uint64_t id , uint64_t delay,task cb)
{
assert(_tasks.find(id) == _tasks.end()); //确保 id 合法
std::shared_ptr<TimerTask> pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
std::weak_ptr<TimerTask> wpt(pt); //weak_ptr
int pos = (_timer_idx + delay) % MAXTIME; //计算到期时间
_wheel[pos].push_back(pt); //定时任务放入时间轮
_tasks[id] = wpt; //添加到map中管理
return true;
}
//刷新/延迟定时任务
bool RefreshTimerTask(uint64_t id)
{
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
if(it==_tasks.end()) return false; //id 不合法直接返回false
std::shared_ptr<TimerTask> pt = it->second.lock(); //构造新的shared_ptr
int pos = (_timer_idx + pt->GetDelay()) % MAXTIME; //找到新的位置
_wheel[pos].push_back(pt);
return true;
}
//删除map的映射
bool RealeaseTask(uint64_t id)
{
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
if(it==_tasks.end()) return false;
_tasks.erase(it);
return true;
}
//取消定时任务
bool CancelTimerTask(uint64_t id)
{
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
if(it==_tasks.end()) return false;
(it->second).lock()->CancelTask();
return true;
}
//移动秒针
void RunTick()
{
_timer_idx ++;
_timer_idx %= MAXTIME;
_wheel[_timer_idx].clear();
}
};
定时任务类以及人物的添加修改的策略我们不需要修改,主要修改的其实就两方面,第一就是我们需要将时间轮和timerfd 结合起来,让时间轮中的 RunTick 函数每秒钟自动执行一次,其实就是在我们的时间轮中增加一个 timerfd 成员,设置他的超时时间为一秒,那么每隔一秒内核就会像timerfd中写入一个 1 。只需要为 timerfd 创建一个Channel对象,然后设置其读回调函数为RunTick函数,然后启动读事件回调,那么就完成了。
那么按照上面的思路,TimerWheel 模块中其实也还需要一个 _loop 对象的指针,因为我们需要通过_loop来构造Channel。
EventLoop* _loop;
Channel* _timerfd_channel;
构造函数修改:构造函数中我们需要创建号 timerfd 以及为其设置读回调方法,回调方法中其实就是读取出 timerfd 的内容以及调用一次 RunTick
static int CreateTimerfd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC,TFD_CLOEXEC|TFD_NONBLOCK);
assert(timerfd!=-1);
struct itimerspec timeout;
//第一次超时时间间隔
timeout.it_value.tv_sec = 2; // 第一次超时为 3 s
timeout.it_value.tv_nsec = 0;
//第二次以及之后的超时时间间隔
timeout.it_interval.tv_sec = 1; // 往后每隔 1s 超时一次
timeout.it_interval.tv_nsec = 0;
int ret = timerfd_settime(timerfd,0,&timeout,NULL); //设置定时通知
assert(ret!=-1); //返回值为-1表示设置失败,但是一般是不会失败的,可以不用关心
return timerfd;
}
TimerWheel(EventLoop* loop):_wheel(MAXTIME),_timer_idx(0),_loop(loop),_timerfd(CreateTimerfd()),_timerfd_channel(new Channel(_timerfd,_loop))
{
_timerfd_channel->SetReadCallBack(std::bind(&TimerWheel::OnTime,this));
_timerfd_channel->EnableRead();
}
~TimerWheel(){delete _timerfd_channel;}
void OnTime()
{
TimerRead();
RunTick();
}
void TimerRead()
{
uint64_t val = 0;
int ret = read(_timerfd,&val,sizeof val);
if(ret<0)
{
if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno == EINTR) return;
ERROR_LOG("timerfd read failed");
abort();
}
return;
}
那么其实超时模块的设计就已经完成了。
然后我们接着修改 EventLoop 模块。首先EventLoop模块内需要一个TimerWheel对象,用于设置超时事件。,初始化的时候传入 EventLoop对象的指针就行了。
private:
using Task = std::function<void()>;
std::thread::id _thread_id; //绑定的线程的id
std::vector<Task> _queue; //任务队列
std::mutex _mutex; //保证任务队列的安全
Poller _poller; //用于事件监控
int _eventfd; //用于事件通知
Channel* _eventfd_channel; //管理eventfd的事件
TimerWheel _timer_wheel; //超时管理模块
EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd,this)),_timer_wheel(this)
{
_eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this)); //设置读回调函数
_eventfd_channel->EnableRead(); //启动读事件监听
}
然后就是提供三个接口,用于添加,刷新和取消定时任务。
void AddTimerTask(uint64_t id , uint64_t delay,Task f)
{
_timer_wheel.AddTimerTask(id,delay,f);
}
void RefreshTimerTask(uint64_t id)
{
_timer_wheel.RefreshTimerTask(id);
}
void CancelTimerTask(uint64_t id)
{
_timer_wheel.CancelTimerTask(id);
}
但是仅仅是这样还不行,因为我们之前说了,对连接的管理操作其实并不是线程安全的,最终执行AddTimerTask 和RefreshTimerTask 和CancelTimerTask 这三个函数的并不一定是他所绑定的EventLoop模块,所以我们需要调用他关联的EventLoop模块的RunInLoop来完成超时任务的添加,刷新,取消等。那么我们的接口就不能这样设置了,而是需要再封装一层。
我们需要将实际进行添加,刷新,取消定时任务的操作封装成一个任务,未来交给 RunInLoop来执行。
void AddTimerTaskInLoop(uint64_t id , uint64_t delay,task cb)
{
assert(_tasks.find(id) == _tasks.end()); //确保 id 合法
std::shared_ptr<TimerTask> pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
std::weak_ptr<TimerTask> wpt(pt); //weak_ptr
int pos = (_timer_idx + delay) % MAXTIME; //计算到期时间
_wheel[pos].push_back(pt); //定时任务放入时间轮
_tasks[id] = wpt; //添加到map中管理
}
//刷新/延迟定时任务
void RefreshTimerTaskInLoop(uint64_t id)
{
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
if(it==_tasks.end()) return ; //id 不合法直接返回false
std::shared_ptr<TimerTask> pt = it->second.lock(); //构造新的shared_ptr
int pos = (_timer_idx + pt->GetDelay()) % MAXTIME; //找到新的位置
_wheel[pos].push_back(pt);
}
//取消定时任务
void CancelTimerTaskInLoop(uint64_t id)
{
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
if(it==_tasks.end()) return ;
(it->second).lock()->CancelTask();
}
//添加定时任务
bool TimerWheel::AddTimerTask(uint64_t id , uint64_t delay,task cb)
{
_loop->RunInLoop(std::bind(&TimerWheel::AddTimerTaskInLoop,this,id,delay,cb));
}
//刷新/延迟定时任务
bool TimerWheel::RefreshTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::RefreshTimerTaskInLoop,this,id));
}
bool TimerWheel::CancelTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::CancelTimerTaskInLoop,this,id));
}
这样一来我们的超时模块就设计完了。
5 联合超时模块调试
现在我们需要测试一下超时模块。
现在的问题是,超时任务怎么设置?其实在我们这里,超时任务就是一个关闭连接以及释放channel资源的任务,也就是一个关闭连接任务。
那么我们可以这样设置:
void CloseHandler(Channel*channel) //简单点就移除事件监控并且关闭连接
{
channel->Remove();
close(channel->Fd());
delete channel;
DEBUG_LOG("释放了一个Channel:%p",channel);
}
同时,读任务还是用之前的。
Acceptor函数也做一些小的修改,把回调函数设置的完整一点。
int taskid = 0;
void Acceptor(int lstfd)
{
int newfd = accept(lstfd,nullptr,nullptr);
if(newfd == -1)
{
if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
ERROR_LOG("acceptor failed");
}
//获取到新连接之后,把新连接绑定到 EventLoop 中 ,并启动事件监听
Channel* channel = new Channel(newfd,&loop);
DEBUG_LOG("收到一个连接:%p",channel);
channel->SetReadCallBack(std::bind(ReadHandler,newfd));
channel->SetEventCallBack(std::bind(EventHandler,taskid));
channel->SetCloseCallBack(std::bind(CloseHandler,channel));
channel->SetErrorCallBack(std::bind(CloseHandler,channel));
channel->EnableRead();
//添加超时任务
int delay = 10;
loop.AddTimerTask(taskid++,delay,std::bind(CloseHandler,channel));
}
那么任意事件回调我们就设置一个刷新定时任务的回调
//任意事件回调
void EventHandler(int id) //但是怎么找到任务的id呢?
{
loop.RefreshTimerTask(id);
}
这样就差不多了,我们主要测试一下超时模块能否正常工作。
我们的client则是前五秒发送五个消息,然后就死循环休眠,也就是模拟非活跃的情况。
目前我们也没看出来什么问题。
目前来说,我们的EventLoop模块和他所关联的的三个子模块 :Channel,Poller,TimerQueue模块还没有出现大的问题,后续出现问题我们再修正。
再附上一张项目概述时的EventLoop模块的关联图。