EventLoop模块 --- 事件循环模块

news2024/10/24 7:30:30

目录

1 设计思想

eventfd

创建eventfd

2 实现

3 联合调试

4 整合定时器模块

5 联合超时模块调试 


1 设计思想

EventLoop 模块是和线程一一绑定的,每一个EventLoop模块内部都管理了一个Poller对象进行事件监控,同时管理着多个Connection对象,每一个连接所要完成的操作最终都会在他所绑定的EventLoop对象中进行。 EventLoop 模块是对事件进行整体管理和操作的,它内部包含的Channel和Poller分别负责事件管理和事件监控,但是最终执行各个事件都是要在EventLoop中执行,以此来保证线程安全。

eventfd

但是,在EventLoop内部的 Connection 有事件到来时,EventLoop 模块怎么知道呢?内部的Connection 有事件到来之后,我们要有办法能够通知 EventLoop 模块 。

而事件通知我们以前学过信号,但是在我们的主从Reactor 服务器中,是无法使用信号来通知具体的某一个EventLoop线程的,因为信号是针对进程的,但是事件具体分配给进程内的哪一个线程去执行我们是不确定的

所以,我们需要一个新的事件通知机制。 最简单的就是文件描述符,我们只需要给每一个 EventLoop线程专门分配一个 用于事件通知的文件描述符,每一次EventLoop内部的Connection有事件到来时,我们就可以调用EventLoop提供的方法向该文件描述符中写入数据,那么我们就可以通过EventLoop的读事件就绪,来唤醒EventLoop线程进行事件的处理了。

而 eventfd 就是一个专门用于事件通知的文件描述符,他在内核中相当于管理的数据结构中,存在一个计数器,这个计数器中的数字就是事件通知的次数。 每次我们向该文件描述符中写入一个数值,就是会将该计数器的数值加一

而我们可以使用 read 读取事件通知次数,读取到的数据就是事件的通知次数,读取完之后,计数就清零了。

那么eventfd 怎么使用呢?

他需要包含头文件  <sys/eventfd.h>

创建eventfd

使用 eventfd 接口

第一个参数就是设置计数器的初始值,而第二个参数 flags 是用于设置该fd的一些属性,他有以下选项:

EFD_CLOEXEC 就是禁止被子进程拷贝

EFD_NONBLOCK 设置该描述符为非阻塞

EFD_SEMAPHORE 其实就是用这个文件描述符实现信号量类似的功能,我们不是用这个属性。

未来我们使用的时候其实就是设置  EFD_CLOEXEC | EFD_NONBLOCK 这两个属性。

而他的返回值自然就是创建好的事件通知文件描述符,也就是操作句柄。

未来我们在对 eventfd 进行读写的时候,必须是一个 八字节的数据 。因为它本身就是一个计数器的功能,未来我们写入的数值会累加到内部的计数器上,同时读取的时候也是吧内部计数器的数值读取出去并清零。

正常的读写或者IO操作就是调用 read 和 write 接口,就是要注意读和写都使用一个八字节数据来进行。

未来我们需要一个任务队列,为什么呢?

要知道,虽然我们将每一个线程都绑定了一个 EventLoop 模块,一个EventLoop模块除了对Coonection的就绪的IO事件做管理,对连接就绪事件的管理,由于我们只在对应的EventLoop线程的Poller中进行监控,那么注定只会在对应的EventLoop线程中进行,不会有线程安全的问题,我们还需要对连接本身做管理未来我们对连接本身做管理的时候,虽然会调用绑定的 EventLoop 模块的函数,但是调用连接管理函数的时候不一定是在绑定的 EventLoop 模块绑定的线程中执行的,那么这些操作在执行的时候,需要进行一次判断,也就是判断当前调度的线程是不是对应的连接绑定大的EventLoop所绑定的线程,其实判断也很简单,用线程ID判断就行了。

所以未来我们的EventLoop中需要提供一个接口,提供给其他的模块进行连接的管理,而这些管理操作需要封装成一个任务,如果调用的时候不是在绑定的线程中,就需要将任务压入到对应EventLoop的任务队列中,等待绑定的EventLoop线程进行处理。

这样一来,对一个具体的连接的IO事件处理以及他的管理操作都在一个线程内部执行了,那么就必然是串行的,不会出现线程安全问题。但是连接无法和线程绑定,所以我们的策略就是让连接和一个EventLoop进行绑定,而EventLoop再想办法和线程进行绑定,来达成我们的目的。

