【IO多路转接】pollepoll

news2024/11/23 19:10:06

文章目录

  • 1 :peach:poll:peach:
    • 1.1 :apple:poll函数接口:apple:
    • 1.2 :apple:poll接口的使用:apple:
    • 1.3 :apple:poll的优缺点:apple:
  • 2 :peach:epoll:peach:
    • 2.1 :apple:epoll函数接口:apple:
      • 2.1.1 :lemon:epoll_create:lemon:
      • 2.1.2 :lemon:epoll_ctl:lemon:
      • 2.1.3 :lemon:epoll_wait:lemon:
    • 2.2 :apple:epoll工作原理:apple:
    • 2.3 :apple:epoll的优点:apple:
    • 2.4 :apple:epoll接口的使用:apple:
      • 2.4.1 :lemon:第一版本的epoll:lemon:
      • 2.4.2 :lemon:epoll工作方式:lemon:
      • 2.4.3 :lemon:对比LT和ET:lemon:
      • 2.4.4 :lemon:第二个版本的epoll:lemon:
      • 2.4.5 :lemon:Reactor:lemon:


1 🍑poll🍑

1.1 🍎poll函数接口🍎

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// pollfd结构
struct pollfd 
{
 int fd; /* file descriptor */
 short events; /* requested events */
 short revents; /* returned events */
};

参数说明:

  • fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合;
  • nfds表示fds数组的长度;
  • timeout表示poll函数的超时时间, 单位是毫秒(ms)

eventsrevents的取值:

事件描述是否可作为输入是否可作为输出
POLLIN普通数据和优先数据可读
POLLEDNORM普通数据可读
POLLEDBAND优先级带数据可读(Linux不支持)
POLLOUT普通数据和优先数据可写
POLLWRNORM普通数据可写
POLLWRBAND优先级带数据可写
POLLERR错误
POLLHUP挂起,比如管道的写端关闭后,读端描述符上将收到POLLHUP事件
POLLNVAL文件描述符没有打开

返回结果:

  • 返回值小于0, 表示出错;
  • 返回值等于0, 表示poll函数等待超时;
  • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回;

1.2 🍎poll接口的使用🍎

通过对poll接口的介绍后大家不难发现,其实使用poll接口是比用select是更简单的,因为在之前我们写select服务器时我们需要自己来维护一个fd数组帮助我们将位图结构初始化,但是使用poll就不用了,我们只需要创建一个struct pollfd*结构的指针,动态开辟空间即可。

代码实例:

#include "Sock.hpp"
#include <memory>
#include <string>
#include<poll.h>
#include<cassert>

using namespace std;
const int N = 1024;
const int default_fd = -1;
const short default_event=0;
const uint16_t gport=8866;
class PollServer
{
public:
    PollServer(const uint16_t port=gport)
    :_port(port)
    ,_ppd(nullptr)
    {}
    ~PollServer()
    {
        _listensock.Close();
        delete []_ppd;
        _ppd=nullptr;
    }

    void init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        _ppd=new pollfd[N];
        for(int i=0; i<N; ++i)
        {
            _ppd[i].fd=default_fd;
            _ppd[i].events=default_event;
            _ppd[i].revents=default_event;
        }

    }

    void run()
    {
        _ppd[0].fd=_listensock.Fd();
        _ppd[0].events=POLLIN;
        while(true)
        {
            //int timeout=-1;
            int n=poll(_ppd, N, -1); 
            if (n > 0)
            {
                cout << "有一个就绪事件发生了" << endl;
                // 表示已经有n个连接到来了,此时我们能够直接accept吗?
                hand_event();
                printf_fd();
            }
            else if (n == 0)
            {
                cout << "time out" << endl;
            }
            else
            {
                cout << "select errno:" << errno << ":" << strerror(errno) << endl;
            }
        }
    }
private:
    void accepter()
    {
        string clientip;
        uint16_t clientport;
        int sock = _listensock.Accept(&clientip, &clientport);
        cout << "[ip:port]:" << clientip << ":" << clientport << endl;
        int pos = 1;
        while (pos < N)
        {
            if (_ppd[pos].fd == default_fd)
            {
                _ppd[pos].fd = sock;
                _ppd[pos].events=POLLIN;
                _ppd[pos].revents=default_event;
                break;
            }
            ++pos;
        }
        if (pos > N)
        {
            cout << "_fdarr full" << endl;
            close(sock);
        }
    }

    void serverio(int fd, int i)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
        if (n < 0)
        {
            cout << "read fail" << endl;
            return;
        }
        else if (n == 0)
        {
            cout << "client close,me too" << endl;
            close(fd);
            _ppd[i].fd=default_fd;
            _ppd[i].events=default_event;
            _ppd[i].revents=default_event;
        }
        else
        {
            buffer[n - 1] = 0;
            cout << "client:" << buffer << endl;
            string echo = buffer;
            echo += " [poll server echo]";
            write(fd, echo.c_str(), echo.size());
        }
    }

    void hand_event()
    {
        for (int i = 0; i < N; ++i)
        {
            int fd=_ppd[i].fd;
            short revent=_ppd[i].revents;
            if(fd == default_fd)
                continue;
            if (revent & POLLIN)
            {
                if (fd == _listensock.Fd())
                {
                    accepter();
                }
                else
                {
                    serverio(fd, i);
                }
            }
            else if(revent & POLLOUT)
            {
                cout<<"POLLOUT"<<endl;
            }

        }
    }

    void printf_fd()
    {
        for (int i = 0; i < N; ++i)
        {
            if (_ppd[i].fd != default_fd)
                cout << _ppd[i].fd << " ";
        }
        cout<<endl;
    }
private:
    Sock _listensock;
    uint16_t _port;
    pollfd* _ppd;
};

注意:此时在进行serverio时也会有粘包问题以及write的fd没有交给poll处理(写实事件并不一定就绪)问题。

验证:
在这里插入图片描述

1.3 🍎poll的优缺点🍎

poll优点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现:

  • pollfd结构包含了要监视的event和已经就绪的revent,不再使用select手动设置fd集合的方式,接口使用比select更方便;
  • poll并没有最大数量限制 (但是数量过大后性能也是会下降);

poll缺点

poll中监听的文件描述符数目增多时:

  • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符;
  • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中;
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.

从本质上来说,poll知识解决了select文件描述符个数限制问题,但是select其他缺点poll并没有解决,那么还有更好的方式来解决吗?这时就引出来了一个更为厉害的转接方式:epoll


2 🍑epoll🍑

2.1 🍎epoll函数接口🍎

2.1.1 🍋epoll_create🍋

int epoll_create(int size);

创建一个epoll的句柄:

  • 自从linux2.6.8之后,size参数是被忽略的(只要随便设置一个>0的数字就行)
  • 用完之后, 必须调用close()关闭;

2.1.2 🍋epoll_ctl🍋

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数:

  • 它不同于select是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型;
  • 第一个参数是epoll_create()的返回值(epoll的句柄);
  • 第二个参数表示动作,用三个宏来表示;
  • 第三个参数是需要监听的fd;
  • 第四个参数是告诉内核需要监听什么事件;

op参数的取值:

  • EPOLL_CTL_ADD :注册新的fd到epfd中;
  • EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL :从epfd中删除一个fd;

struct epoll_event结构如下:
在这里插入图片描述

events可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的;
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里;

2.1.3 🍋epoll_wait🍋

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll监控的事件中已经就绪事件。

  • 参数events是分配好的epoll_event结构体数组;
  • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存);
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size;
  • timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞);
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败;

2.2 🍎epoll工作原理🍎

  • 1️⃣当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:
struct eventpoll
{ 
 .... 
 /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ 
 struct rb_root rbr; 
 /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ 
 struct list_head rdlist; 
 .... 
};
  • 2️⃣每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)
    而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。在epoll中,对于每一个事件,都会建立一个epitem结构体。
struct epitem
{ 
 struct rb_node rbn;//红黑树节点 
 struct list_head rdllink;//双向链表节点 
 struct epoll_filefd ffd; //事件句柄信息 
 struct eventpoll *ep; //指向其所属的eventpoll对象 
 struct epoll_event event; //期待发生的事件类型 
}
  • 3️⃣当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可,如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)