那么一个 EventLoop 模块的操作的流程就是: 首先对所管理的描述符的事件进行监控,有事件就绪之后对就绪的事件进行处理,然后再处理任务队列中对连接的管理操作。 这是一个循环的过程。

但是任务队列本身就不是线程安全的了,因为未来可能会有多个线程对这个任务队列进行添加任务的操作,所以我们需要为EventLoop中的任务队列配一把互斥锁,用于实现对任务队列的互斥操作。

简而言之,就是在EventLoop内部对线程进行操作是线程安全的,但是在其他模块中通过调用EventLoop内的接口进行操作,就不一定是线程安全的了,所以需要进行判断。

那么EventLoop模块要如何设计呢?

首先我们需要记录一个线程id ,记录EventLoop所绑定的线程的唯一标识。

其实我们需要一个任务队列和一个互斥锁,保证任务队列的安全。

需要一个Poller对象,用来完成事件的监控。

同时需要一个 eventfd ,用于事件的通知。 

为什么需要这个 eventfd 呢? 我们说过,EventLoop 的执行流程是 监控文件描述符的事件,而这个监控我们肯定是需要阻塞的监控的,那么就会出现一种情况,我们的连接一直没有IO事件就绪,但是我们的任务队列中却已经放了很多任务了,这些对连接的管理操作就会一直得不到执行,那么未来就会出问题。 所以不仅是IO事件到来的时候需要结束这个阻塞状态,我们的任务队列中有新任务的时候也需要结束这个阻塞状态, 那么就需要一个事件的通知机制,也就是EventLoop所管理的Poller中也需要监控一个 eventfd 的读事件,每次我们向任务队列中放入任务的时候,我们需要向eventfd中写入一个数据,那么Poller 得Poll 操作就会返回,我们的 EventLoop 就能及时去执行这些任务队列的任务。

同时,我们也需要为eventfd进行事件的管理,为其创建一个 Channel 对象

那么现在的问题就是,任务队列如何设计?

首先,任务队列在压入任务的时候要加锁,这是毋庸置疑的。而我们在取出任务的时候,其实也是需要加锁的,因为我们无法保证 EventLoop线程在取任务的时候不会有其他线程进行压入任务的操作,而导致的线程安全问题。那么取任务或者执行任务的时候,如何进行加锁呢? 难道把执行任务队列中的任务的代码放在临界区内吗? 这样做会导致我们执行任务的过程中,别的线程也无法进行压入任务的操作,会阻塞在加锁上,效率很低,尤其是当里面有一些耗时的任务的时候。而其实我们只是在取出任务的时候需要加锁,将任务从任务队列中取出之后,这些任务对象其实就已经是在线程的私有栈中了,这时候是不会由线程安全的问题了,并不需要执行的时候也加锁。 那么我们就有一种思路:我们在取出任务的时候,进行加锁,加锁之后,直接将任务队列中的所有任务全部置换到我们的线程的私有栈中,然后清空任务队列,再解锁,之后再进行任务的执行。

所以我们的任务队列可以设计成一个 vector ,而每一次执行任务队列的时候,加锁直接使用vector的swap操作,将任务转移到线程私有栈中。

那么EventLoop的成员就是这几个:

class EventLoop
{
private:
using Task = std::function<void()>;
    std::thread::id _thread_id;         //绑定的线程的id
    std::vector<Task> _queue;           //任务队列
    std::mutex _mutex;                  //保证任务队列的安全
    Poller _poller;                     //用于事件监控
    int _eventfd;                       //用于事件通知

后续EventLoop需要的接口:

RunInLoop: 进行对连接的操作,需要将操作封装成一个 Task ,然后传给 RunInLoop 来执行,RunInLoop 中会判断当前线程是否是EventLoop绑定的线程来决定这个操作是立即执行还是压入任务队列。

PushTask: 将任务压入任务队列

IsInLoop:用来判断当前线程是否是当前EventLoop绑定的线程

UpdateEvent:更新/添加/修改事件监控

RemoveEvent:移除事件监控

Start:启动EventLoop模块,也就是开启EventLoop的死循环流程

RunAllTask : 用于执行任务队列的所有任务

目前的接口就这些,后续有需要的话还会再添加接口。


class EventLoop
{
private:
using Task = std::function<void()>;
    std::thread::id _thread_id;         //绑定的线程的id
    std::vector<Task> _queue;           //任务队列
    std::mutex _mutex;                  //保证任务队列的安全
    Poller _poller;                     //用于事件监控
    int _eventfd;                       //用于事件通知
    Channel* _eventfd_channel;          //管理eventfd的事件
private:
    void RunAllTask();
    bool IsInLoop()const;
    bool PushTask();
public:                             //对外提供的功能接口
    void UpdateEvent();
    void RemoveEvent();
    void RunInLoop();
    void Start();
    void WakeUp();                    //进行事件通知,也就是向eventfd中写入数据
};

2 实现

首先实现构造函数

构造函数的时候,最重要的就是绑定一个线程以及创建一个 eventfd,并初始化_event_channel,同时启动eventfd的读事件监听,设置回调方法,将其放入poller的事件监听中