在这里插入图片描述总结一下, epoll的使用过程就是三部曲:

  • 调用epoll_create创建一个epoll句柄;
  • 调用epoll_ctl, 将要监控的文件描述符进行注册;
  • 调用epoll_wait, 等待文件描述符就绪;

2.3 🍎epoll的优点🍎

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开;
  • 数据拷贝轻量: 只在合适的时候调用 epoll_ctl() 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝);
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响;
  • 没有数量限制: 文件描述符数目无上限;(IO效率不随fd数目增加而线性下降)

网上有些博客说, epoll中使用了内存映射机制(内核直接将就绪队列通过mmap的方式映射到用户态,避免了拷贝内存这样的额外性能开销);这种说法是不准确的,我们定义的struct epoll_event是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的。

2.4 🍎epoll接口的使用🍎

2.4.1 🍋第一版本的epoll🍋

为了更加方便使用 epoll接口,便封装了一个类专门处理epoll接口:

#include "Sock.hpp"
#include <memory>
#include <string>
#include<sys/epoll.h>
#include<cassert>
using namespace std;


const int default_sz=132;
const int default_epfd=-1;
class Epoller
{
public:
    Epoller()
    :_epfd(default_epfd)
    {}
    ~Epoller()
    {
        if(_epfd != default_epfd)
            close(_epfd);
    }
    void create()
    {
        _epfd=epoll_create(default_sz);
        if(_epfd < 0)
        {
            cout<<"epoll_create fail"<<endl;
        }
    }

    bool add_event(int fd, uint32_t events)
    {
        epoll_event ev;
        ev.data.fd=fd;
        ev.events=events;
        int n=epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
        if(n < 0)
        {
            cout<<"add_event fail"<<endl;
            return false;
        }
        return true;
    }
    bool mod_event(int fd, uint32_t events)
    {
        epoll_event ev;
        ev.data.fd=fd;
        ev.events=events;
        int n=epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);
        if(n < 0)
        {
            cout<<"mod_event fail"<<endl;
            return false;
        }
        return true;
    }
    bool del_event(int fd)
    {
        int n=epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
        if(n < 0)
        {
            cout<<"del_event fail"<<endl;
            return false;
        }
        return true;
    }

    int ep_wait(epoll_event* revents, int max_events, int timeout)
    {
        return epoll_wait(_epfd, revents, max_events, timeout);
    }

    int get_epfd()
    {
        return _epfd;
    }
private:
    int _epfd;
};

epoll服务器的编写:

const uint16_t g_port=8899;
const int max_epollrevent_sz=64;
class EpollServer
{
public:
    EpollServer(uint16_t port=g_port)
    :_port(port)
    {}
    ~EpollServer()
    {
        _listensock.Close();
    }

    void init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        _epoller.create();
    }
    void run()
    {
        _epoller.add_event(_listensock.Fd(), EPOLLIN);
        int timeout=-1;//以阻塞方式进行等待
        while(true)
        {
            int n=_epoller.ep_wait(_rvents, max_epollrevent_sz, timeout);
            if(n < 0)
            {
                cout<<"ep_wait fail"<<endl;
            }
            else if(n == 0)
            {
                cout<<"timeout"<<endl;
            }
            else
            {
                cout<<"当前已经有"<<n<<"个事件就绪"<<endl;
                hander_event(n);
            }
        }
    }
private:
    void hander_event(int n)//由于只有n个事件就绪,所以我们只需要遍历0~n即可
    {
        for(int i=0; i<n; ++i)
        {
            int fd=_rvents[i].data.fd;
            uint32_t revent=_rvents->events;
            if (revent & EPOLLIN)//读事件就绪时
            {
                if (fd == _listensock.Fd())
                {
                    accepter(); // 进行accept获取新连接
                }
                else
                {
                    serverio(fd); // 用于进行数据io
                }
            }
        }
    }
    void accepter()
    {
        string clientip;
        uint16_t clientport;
        int sock=_listensock.Accept(&clientip, &clientport);
        cout<<"【"<<clientip<<","<<clientport<<"】事件已经就绪,fd:"<<sock<<endl;
        //将新连接添加到_epoller
        _epoller.add_event(sock, EPOLLIN);
    }
    void serverio(int fd)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
        if (n < 0)
        {
            cout << "read fail" << endl;
        }
        else if (n == 0)
        {
            cout << "client close,me too" << endl;
            close(fd);
            _epoller.del_event(fd);
        }
        else
        {
            buffer[n - 1] = 0;
            cout << "client:" << buffer << endl;
            string echo = buffer;
            echo += " [epoll server echo]";
            send(fd, echo.c_str(), echo.size(), 0);
        }
    }

private:
    uint16_t _port;
    Sock _listensock;
    Epoller _epoller;
    epoll_event _rvents[max_epollrevent_sz];
};

代码中需要注意的地方:

  • 1️⃣此时在进行serverio时也会有粘包问题以及写事件并不一定就绪的问题(这个我们在第二版本会讲解处理方式)。
  • 2️⃣在select/poll编程中,在读取消息时当对端已经把连接关闭时都会修改数组(select中是数组,而poll中是指针),目的都是让内核不要在关心该事件了,epoll也是同理,不同的是调用del_event将不在关心的事件删除而已。

2.4.2 🍋epoll工作方式🍋

epoll有2种工作方式:水平触发(LT)和边缘触发(ET)
我们来举一个生活中小栗子来帮助更好的理解这两种方式:

比如你正在打游戏,你的妈妈喊你吃饭,这时她通知你的方式可能有下面两种:

  1. 每隔一段时间通知你一次,直到你来吃饭为止;(LT)
  2. 只通知你一次,后面就不管你了;(ET)

看一个实际例子:

  • 我们已经把一个tcp socket添加到epoll描述符 ;
  • 这个时候socket的另一端被写入了2KB的数据;
  • 调用epoll_wait,并且它会返回,说明它已经准备好读取操作;
  • 然后调用read, 只读取了1KB的数据;
  • 继续调用epoll_wait…

水平触发Level Triggered 工作模式

  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理,或者只处理一部分;
  • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪;
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回;
  • 支持阻塞读写和非阻塞读写;

注意:epoll默认状态下就是LT工作模式.

边缘触发Edge Triggered工作模式

如果我们将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。

  • 当epoll检测到socket上事件就绪时, 必须立刻处理;
  • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了;
  • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会;
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多),Nginx默认采用ET模式使用epoll;
  • 只支持非阻塞的读写;

2.4.3 🍋对比LT和ET🍋

LT是 epoll 的默认行为,使用 ET 能够减少 epoll 触发的次数,但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。这也就表明ET的代码复杂程度更高。

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些,但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是差不多的。

2.4.4 🍋第二个版本的epoll🍋

在开始写代码前,我们增加一个Connection的结构体,它的主要成员如下:
在这里插入图片描述为什么要设计这么一个结构体呢?我们知道,使用ET模式就要求程序员将缓冲区的数据一次性全部取走,所以为了简便就使用了_inbuffer_outbuffer这两个缓冲区来处理读到的数据以及要发送的数据。另外该成员中还存在3个回调函数,这样我们只需要不断的监视就绪的事件中_events,就能够回调不同的处理方式。
除此之外,我们还得在EpollServer类中再增加一个成员变量:
在这里插入图片描述
使用fdConnection建立唯一映射关系,当我们监听到一个新连接到来时,就将新连接添加到_conns中管理,所以我们接下来便可以完善代码了:

const uint16_t g_port=8899;
const int max_epollrevent_sz=64;
const int g_num=8848;


class Connection;
class EpollServer;
using func_t=std::function<void (Connection*, const protocol_ns::Request&)>;
using callbact_t=std::function<void(Connection*)>;

class Connection
{
public:
    Connection(int fd, string ip, uint16_t port)
    :_fd(fd)
    ,_ip(ip)
    ,_port(port)
    {}
    void resgister(callbact_t calb_read, callbact_t calb_write, callbact_t calb_excep)
    {
        _calb_read=calb_read;
        _calb_write=calb_write;
        _calb_excep=calb_excep;
    }
    //用于进行数据IO
    int _fd;
    string _inbuffer;
    string _outbuffer;
    //用户信息
    string _ip;
    uint16_t _port;
    //用户关心的事件
    uint32_t _events;
    //IO处理函数
    callbact_t _calb_read;
    callbact_t _calb_write;
    callbact_t _calb_excep;
};
class EpollServer
{
public:
    EpollServer(func_t func, uint16_t port=g_port)
    :_func(func)
    ,_port(port)
    {}
    ~EpollServer()
    {
        _listensock.Close();
    }