    static int CreateEventfd()          //用于创建一个 eventfd
    {
        int fd = eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);
        if(fd==-1)
        {
            ERROR_LOG("create eventfd failed");
            abort();
        }
        return fd;
    }
    void EventReadCallBack()const
    {
        uint64_t cnt = 0;
        int ret = read(_eventfd,&cnt,sizeof cnt);
        if(ret<0)
        {
            if(errno ==EAGAIN ||errno==EWOULDBLOCK ||errno ==EINTR) return;
            ERROR_LOG("read eventfd failed");
        }
        return;
    }
public:                             //对外提供的功能接口
    EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd))
    {
        _eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this));   //设置读回调函数
        _eventfd_channel->EnableRead();                     //启动读事件监听
    }

RunAllTask需要先加锁,取出所有任务,然后然后在临界区外执行任务。

    void RunAllTask()
    {
        std::vector<Task> tasks;
        {
            std::unique_lock<std::mutex> lock(_mutex); //加锁
            tasks.swap(_queue);                        //取任务
        }
        for(auto&f:tasks)
        f();                                           //执行任务
    }

IsInLoop接口只需要判断当前线程id是否和EventLoop绑定的线程id相等。

    bool IsInLoop()const {return std::this_thread::get_id() == _thread_id ;} 

PushTask需要加锁然后push_back,之后就调用WakeUp唤醒EventLoop线程就行了。

    void PushTask(const Task& f)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _queue.push_back(f);
        }
        WakeUp();
    }

接下来就是对内部的连接所监控的事件的操作,对事件的操作其实并不会涉及到线程安全的问题,所以我们可以直接执行。

    void UpdateEvent(Channel* channel)
    {
        _poller.UpdateEvents(channel);
    }
    void RemoveEvent(Channel* channel)
    {
        _poller.Remove(channel);
    }

然后就是RunInLoop函数,无非就是判断加执行或者压入任务队列

    void RunInLoop(const Task& f)
    {
        if(IsInLoop()) f();     //如果是绑定的线程就直接执行
        else PushTask(f);
    }

然后就是Start函数,也就是EventLoop的主流程

    void Start()
    {
        while(1)
        {
            //1 监听事件
            std::vector<Channel*> actives;
            int ret = _poller.Poll(&actives);    
            //2 执行IO回调
            for(auto channel:actives) channel->HandlerEvents();
            //3 执行任务队列的任务
            RunAllTask();
        }
    }

最后就是唤醒EventLoop线程的函数WakeUp,很简单,向eventfd中写入一个1就行了。

    void WakeUp()                  //唤醒EventLoop线程
    {
        uint64_t val = 1;
        int ret = write(_eventfd,&val,sizeof val);
        if(ret < 0)
        {
            if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno==EINTR) return;
            ERROR_LOG("WakeUp failed");
            abort();
        }
        return;
    }

最后就是析构函数,释放eventfd 的channel就行了。 不过由于我们这是一个服务器,这个析构函数写不写都无所谓,因为当EventLoop对象释放的时候,说明服务器进程也结束了,这时候会自动释放资源。

    ~EventLoop(){delete _eventfd_channel;}

3 联合调试

我们需要将 Channel 模块以及 Poller 模块和EventLoop 做一个整合。

首先我们要修改一些Channel模块中的函数,将EventLoop的一些操作加进去。

首先需要在 Channel 中加上一个_loop 成员,绑定一个EventLoop 。由于EventLoop的定义在Channel后面,所以我们需要在Channel前对EventLoop进行声明。

其次Channel的构造函数需要传入一个 EventLoop的指针。