    void init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        _epoller.create();
        add_connection(_listensock.Fd(), EPOLLIN | EPOLLET);//将_listensock添加到_conns中
        cout<<"EpollServer init success"<<endl;
    }


    void loop_once(int timeout)
    {
        int n=_epoller.ep_wait(_rvents, max_epollrevent_sz, timeout);
        for(int i=0; i<n; ++i)
        {
            int fd=_rvents[i].data.fd;
            uint32_t events=_rvents[i].events;
            cout<<n<<":"<<i<<endl;
            cout<<"正在处理"<<fd<<"事件上的"<<((events & EPOLLIN) ? "EPOLLIN" : "OTHER")<<endl;

            //将所有的异常情况,最后全部转化成为read,write的异常
            if ((events & EPOLLERR) || (events & EPOLLHUP))
                events |= (EPOLLIN | EPOLLOUT);

            if((events & EPOLLIN) && conn_isexist(fd))
                _conns[fd]->_calb_read(_conns[fd]);//如果是读事件就执行读事件的回调
            else if((events & EPOLLOUT) && conn_isexist(fd))
                _conns[fd]->_calb_write(_conns[fd]);//如果是写事件就执行写事件的回调
        }
    }
    void run()
    {
       int timeout=-1;
       while(true)
       {
            loop_once(timeout);
       }
    }

private:
    void add_connection(int fd, uint32_t events, string ip="127.0.0.1", uint16_t port=g_port)
    {
        //将fd设置为非阻塞,保证ET模式下不会一直卡在wait
        Util::SetNonBlock(fd);
        //构建Connection对象,交给_conns管理
        Connection* con=new Connection(fd, ip, port);
        if(fd == _listensock.Fd())
            con->resgister(std::bind(&EpollServer::accepter, this, std::placeholders::_1), nullptr, nullptr);
        else
            con->resgister(std::bind(&EpollServer::reader, this, std::placeholders::_1),
            std::bind(&EpollServer::writer, this, std::placeholders::_1),
            std::bind(&EpollServer::excepter, this, std::placeholders::_1));
        con->_events=events;
        _conns.insert({fd, con});
        //将事件写到内核中
        bool r=_epoller.add_event(fd, events);
        cout<<"_conns insert success,fd:"<<fd<<",ip:"<<ip<<",port"<<port<<endl;
    }

    void accepter(Connection* conn)
    {
    }
    void reader(Connection* conn)
    {
    }
    void writer(Connection* conn)
    {
    }
    void excepter(Connection* conn)
    {
    }
    bool conn_isexist(int fd)
    {
        return _conns.find(fd) != _conns.end();
    }

private:
    uint16_t _port;
    Sock _listensock;
    Epoller _epoller;
    epoll_event _rvents[max_epollrevent_sz];//就绪的响应事件
    unordered_map<int, Connection*> _conns;//使用fd与Connection*建立映射关系
    func_t _func;//用于执行上层传入的回调
};

代码中需要注意的地方:

  • 1️⃣:在添加连接时,首先将fd设置为非阻塞,保证了read/write一定能够把数据读取完毕/发送完毕;
  • 2️⃣:在添加连接的时候,由于参数不匹配,所以使用了bind来调整参数个数:
    在这里插入图片描述
  • 3️⃣:在进行数据IO时,我们分成了readerwriter

现在的重点是如何实现accepter/reader/writer:
在实现前,我们就必须考虑协议定制的问题了,要读取或者发送一个完整的报文,我们之前实现网络版本计算器时已经实现过一次协议定制,所以此时直接拿来用即可。
先来实现acceper:

    void accepter(Connection* conn)
    {
        do
        {
            string clientip;
            uint16_t clientport;
            int err=0;
            int sock = _listensock.Accept(&clientip, &clientport, err);
            if (sock > 0)
            {
                cout << "【" << clientip << "," << clientport << "】事件已经就绪,fd:" << sock << endl;
                // 将新连接添加到_conns中管理
                add_connection(sock, EPOLLIN | EPOLLET, clientip, clientport);
            }
            else
            {
                if(err == EAGAIN || err == EWOULDBLOCK)
                    break;
                else if(err == EINTR)
                    continue;
                else
                    cout<<"accept fail"<<endl;
            }
        } while (conn->_events & EPOLLET);
    }

代码中需要循环获取新的连接,当新连接到来时就将新连接添加到_conns中管理即可。

再来实现reader:

    void hander_requset(Connection* conn)
    {
        bool quit = false;
        while (!quit)
        {
            string requestStr;
            // ParsePackage函数作用是将_inbuffer中取出一个完整报文并将其写入到requestStr中
            // 返回值==0表示没有一个完整报文;>0表示一个完整报文的长度
            int t = protocol_ns::ParsePackage(conn->_inbuffer, &requestStr);
            if (t > 0)
            {
                // 去除报头
                requestStr = protocol_ns::RemoveHeader(requestStr, t);
                // 进行反序列化
                protocol_ns::Request req;
                req.Deserialize(requestStr);
                // 执行回调进行处理
                protocol_ns::Response resp=_func(req);
                //序列化
                string responseStr;
                resp.Serialize(&responseStr);
                //添加报头
                responseStr=protocol_ns::AddHeader(responseStr);
                //将数据写到发送缓冲区中
                conn->_outbuffer+=responseStr;
            }
            else
                quit=true;
        }
    }
    bool reader_hander(Connection* conn)
    {
        bool res=true;
        do
        {
            char buffer[g_num]={0};
            int n=read(conn->_fd, buffer, sizeof(buffer)-1);
            if(n > 0)
            {
                buffer[n]=0;
                conn->_inbuffer+=buffer;
            }
            else if (n == 0)
            {
                conn->_calb_excep(conn);
                break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;//表示已经将数据全部读取完毕,跳出循环
                else if (errno == EINTR)
                    continue;
                else
                {
                    conn->_calb_excep(conn);
                    res=false;
                    break;
                }
            }
        }while(conn->_events & EPOLLET);
        return res;
    }
    void reader(Connection* conn)
    {
        if(!reader_hander(conn))
            return;
        //处理request,返回response
        hander_requset(conn);
        //一般我们在面对写入的时候,直接写入,没写完才交给epoll
        if(!conn->_outbuffer.empty())
            conn->_calb_write(conn);//如果发送缓冲区不为空,就进行一次writer
    }

上面代码中比较重要的地方都标有注释。
处理reader的方式是先将所有的数据全部读到_inbuffer中,然后再根据协议进行处理,将处理好的数据放进_outbuffer中,最后判断一下_outbuffer是否为空,不为空的话就手动调用写事件的回调函数进行处理。

在进行写事件处理时我们要明白一件事:读事件需要一直关心,因为你需要一直监听是否有新的socket到来,但是写事件其实就不需要了,一直关心反而占用CPU资源,只有当_outbuffer不为空时我们才去关心。