然后就是我们Channel 中的Update和Remove函数中,需要调用 _loop 提供的接口。

这两个函数由于调用了EventLoop的函数,所以我们需要将其定义写到EventLoop后面

void Channel::UpdateEvents()  //op 就是未来传递给 epoll_ctl 的op参数
{
    //后续调用EventLoop提供的接口
    _loop->UpdateEvent(this);
}
//移除监控
void Channel::Remove()
{
    DisableAll();
    //调用_loop提供的Remove接口
    _loop->RemoveEvent(this);
}

那么我们在日志打印中再加一个内容,打印线程id,这个信息后续会用到。

    fprintf(stdout,"%p %s [%s:%d] " format"\n",(void*)pthread_self(),str,__FILE__,__LINE__,##__VA_ARGS__);\

然后就是调试了,调试要怎么进行呢? 

我们需要写一个通信套接字的读回调函数,未来设置进通信套接字的Channel中,当然,这个工作最终不是我们做的,后续是Connection来完成。后续EventLoop管理的也是Connection。

void ReadHandler(int fd)
{
    char buf[1024] = {0};
    int ret = read(fd,buf,sizeof buf -1);
    if(ret<0)    
    { 
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("read failed");
    }
    buf[ret]=0;
    NORMAL_LOG("read a message : %s",buf);
    ret = write(fd,buf,ret);
    if(ret<0)
    {
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("write failed");
    }
    return ;
}

其次,我们目前需要手动为监听套接字创建一个Channel对象以及为其设置读回调方法

void Acceptor(int lstfd)
{
    int newfd = accept(lstfd,nullptr,nullptr);
    if(newfd == -1) 
    {
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("acceptor failed");
    }
    //获取到新连接之后,把新连接绑定到 EventLoop 中 ,并启动事件监听
    Channel* channel = new Channel(newfd,&loop);
    channel->SetReadCallBack(std::bind(ReadHandler,newfd));
    channel->EnableRead();
}

同时,写到这里,我发现了一个重大错误,就是前面我们实现Poller的时候,没有实现构造函数。

这里我补一下:

    static int CreateEpfd()
    {
#define EPOLL_SIZE 1024     //这个值大于 0 就行,无需关心
        int fd = epoll_create(EPOLL_SIZE);
    }

    Poller():_epfd(CreateEpfd()){}

而client 的逻辑还是用之前的测试代码,我们只需要修改 server 的逻辑。

int main(){
    Socket sock;
    sock.CreatServerSocket(8080);
    int lstfd = sock.Fd();
    Channel lstchannel(lstfd,&loop);
    lstchannel.SetReadCallBack(std::bind(Acceptor,lstfd));
    lstchannel.EnableRead();
    loop.Start();
    return 0;
}

那么我们的测试的结果就是:

4 整合定时器模块

EventLoop模块的总体框架其实已经差不多了,那么我们可以把定时器或者说超时管理模块加进去。

首先,超时管理模块也是与 EventLoop 模块一一对应的,他负责管理他所在 EventLoop 模块所管理的连接的超时释放机制。

而超时模块我们前面已经设计了一个简单的时间轮了,我们可以修改时间轮的代码来设计一个超时模块。

原始的时间轮代码:

class TimerTask
{
    using task = std::function<void()>;  //无参的回调,如果需要参数,有上层进行使用std::bind() 进行参数的绑定
    using releasetask = std::function<void(uint64_t)>;
private:
    uint64_t _id;
    uint64_t _delay;
    task _cb;
    releasetask _release;
    bool _is_canceled; //表示是否被取消
public:
    TimerTask(uint64_t id,uint64_t delay,task cb,releasetask rcb):_id(id),_delay(delay),_cb(cb),_release(rcb),_is_canceled(false){
        std::cout<<"构造,id:"<<id<<"----addr:"<<this<<std::endl;
    } 
    ~TimerTask(){if(!_is_canceled)_cb();_release(_id);
        std::cout<<"析构,id:"<<_id<<"----addr:"<<this<<std::endl;
    }  //_iscanceled 为false表示该任务未被取消,这时候执行任务回调
    uint64_t GetDelay(){return _delay;}      //获取定时任务设置的延时
    void CancelTask(){_is_canceled = true;}
};


class TimerWheel
{
    using task = std::function<void()>;
private:
    std::vector<std::vector<std::shared_ptr<TimerTask>>> _wheel;
    std::unordered_map<uint64_t,std::weak_ptr<TimerTask>> _tasks;
    int _timer_idx;  
#define MAXTIME 60
public:
    TimerWheel():_wheel(MAXTIME),_timer_idx(0){}  //我们默认时间轮的最大刻度就是 60
    //添加定时任务
    bool AddTimerTask(uint64_t id , uint64_t delay,task cb)
    {
        assert(_tasks.find(id) == _tasks.end());                   //确保 id 合法
        std::shared_ptr<TimerTask> pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
        std::weak_ptr<TimerTask> wpt(pt);                          //weak_ptr
        int pos = (_timer_idx + delay) % MAXTIME;                  //计算到期时间
        _wheel[pos].push_back(pt);                                 //定时任务放入时间轮
        _tasks[id] = wpt;                                          //添加到map中管理
        return true;
    }
    //刷新/延迟定时任务
    bool RefreshTimerTask(uint64_t id)
    {
        std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return false;                          //id 不合法直接返回false
        std::shared_ptr<TimerTask> pt = it->second.lock();          //构造新的shared_ptr
        int pos = (_timer_idx + pt->GetDelay()) % MAXTIME;          //找到新的位置
        _wheel[pos].push_back(pt);
        return true;
    }
    //删除map的映射
    bool RealeaseTask(uint64_t id)
    {
        std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return false;
        _tasks.erase(it);
        return true;
    }
    //取消定时任务
    bool CancelTimerTask(uint64_t id)
    {
        std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return false;
        (it->second).lock()->CancelTask();
        return true;
    }
    //移动秒针
    void RunTick()
    {
        _timer_idx ++;
        _timer_idx %= MAXTIME;
        _wheel[_timer_idx].clear();
    }
};

定时任务类以及人物的添加修改的策略我们不需要修改,主要修改的其实就两方面,第一就是我们需要将时间轮和timerfd 结合起来,让时间轮中的 RunTick 函数每秒钟自动执行一次,其实就是在我们的时间轮中增加一个 timerfd 成员,设置他的超时时间为一秒,那么每隔一秒内核就会像timerfd中写入一个 1 。只需要为 timerfd 创建一个Channel对象,然后设置其读回调函数为RunTick函数,然后启动读事件回调,那么就完成了。

那么按照上面的思路,TimerWheel 模块中其实也还需要一个 _loop 对象的指针,因为我们需要通过_loop来构造Channel。

    EventLoop* _loop;
    Channel* _timerfd_channel;  

构造函数修改:构造函数中我们需要创建号 timerfd 以及为其设置读回调方法,回调方法中其实就是读取出 timerfd 的内容以及调用一次 RunTick

    static int CreateTimerfd()
    {
        int timerfd = timerfd_create(CLOCK_MONOTONIC,TFD_CLOEXEC|TFD_NONBLOCK);
        assert(timerfd!=-1);
        struct itimerspec timeout;
        //第一次超时时间间隔
        timeout.it_value.tv_sec = 2;     // 第一次超时为 3 s
        timeout.it_value.tv_nsec = 0;
        //第二次以及之后的超时时间间隔
        timeout.it_interval.tv_sec = 1;  // 往后每隔 1s 超时一次
        timeout.it_interval.tv_nsec = 0;
        int ret = timerfd_settime(timerfd,0,&timeout,NULL);  //设置定时通知
        assert(ret!=-1);  //返回值为-1表示设置失败,但是一般是不会失败的,可以不用关心
        return timerfd;
    } 

    TimerWheel(EventLoop* loop):_wheel(MAXTIME),_timer_idx(0),_loop(loop),_timerfd(CreateTimerfd()),_timerfd_channel(new Channel(_timerfd,_loop))
    {
        _timerfd_channel->SetReadCallBack(std::bind(&TimerWheel::OnTime,this));
        _timerfd_channel->EnableRead();
    }
    ~TimerWheel(){delete _timerfd_channel;}
    void OnTime()
    {
        TimerRead();
        RunTick();
    }
    void TimerRead()
    {
        uint64_t val = 0;
        int ret = read(_timerfd,&val,sizeof val);
        if(ret<0)
        {
            if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno == EINTR) return;
            ERROR_LOG("timerfd read failed");
            abort();
        }
        return;
    }