writer的编写:

    bool enable_read_write(Connection *conn, bool read, bool write)
    {
        conn->_events = ((read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET);
        return _epoller.mod_event(conn->_fd, conn->_events);
    }
    void writer(Connection *conn)
    {
        {
            bool safe = true;
            do
            {
                ssize_t n = write(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
                if (n > 0)
                {
                    conn->_outbuffer.erase(0, n);
                    if (conn->_outbuffer.empty())
                        break;
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                    {
                        break;
                    }
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        safe = false;
                        conn->_calb_excep(conn);
                        break;
                    }
                }
            } while (conn->_events & EPOLLET);
            if (!safe)
                return;
            if (!conn->_outbuffer.empty())
                enable_read_write(conn, true, true);
            else
                enable_read_write(conn, true, false);
        }
    }

最后excepter的编写就很简单了:

    void excepter(Connection *conn)
    {
        // 1. 先从epoll移除fd
        _epoller.del_event(conn->_fd);

        // 2. 从_conns中移除fd
        _conns.erase(conn->_fd);

        // 3. 关闭fd
        close(conn->_fd);

        // 5. 释放conn对象
        delete conn;
    }

2.4.5 🍋Reactor🍋

首先来回答什么是Reactor?
Reactor是基于多路转接包含事件派发器,连接管理器的半同步半异步的IO服务器。
其实我们实现的第二个版本的epoll就是一个简易版本的Reactor(但是没有加上异步处理)。异步就是使用多进程/多线程的方式让事件处理交给另外一个进程/线程处理,防止当前业务进程/业务线程阻塞而导致整个业务无法处理。

如果使用多线程的方式我们可以使用下面这种方式将之前的代码进行优化:
在这里插入图片描述
这样一个sock就对应这一个线程来处理。

多进程方式:

  • 我们可以使用管道来处理,父进程负责获得listensock,子进程将管道的读端添加进epoll中,由于子进程继承了父进程的listensock(不将listensock添加进Reactor),所以让子进程自己accept获得sock然后添加进Reactor,父进程可以使用轮询的方式随机挑选子进程向管道写入数据;
  • 除了使用管道外,还可以多进程加锁竞争的方式来进行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1184333.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OGG将Oracle全量同步到kafka

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

C语言之pthread_once实例总结(八十三)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

ClickHouse介绍和使用

ClickHouse介绍和使用 1. 简介2. ClickHouse特点3. 数据类型3.1. 整型3.2. 浮点型3.3. Decimal型3.4. 布尔型3.5. 字符串3.6. 枚举类型3.7. 时间类型 4. 表引擎4.1. TinyLog4.2. Memory4.3. MergeTree4.3.1. partition by分区&#xff08;可选&#xff09;4.3.2. primary key 主…

微信小程序将后端返回的图片文件流解析显示到页面

说明 由于请求接口后端返回的图片格式不是一个完整的url,也不是其他直接能显示的图片格式&#xff0c;是一张图片 后端根据模板与二维码生成图片,返回二进制数据 返回为文件流的格式,用wx.request请求的时候&#xff0c;就自动解码成为了下面这样的数据数据格式,这样的数据没…

Spring的缓存机制-循环依赖

群公告 Java每日大厂面试题&#xff1a; 1、Spring 是如何解决循环依赖&#xff1f; 答案&#xff1a;三级缓存&#xff0c;简单来说&#xff0c;A创建过程中需要B&#xff0c;于是A将自己放到三级缓存里面&#xff0c;去实例化B&#xff0c;B实例化的时候发现需要…

智能AI系统ChatGPT系统源码+支持GPT4.0+支持ai绘画(Midjourney)/支持OpenAI GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…

交流负载发电机测试

交流负载发电机测试是一种常用的测试方法&#xff0c;用于评估发电机在负载条件下的性能和稳定性。测试过程中需要使用负载设备模拟实际负载&#xff0c;并通过测量电压、电流、功率等参数来评估发电机的输出能力和稳定性。 在进行测试之前&#xff0c;首先需要准备好测试设备和…

30张图详解IP地址网络知识

你们好&#xff0c;我的网工朋友。 IP地址是所有网络初级课程里最先涉及到的技术点&#xff0c;对于IP地址的合理规划是网络设计的重要环节&#xff0c;必须拿捏。 IP地址规划的好坏&#xff0c;影响到网络路由协议算法的效率&#xff0c;影响到网络的性能&#xff0c;影响到网…

短剧出海火爆,Flat Ads独家流量助泛娱乐赛道App迅速获客增长

10月26日&#xff0c;由扬帆出海主办的GICC2023 | 第四届全球互联网产业CEO大会正式圆满落幕&#xff0c;Flat Ads等出海企业应邀参加。 据悉&#xff0c;本届GICC深圳站邀请200CXO行业领袖、300各路优质厂商、1200全球互联网产业代表共聚一堂&#xff0c;聚焦短剧、游戏、泛娱…

最前端|如何使用Plausible实现页面埋点?

目录 一、业务背景 二、业务场景描述 三、解决方案 //如何集成 Plausible &#xff1f; //如何监控特定功能使用情况&#xff1f; 什么是 MyEventName? //如何向自定义事件传递参数&#xff1f; 一、业务背景 随着公司自研产品的不断发展&#xff0c;对前端页面的监控和…

强力解决使用node版本管理工具 NVM 出现的问题(找不到 node,或者找不到 npm)

强力解决使用node版本管理工具 NVM 出现的问题&#xff08;找不到 node&#xff0c;或者找不到 npm&#xff09; node与npm版本对应关系 nvm是好用的Nodejs版本管理工具&#xff0c; 通过它可以方便地在本地调换Node版本。 2020-05-28 Node当前长期稳定版12.17.0&#xff0c;…

适用于Linux桌面歌词应用程序MusixMatch

导读Musixmatch桌面应用程序可用于Linux&#xff01;不是Linux用户缺少桌面歌词应用程序。包括“即时歌词”和“Lyricfier”&#xff0c;许多开源音乐播放器都会运用某种歌词集成。 但是Musixmatch应用程序与那些有点不同。 Musixmatch的USP是Syncronized歌词 如果您曾经使用…

如何选择高效率的在线分板机主轴?

随着智能移动设备和其他电子3C设备需求的增大&#xff0c;PCB分板机的需要也随之而大增。越来越多的企业开始使用在线分板机来替代传统的手工分板&#xff0c;从而提升了生产效率&#xff0c;提高了产品质量&#xff0c;降低了生产成本。在分板机设备中&#xff0c;高速主轴是关…

Pandas数据预处理Pandas合并数据集在线闯关_头歌实践教学平台

这里写目录标题 第1关 Concat与Append操作第2关 合并与连接第3关 案例&#xff1a;美国各州的统计数据 第1关 Concat与Append操作 任务描述 本关任务&#xff1a;使用read_csv()读取两个csv文件中的数据&#xff0c;将两个数据集合并&#xff0c;将索引设为Ladder列&#xff0…

92. 递归实现指数型枚举

题目 思路 因为有n个数&#xff0c;每个数选或不选都是一种方案&#xff0c;而且要递增输出&#xff0c;那么就标记每个数是否备选&#xff0c;然后判断完n个数以后&#xff0c;就可以输出了 代码 #include<bits/stdc.h> using namespace std; int n; bool f[100] {0…

WEB渲染模式——CSR SSR SSG ISR DPR区别

页面渲染 浏览器渲染页面&#xff0c;根据HTML文档类型声明&#xff08;DOCTYPE&#xff09;解析HTML和CSS&#xff0c;渲染步骤&#xff1a;解析、样式计算、元素布局、绘制、重绘重排。HTML、CSS、JavaScript是网页的三大核心技术。 HTML (Hyper Text Markup Language) 超文…

【沐风老师】3dMax快速平铺纹理插件QuickTiles教程

QuickTiles是3ds max的一个插件&#xff0c;允许您将常规瓷砖纹理转换为交互式纹理&#xff0c;就在mat.editor中。 换言之&#xff0c;您可以根据需要对任何纹理进行修改和重新创建&#xff1a;更改布局、瓷砖大小、格式、颜色、接缝、体积、随机化形状或纹理等等。 这种方法大…

eNsp下如何使用wireshark抓包

文章目录 拓扑图抓包操作 拓扑图 抓包操作 可以通过下图上的指示 来设置 Time列的显示样式。 这里有个缺点就是就是抓取ensp上的虚拟设备上的数据包时的&#xff0c;年月日时间显示的不对。暂时无解决办法。 一般选择 日期和时间&#xff08;日期和时间与当前标准时间对应上时…

集简云平台助力无代码开发,实现平安银行与电商平台、CRM系统的快速连接

无代码开发与平安银行 平安银行是中国内地首家公开上市的全国性股份制银行&#xff0c;经过多年发展&#xff0c;已经在科技引领、综合金融、零售转型等领域形成独特竞争力和鲜明经营特色。近年来&#xff0c;平安银行更是积极拥抱科技&#xff0c;为此&#xff0c;选择了与集…

智慧工地源码 手册文档 app 数据大屏、硬件对接、萤石云

智慧工地解决方案依托计算机技术、物联网、云计算、大数据、人工智能、VR、AR等技术相结合&#xff0c;为工程项目管理提供先进技术手段&#xff0c;构建工地现场智能监控和控制体系&#xff0c;弥补传统方法在监管中的缺陷&#xff0c;最终实现项目对人、机、料、法、环的全方…