那么其实超时模块的设计就已经完成了。

然后我们接着修改 EventLoop 模块。首先EventLoop模块内需要一个TimerWheel对象,用于设置超时事件。,初始化的时候传入 EventLoop对象的指针就行了。

private:
using Task = std::function<void()>;
    std::thread::id _thread_id;         //绑定的线程的id
    std::vector<Task> _queue;           //任务队列
    std::mutex _mutex;                  //保证任务队列的安全
    Poller _poller;                     //用于事件监控
    int _eventfd;                       //用于事件通知
    Channel* _eventfd_channel;          //管理eventfd的事件
    TimerWheel _timer_wheel;            //超时管理模块
    EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd,this)),_timer_wheel(this)
    {
        _eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this));   //设置读回调函数
        _eventfd_channel->EnableRead();                     //启动读事件监听
    }

然后就是提供三个接口,用于添加,刷新和取消定时任务。

    void AddTimerTask(uint64_t id , uint64_t delay,Task f)
    {
        _timer_wheel.AddTimerTask(id,delay,f);
    }
    void RefreshTimerTask(uint64_t id)
    {
        _timer_wheel.RefreshTimerTask(id);
    }
    void CancelTimerTask(uint64_t id)
    {
        _timer_wheel.CancelTimerTask(id); 
    }

但是仅仅是这样还不行,因为我们之前说了,对连接的管理操作其实并不是线程安全的,最终执行AddTimerTask 和RefreshTimerTask 和CancelTimerTask 这三个函数的并不一定是他所绑定的EventLoop模块,所以我们需要调用他关联的EventLoop模块的RunInLoop来完成超时任务的添加,刷新,取消等。那么我们的接口就不能这样设置了,而是需要再封装一层。

我们需要将实际进行添加,刷新,取消定时任务的操作封装成一个任务,未来交给 RunInLoop来执行。

    void AddTimerTaskInLoop(uint64_t id , uint64_t delay,task cb)
    {
        assert(_tasks.find(id) == _tasks.end());                   //确保 id 合法
        std::shared_ptr<TimerTask> pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
        std::weak_ptr<TimerTask> wpt(pt);                          //weak_ptr
        int pos = (_timer_idx + delay) % MAXTIME;                  //计算到期时间
        _wheel[pos].push_back(pt);                                 //定时任务放入时间轮
        _tasks[id] = wpt;                                          //添加到map中管理
    }
    //刷新/延迟定时任务
    void RefreshTimerTaskInLoop(uint64_t id)
    {
        std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return ;                          //id 不合法直接返回false
        std::shared_ptr<TimerTask> pt = it->second.lock();          //构造新的shared_ptr
        int pos = (_timer_idx + pt->GetDelay()) % MAXTIME;          //找到新的位置
        _wheel[pos].push_back(pt);
    }
    //取消定时任务
    void CancelTimerTaskInLoop(uint64_t id)
    {
        std::unordered_map<uint64_t,std::weak_ptr<TimerTask>>::iterator it = _tasks.find(id);
        if(it==_tasks.end()) return ;
        (it->second).lock()->CancelTask();
    }

//添加定时任务
bool TimerWheel::AddTimerTask(uint64_t id , uint64_t delay,task cb)
{
    _loop->RunInLoop(std::bind(&TimerWheel::AddTimerTaskInLoop,this,id,delay,cb));
}
//刷新/延迟定时任务
bool TimerWheel::RefreshTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::RefreshTimerTaskInLoop,this,id));
}
bool TimerWheel::CancelTimerTask(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::CancelTimerTaskInLoop,this,id));
}

这样一来我们的超时模块就设计完了。

 

5 联合超时模块调试 

现在我们需要测试一下超时模块。 

现在的问题是,超时任务怎么设置?其实在我们这里,超时任务就是一个关闭连接以及释放channel资源的任务,也就是一个关闭连接任务。

那么我们可以这样设置:

void CloseHandler(Channel*channel)  //简单点就移除事件监控并且关闭连接
{
    channel->Remove();
    close(channel->Fd());
    delete channel;
    DEBUG_LOG("释放了一个Channel:%p",channel);
}

同时,读任务还是用之前的。

Acceptor函数也做一些小的修改,把回调函数设置的完整一点。

int taskid = 0;
void Acceptor(int lstfd)
{
    int newfd = accept(lstfd,nullptr,nullptr);
    if(newfd == -1) 
    {
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("acceptor failed");
    }
    //获取到新连接之后,把新连接绑定到 EventLoop 中 ,并启动事件监听
    Channel* channel = new Channel(newfd,&loop);
    DEBUG_LOG("收到一个连接:%p",channel);
    channel->SetReadCallBack(std::bind(ReadHandler,newfd));
    channel->SetEventCallBack(std::bind(EventHandler,taskid));
    channel->SetCloseCallBack(std::bind(CloseHandler,channel));
    channel->SetErrorCallBack(std::bind(CloseHandler,channel));
    channel->EnableRead();
    //添加超时任务
    int delay = 10;
    loop.AddTimerTask(taskid++,delay,std::bind(CloseHandler,channel));
}

那么任意事件回调我们就设置一个刷新定时任务的回调

//任意事件回调
void EventHandler(int id)  //但是怎么找到任务的id呢?
{
    loop.RefreshTimerTask(id);    
}

这样就差不多了,我们主要测试一下超时模块能否正常工作。

我们的client则是前五秒发送五个消息,然后就死循环休眠,也就是模拟非活跃的情况。

目前我们也没看出来什么问题。

目前来说,我们的EventLoop模块和他所关联的的三个子模块 :Channel,Poller,TimerQueue模块还没有出现大的问题,后续出现问题我们再修正。

再附上一张项目概述时的EventLoop模块的关联图。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2211243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python 使用faker库 生成数据

Welcome to Faker’s documentation! — Faker 30.3.0 documentationVersion1: Example from docs:from faker import Faker from faker.providers import internet for i in range(2): #批量生成数据fake Faker()name fake.name()address fake.address()text f…

el-动态表单的校验不触发/只触发了部分项

参考&#xff1a; 深入了解Element Form表单动态验证问题 转载vue elementUI组件表单动态验证失效的问题与解决办法 在别人的代码上开发新功能时&#xff0c;发现动态表单的校验功能突然出现问题&#xff1a; 重构前,只有两步&#xff0c;通过type来判断当前显示内容 <el-f…

Cesium.js(SuperMap iClient3D for Cesium)进行三维场景展示和图层动画

1&#xff09;&#xff1a;参考API文档&#xff1a;SuperMap iClient3D for Cesium 开发指南 2&#xff09;&#xff1a;官网示例&#xff1a;support.supermap.com.cn:8090/webgl/Cesium/examples/webgl/examples.html#layer 3&#xff09;&#xff1a;SuperMap iServer&…

自定义类型 - 结构体

2024 - 10 - 13 - 笔记 - 26 作者(Author): 郑龙浩 / 仟濹(CSDN账号名) 自定义类型 - 结构体 平时用的数组是一组相同类型的数据&#xff0c;如果想表示一组不同类型的数据&#xff0c;那么就可以结构体了。 ① 结构体的声明&#xff08;重要&#xff09; 自己起的名字&…

[论文阅读]: Detecting Copyrighted Content in Language Models Training Data

发布链接&#xff1a;http://arxiv.org/abs/2402.09910 核心目标&#xff1a;检测语言模型的训练过程中是否使用了受版权保护的内容 基于假设&#xff1a;语言模型有可能识别训练文本中的逐字节选 工作&#xff1a;提出了 DE-COP&#xff0c;一种确定训练中是否包含受版权保…

如何在Android平板上使用谷歌浏览器进行网页缩放

在使用Android平板时&#xff0c;我们经常会浏览各种网页&#xff0c;但有时网页内容可能无法适应屏幕大小&#xff0c;这时就需要用到网页缩放功能。本文将为您详细介绍如何在Android平 板上的谷歌浏览器中进行网页缩放&#xff0c;帮助您更好地浏览网页。&#xff08;本文由h…

Cursor 平替项目 bolt.new

Cursor 是一个全新的编程工具&#xff0c;旨在帮助开发者更高效地写代码。它不仅能提升编程速度&#xff0c;还能让代码更干净、更智能。无论你是编程新手还是经验丰富的开发者&#xff0c;Cursor AI都能为你提供智能辅助&#xff0c;显著提高编程效率。 但是目前 Cursor 免费…

QT开发--文件的读写操作

第十三章 文件的读写操作 Qt提供两种读写纯文本文件的方法&#xff1a; 1、直接使用 QFile 类的IO功能&#xff1b; 2、结合 QFile 和 QTextStream&#xff0c;利用流(Stream)进行操作。 13.1 文件读操作 13.1.1 使用QFile类 Qt封装了QFile类&#xff0c;方便我们对文件进行操…

物联网直播技术揭秘:如何保证超高可用性?

我是小米,一个喜欢分享技术的29岁程序员。如果你喜欢我的文章,欢迎关注我的微信公众号“软件求生”,获取更多技术干货! Hello,大家好!我是小米,一个29岁超爱分享技术的码农。今天跟大家聊一聊物联网时代下直播高可用方案的那些事儿。 随着物联网的快速发展,直播技术已…

针对考研的C语言学习(循环队列-链表版本以及2019循环队列大题)

题目 【注】此版本严格按照数字版循环队列的写法&#xff0c;rear所代表的永远是空数据 图解 1.初始化部分和插入部分 2出队 3.分部代码解析 初始化 void init_cir_link_que(CirLinkQue& q) {q.rear q.front (LinkList)malloc(sizeof(LNode));q.front->next NULL…

【宝可梦】游戏

pokemmo https://pokemmo.com/zh/ 写在最后&#xff1a;若本文章对您有帮助&#xff0c;请点个赞啦 ٩(๑•̀ω•́๑)۶

LeetCode讲解篇之1749. 任意子数组和的绝对值的最大值

文章目录 题目描述题解思路题解代码题解链接 题目描述 题解思路 这个我只需要求子数组和的最小值相反数和子数组和的最大值&#xff0c;本题答案为二者的最大值 设数组maxDp中第i号元素表示以nums[i]为结尾的子数组和的最大值 设数组minDp中第i号元素表示以nums[i]为结尾的子…

机器学习课程学习周报十六

机器学习课程学习周报十六 文章目录 机器学习课程学习周报十六摘要Abstract一、机器学习部分1. 再探马尔可夫链1.1 离散状态马尔可夫链1.1.1 转移概率矩阵和状态分布1.1.2 平稳分布 1.2 连续状态马尔可夫链1.3 马尔可夫链的性质 2. 马尔可夫蒙特卡罗法2.1 基本想法2.2 基本步骤…

77.【C语言】文件操作(3)

目录 6.文件的顺序读写 1.几个顺序读写函数 1.fgetc函数 代码示例 代码改进 2.fputc函数 3.fputs函数 如果需要换行,应该写入换行符(\n) 4.fgets函数 1.读取单行字符串 2.读取多行字符串 6.文件的顺序读写 1.几个顺序读写函数 分组:(fgetc,fputc),(fgets,fputs),(f…

服务器数据恢复—Raid5阵列硬盘磁头损坏导致掉线的数据恢复案例

服务器数据恢复环境&#xff1a; 一台某品牌存储设备上有一组由10块硬盘&#xff08;9块数据盘1块热备盘&#xff09;组建的raid5阵列&#xff0c;上层部署vmware exsi虚拟化平台。 服务器故障&#xff1a; raid5阵列中两块硬盘对应的指示灯亮黄灯掉线。硬盘序列号无法读取&am…

【动手学深度学习】6.3 填充与步幅(个人向笔记)

卷积的输出形状取决于输入形状和卷积核的形状在应用连续的卷积后&#xff0c;我们最终得到的输出大小远小于输入大小&#xff0c;这是由于卷积核的宽度和高度通常大于1导致的比如&#xff0c;一个 240 240 240240 240240像素的图像&#xff0c;经过10层 5 5 55 55的卷积后&am…

D3.js(五):实现组织架构图

实现组织架构图 效果初始化组织机构容器并实现缩放平移功能效果源码 渲染节点效果源码 渲染连线效果源码 完整源码 效果 初始化组织机构容器并实现缩放平移功能 效果 源码 import {useEffect} from react; import TreeData from ./json/tree-data.json;interface ITreeConfig…

电子电气架构---汽车OEM敏捷式集成方案简介

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 屏蔽力是信息过载时代一个人的特殊竞争力&#xff0c;任何消耗你的人和事&#xff0c;多看一眼都是你的不…

数据在内存中的存储【下】

三.浮点数在内存中的存储 我们常见的浮点数&#xff1a;3.14159&#xff0c;1E10等&#xff0c;浮点数家族包括&#xff1a;float&#xff0c;double, long double类型。浮点数表示的范围&#xff1a;float.h中定义。之前我们说过浮点数在内存中无法精确保存&#xff0c;那为什…

OKHTTP 如何处理请求超时和重连机制

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…