高级IO(Linux)

news2025/1/16 14:42:42

高级IO

  • 五种IO模型
  • 高级IO重要概念
    • 同步通信 vs 异步通信
    • 阻塞 vs 非阻塞
  • 非阻塞IO
    • fcntl
    • 实现函数SetNoBlock
    • 轮询方式读取标准输入
  • I/O多路转接之select
    • 初识select
    • select函数原型
      • 参数解释
      • 参数timeout取值
      • 关于fd_set结构
      • 关于timeval结构
      • 函数返回值
      • 三级目录
    • 理解select执行过程
    • socket就绪条件
      • 读就绪
      • 写就绪
    • select使用示例
    • select的特点
      • select缺点
  • I/O多路转接之epoll
    • epoll的相关系统调用
      • epoll_create
      • epoll_ctl
      • epoll_wait
    • epoll工作原理
    • epoll的实例
    • epoll的优点
    • epoll工作方式
      • 水平触发Level Triggered 工作模式
      • 边缘触发Edge Triggered工作模式
    • 对比LT和ET
    • epoll示例: epoll服务器(ET模式)
  • Reactor

五种IO模型

什么是IO?
就拿 read系统调用来说,从缓冲区中读取数据;首先要保证缓冲区有数据,若没有,操作就会被阻塞,也就是等待资源就绪;若有,将数据拷贝完之后直接返回;所以,IO分为两部分:等待资源就绪+拷贝数据;其中等待资源就绪在整个IO过程中占比非常大,如何降低等待的占比,也就是需要学习的目的

举个栗子:钓鱼的故事
张三,一个非常固执的钓鱼佬,在钓鱼的时候,只盯着鱼竿,身边发生什么都不在乎;
李四,一个随心所欲的钓鱼佬,在等待鱼上钩的时候,做着其他的事情,是不是地观察一个鱼竿,有没有鱼上钩;
王五,一个“非常懒”的钓鱼佬,把鱼竿放好,在上面安装一个报警器,一个有鱼上钩,直接把杆;
赵六,一个“多金”的钓鱼佬,每次都拿上十只鱼竿来钓鱼,没有都忙的不亦乐乎;
田七,一个爱吃鱼不爱钓鱼的有钱人,雇了一个小王,让他去钓鱼;

这五个人的钓鱼方式可以类比为五种IO方式:
张三-阻塞式IO,两耳不闻窗外事,一心只观水中浮
李四-非阻塞IO,闲来没事看两眼
王五-信号驱动IO,报警之后,立刻提杆
赵六-多路转接/多路复用
田七-异步IO

在整个钓鱼的过程中,鱼就是数据,河就是内核空间,鱼漂表明事件就绪,鱼竿就是文件描述符,钓鱼的动作也就是系统调用操作

钓鱼的人,等的占比越低,单位时间,钓鱼的效率就越高

多路转接/多路复用是高级IO的原因就是等待资源就绪的占比低

高级IO重要概念

同步通信 vs 异步通信

同步和异步关注的是消息通信机制

  • 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果
  • 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用

多进程多线程中, 也提到同步和互斥. 这里的同步通信和进程之间的同步是完全不想干的概念

  • 进程/线程同步也是进程/线程之间直接的制约关系
  • 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候

阻塞 vs 非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程

非阻塞IO

fcntl

一个文件描述符, 默认都是阻塞IO

int fcntl(int fd, int cmd, ... /* arg */ );

fcntl函数有5种功能

  • 复制一个现有的描述符(cmd=F_DUPFD)
  • 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)
  • 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)
  • 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
  • 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)

此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞

实现函数SetNoBlock

基于fcntl, 我们实现一个SetNoBlock函数, 将文件描述符设置为非阻塞

void SetNonBlock(int fd)
{
    int fl=fcntl(fd,F_GETFL);
    if(fl<0)
    {
        std::cerr<<"fcntl: "<<strerror(errno)<<std::endl;
        return ;
    }
    fcntl(fd,F_SETFL,fl|O_NONBLOCK);
}

轮询方式读取标准输入

void SetNonBlock(int fd)
{
    int fl = fcntl(fd, F_GETFL);
    if (fl < 0)
    {
        std::cerr << "fcntl: " << strerror(errno) << std::endl;
        return;
    }
    fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

int main()
{
    SetNonBlock(0);
    char buffer[1024];
    while (true)
    {
        printf(">>>> ");
        fflush(stdout);
        ssize_t s = read(0, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s - 1] = 0;
            std::cout << "echo# " << buffer << std::endl;
        }
        else if (s == 0)
        {
            std::cout << "read end" << std::endl;
            break;
        }
        else
        {
        }
        sleep(1);
    }
}

在这里插入图片描述

结果与预期的一致,不过这里还存在一个问题,当s<0时,数据读取失败,打印结果又是怎么样的呢?

在这里插入图片描述

资源准备未就绪

I/O多路转接之select

初识select

系统提供select函数来实现多路复用输入/输出模型

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select函数原型

select的函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

参数解释

  • 参数nfds是需要监视的最大的文件描述符值+1
  • rdset,wrset,exset分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合,其本质是位图结构
  • 参数timeout为结构timeval,用来设置select()的等待时间

参数timeout取值

  • NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件
  • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生
  • 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回

关于fd_set结构

在这里插入图片描述

其实这个结构就是一个整数数组, 更严格的说, 是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符
提供了一组操作fd_set的接口, 来比较方便的操作位图

// 用来清除描述词组set中相关fd 的位
void FD_CLR(int fd, fd_set *set); 
// 用来测试描述词组set中相关fd 的位是否为真
int FD_ISSET(int fd, fd_set *set); 
// 用来设置描述词组set中相关fd的位
void FD_SET(int fd, fd_set *set); 
// 用来清除描述词组set的全部位
void FD_ZERO(fd_set *set); 

关于timeval结构

timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0

函数返回值

  • 执行成功则返回文件描述词状态已改变的个数
  • 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回
  • 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds,writefds, exceptfds和timeout的值变成不可预测

错误值可能是:

  • EBADF 文件描述词为无效的或该文件已关闭
  • EINTR 此调用被信号所中断
  • EINVAL 参数n 为负值
  • ENOMEM 核心内存不足

三级目录

理解select执行过程

取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描
述符fd。则1字节长的fd_set最大可以对应8个fd;
作为输入时:表示用户告知内核,关心一下,集合中所有的fd事件;
比特位的位置,表示fd的数值;比特位的内容,表示是否关心

作为输出时:内核告知用户,所关心的多个fd中,有哪些已经就绪;
比特位的位置,表示fd的数值;比特位的内容,表示fd对应的事件已经就绪

socket就绪条件

读就绪

  • socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0
  • socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0
  • 监听的socket上有新的连接请求
  • socket上有未处理的错误

写就绪

  • socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0
  • socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号
  • socket使用非阻塞connect连接成功或失败之后
  • socket上有未读取的错误

select使用示例

Tcpserver:只处理读取,只获取数据的server
首先,监听端口可以交付给select,监听端口的连接就绪事件,其实就是读事件就绪

err.hpp

enum{
    USAGE_ERR=1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR
};

log.hpp

#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4

const char*to_levelstr(int level)
{
    switch(level)
    {
        case DEBUG: return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARNING: return "WARNING";
        case ERROR: return "ERROR";
        case FATAL: return "FATAL";
        default: return nullptr;
    }
}

void logMessage(int level,const char* format,...)
{
#define NUM 1024
    char logprefix[NUM];
    snprintf(logprefix,sizeof(logprefix),"[%s][%ld][pid:%d]",
    to_levelstr(level),(long int)time(nullptr),getpid());

    char logcontent[NUM];
    va_list arg;
    va_start(arg,format);
    vsnprintf(logcontent,sizeof(logcontent),format,arg);
    std::cout<<logprefix<<logcontent<<std::endl;
}

Sock.hpp

class Sock
{
    const static int backlog=30;
public:
    //创建socket文件套接字对象
    static int Socket()
    {
        int sock=socket(AF_INET,SOCK_STREAM,0);
        if(sock<0)
        {
            logMessage(FATAL,"创建套接字文件对象失败!");
            exit(SOCKET_ERR);
        }
        logMessage(NORMAL,"创建套接字文件对象成功: %d",sock);
        //设置地址复用
        int opt=1;
        setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));
        return sock;
    }

    //绑定自己的网络信息
    static void Bind(int sock,int port)
    {
        struct sockaddr_in local;
        memset(&local,0,sizeof(local));
        local.sin_family=AF_INET;
        local.sin_port=htons(port);
        local.sin_addr.s_addr=INADDR_ANY;
        if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
        {
            logMessage(FATAL,"绑定失败!");
            exit(BIND_ERR);
        }
        logMessage(NORMAL,"绑定成功!");
    }

    //设置socket为监听状态
    static void Listen(int sock)
    {
        if(listen(sock,backlog)<0)
        {
            logMessage(FATAL,"监听失败!");
            exit(LISTEN_ERR);
        }
        logMessage(NORMAL,"监听成功!");
    }

    static int Accept(int listensock,std::string*clientip,uint16_t*clientport)
    {
        struct sockaddr_in peer;
        socklen_t len=sizeof(peer);
        int sock=accept(listensock,(struct sockaddr*)&peer,&len);
        if(sock<0)
        {
            logMessage(ERROR,"接收失败!");
        }
        else
        {
            logMessage(NORMAL,"接收一个新连接,sock: %d",sock);
            *clientip=inet_ntoa(peer.sin_addr);
            *clientport=ntohs(peer.sin_port);
        }
        return sock;
    }
};

SelectServer.hpp

namespace select_yjm
{
    static const int defaultport=8082;
    static const int fdnum=sizeof(fd_set)*8;
    static const int defaultfd=-1;

    using func_t =std::function<std::string(const std::string&)>;

    class SelectServer
    {
    public:
        SelectServer(func_t f,int port=defaultport)
            :_func(f)
            ,_port(port)
            ,_listensock(-1)
            ,fdarray(nullptr)
        {

        }

        
        ~SelectServer()
        {
            if(_listensock<0) close(_listensock);
            if(fdarray) delete[] fdarray;
        }

        void initServer()
        {
            _listensock=Sock::Socket();
            Sock::Bind(_listensock,_port);
            Sock::Listen(_listensock);
            fdarray=new int[fdnum];
            for(int i=0;i<fdnum;i++) fdarray[i]=defaultfd;
            fdarray[0]=_listensock;
        }

        void Print()
        {
            std::cout<<"fd list: ";
            for(int i=0;i<fdnum;i++)
            {
                if(fdarray[i]!=defaultfd) std::cout<<fdarray[i]<<" ";
            }
            std::cout<<std::endl;
        }

        void Accepter(int listensock)
        {
            logMessage(DEBUG,"Accepter in");
            //此时,listen不会阻塞,已经就绪
            std::string clientip;
            uint16_t clinetport=0;
            int sock=Sock::Accept(listensock,&clientip,&clinetport);
            if(sock<0)
            {
                return ;
            }
            logMessage(NORMAL,"accept success [%s:%d]",clientip.c_str(),clinetport);
            //此时不能直接读取,只有select有资格检测事件是否就绪
            //将新的sock交给select,本质就是将sock添加到fdarray数组中即可
            int i=0;
            for(;i<fdnum;i++)
            {
                if(fdarray[i]!=defaultfd) continue;
                else break;
            }
            
            if(i==fdnum)
            {
                logMessage(WARNING,"server id full,please wait!");
                close(sock);
            }
            else
            {
                fdarray[i]=sock;
            }

            Print();
            logMessage(DEBUG,"Accepter out");
        }

        void Recver(int sock,int pos)
        {
            logMessage(DEBUG,"in Recver");
            //读取请求,并不会阻塞
            char buffer[1024];
            ssize_t s=recv(sock,buffer,sizeof(buffer)-1,0);
            if(s>0)
            {
                buffer[s]=0;
                logMessage(NORMAL,"client# %s",buffer);
            }
            else if(s==0)
            {
                close(sock);
                fdarray[pos]=defaultfd;
                logMessage(NORMAL,"client quit");
                return;
            }
            else
            {
                close(sock);
                fdarray[pos]=defaultfd;
                logMessage(ERROR,"client quit: %s",strerror(errno));
                return;
            }

            //处理响应
            std::string response=_func(buffer);

            //返回response
            write(sock,response.c_str(),response.size());

            logMessage(DEBUG,"out Recver");
        }

        //
        void HandlerReadEvent(fd_set& rfds)
        {
            for(int i=0;i<fdnum;i++)
            {
                //过滤非法的fd
                if(fdarray[i]==defaultfd) continue;

                //正常的fd,不一定就绪,需要判断
                //此时只有监听事件就绪
                if(FD_ISSET(fdarray[i],&rfds)&&fdarray[i]==_listensock) Accepter(_listensock);
                //获取新连接之后,进行读取
                else if(FD_ISSET(fdarray[i],&rfds)) Recver(fdarray[i],i);
                else {}
            }
        }

        void start()
        {
            for(;;)
            {
                fd_set rfds;
                FD_ZERO(&rfds);
                int maxfd=fdarray[0];

                for(int i=0;i<fdnum;i++)
                {
                    if(fdarray[i]==defaultfd) continue;
                    //合法的fd全部添加到读文件描述符集中
                    FD_SET(fdarray[i],&rfds);
                    //更新所有fd中最大的fd
                    if(maxfd<fdarray[i]) maxfd=fdarray[i];
                }
                logMessage(NORMAL,"max fd is: %d",maxfd);
                //使用select,需要程序员维护一个保存所有合法的fd的数组
                int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
                switch(n)
                {
                    case 0:
                        logMessage(NORMAL,"timeout...");
                        break;
                    case -1:
                        logMessage(WARNING,"select error,code: %d,err string: %s",errno,strerror(errno));
                        break;
                    default:
                        //表明有事件就绪,目前只有监听事件就绪
                        logMessage(NORMAL,"have event ready!");
                        HandlerReadEvent(rfds);
                        break;
                }
            }
        }

    private:
        int _port;
        int _listensock;
        int *fdarray;
        func_t _func;
    };
}

在这里插入图片描述

select的特点

  1. select能够同时等待的文件fd是有上限的
  2. 必须借助第三方数组来维护合法的fd
  3. select的大部分参数都是输入输出型的,调用select之前,要重新设置所有的fd;调用之后,还要检查更新所有的fd,带来了遍历的成本
  4. select第一个参数的目的是为了确定遍历范围
  5. select采取位图的方式,用户到内核,内核到用户,来回地进行数据拷贝,造成拷贝成本的问题

select缺点

  1. select的fd有上限的问题
  2. 每次调用都要重新设置关心的fd

I/O多路转接之epoll

epoll的相关系统调用

epoll 有3个相关的系统调用

epoll_create

int epoll_create(int size);

在这里插入图片描述

创建一个epoll的句柄或者说是创建一个epoll模型(下面介绍):用完之后, 必须调用close()关闭

epoll_ctl

用户告知内核需要关心哪个文件描述符上的什么事件

int epoll_ctl(int epfd, int op, int fd,
 struct epoll_event *event);

epoll的事件注册函数

  • 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
  • 第一个参数是epoll_create()的返回值(epoll的句柄)
  • 第二个参数表示动作,用三个宏来表示(增删改)
  • 第三个参数是需要监听的fd
  • 第四个参数是告诉内核需要监听什么事

第二个参数的取值:

  • EPOLL_CTL_ADD:注册新的fd到epfd中
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件
  • EPOLL_CTL_DEL:从epfd中删除一个fd

struct epoll_event结构如下:
在这里插入图片描述
events可以是以下几个宏的集合:

  • EPOLLIN:表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)
  • EPOLLOUT:表示对应的文件描述符可以写
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读
  • EPOLLERR:: 表示对应的文件描述符发生错误
  • EPOLLHUP:表示对应的文件描述符被挂断
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里

epoll_wait

内核告知用户,那些文件描述符上的什么事件已经就绪

int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

收集在epoll监控的事件中已经发送的事件

  • 参数events是分配好的epoll_event结构体数组
  • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
  • 参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞)
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败

epoll工作原理

在这里插入图片描述

  • 当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体(其实就是红黑树),这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll{ 
 .... 
 /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ 
 struct rb_root rbr; 
 /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ 
 struct list_head rdlist; 
 .... 
}
  • 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;当使用epoll_ctl方法时相等于向红黑树中的节点中添加数据,而这个数据是由文件描述符和事件组成的键值对
  • 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;红黑树的每个结点中还存在着链表的结构,每当文件描述符上的事件就绪之后,这些结点就会前后连接组成一个链表
  • 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中
  • 在epoll中,对于每一个事件,都会建立一个epitem结构体
struct epitem{ 
 struct rb_node rbn;//红黑树节点 
 struct list_head rdllink;//双向链表节点 
 struct epoll_filefd ffd; //事件句柄信息 
 struct eventpoll *ep; //指向其所属的eventpoll对象 
 struct epoll_event event; //期待发生的事件类型 
}
  • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
  • 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户

epoll的使用过程就是三部曲

  • 调用epoll_create创建一个epoll句柄
  • 调用epoll_ctl, 将要监控的文件描述符进行注册
  • 调用epoll_wait, 等待文件描述符就绪

epoll的实例

namespace epoll_yjm
{
    static const int defaultport = 8083;
    static const int size = 128;
    static const int defaultvalue = -1;
    static const int defaultnum = 64;

    using func_t = std::function<std::string(const std::string &)>;

    class EpollServer
    {
    public:
        EpollServer(func_t f, uint16_t port = defaultport, int num = defaultnum)
            : _func(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultport), _epfd(defaultvalue)
        {
        }

        ~EpollServer()
        {
            if (_listensock != defaultvalue)
                close(_listensock);

            if (_epfd != defaultvalue)
                close(_epfd);

            if (_revs)
                delete[] _revs;
        }

        void initserver()
        {
            // 1.创建socket
            _listensock = Sock::Socket();
            Sock::Bind(_listensock, _port);
            Sock::Listen(_listensock);

            // 2.创建epoll模型
            _epfd = epoll_create(size);
            if (_epfd < 0)
            {
                logMessage(FATAL, "epoll create error: %s", strerror(errno));
                exit(EPOLL_CREATE_ERR);
            }
            // 3.添加listen到epoll模型中
            struct epoll_event ev;
            ev.events = EPOLLIN;
            // 当事件就绪,被重新捞上来时,需要知道哪一个fd就绪
            ev.data.fd = _listensock;
            epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);

            // 4.申请就绪事件的空间
            _revs = new struct epoll_event[_num];

            logMessage(NORMAL, "init server success!");
        }

        void HandlerEvent(int readynum)
        {
            logMessage(DEBUG, "HandlerEvent in");
            for (int i = 0; i < readynum; i++)
            {
                uint32_t events = _revs[i].events;
                int sock = _revs[i].data.fd;

                if (sock == _listensock && events & EPOLLIN)
                {
                    // listensock读事件就绪,获取新连接
                    std::string clientip;
                    uint16_t clientport;
                    int fd = Sock::Accept(sock, &clientip, &clientport);
                    if (fd < 0)
                    {
                        logMessage(WARNING, "accept error");
                        continue;
                    }

                    // 获取fd成功,不可以直接读,数据可能还没就绪
                    // 将fd放入epoll等待就绪
                    struct epoll_event ev;
                    ev.events = EPOLLIN;
                    ev.data.fd = fd;
                    epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
                }
                else if (events & EPOLLIN)
                {
                    // 普通事件就绪
                    char buffer[1024];
                    // 把本轮数据读完,不一定能够读取完整的请求
                    int n = recv(sock, buffer, sizeof(buffer), 0);
                    if (n > 0)
                    {
                        buffer[n] = 0;
                        logMessage(DEBUG, "client# %s", buffer);
                        std::string response = _func(buffer);
                        send(sock, response.c_str(), response.size(), 0);
                    }
                    else if (n == 0)
                    {
                        // 建议先将fd从epoll中移除,再关闭
                        epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
                        close(sock);
                        logMessage(NORMAL, "client quit");
                    }
                    else
                    {
                        // 建议先将fd从epoll中移除,再关闭
                        epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
                        close(sock);
                        logMessage(ERROR,"recv error,code: %d,errstring: %s",errno,strerror(errno));
                    }
                }
                else
                {

                }
            }
            logMessage(DEBUG,"HandlerEvent out");
        }

        void start()
        {
            int timeout = -1;
            for (;;)
            {
                int n = epoll_wait(_epfd, _revs, _num, timeout);
                switch (n)
                {
                case 0:
                    logMessage(NORMAL, "timeout ...");
                    break;
                case -1:
                    logMessage(WARNING, "epoll_wait failed,code: %d,errstring: %s", errno, strerror(errno));
                    break;
                default:
                    logMessage(NORMAL, "have event ready!");
                    HandlerEvent(n);
                    break;
                }
            }
        }

    private:
        uint16_t _port;
        int _listensock;
        int _epfd;
        struct epoll_event *_revs;
        int _num;
        func_t _func;
    };
}

在这里插入图片描述

epoll的优点

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响
  • 没有数量限制: 文件描述符数目无上限

epoll工作方式

epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
假如有这样一个例子

你网购了好几个快递,张三快递员先给你派发了几个快递,你并没有去拿,因为你知道张三肯定会一直通知你,直到把你所有的快递都给你为止,因此你一直在忙别的事,直到很晚采取拿快递;又过了几天,你又网购了几个快递,不过这次给你派发快递的小哥李四不同于张三,他打电话通过你,并告知你:如果你不抓紧拿快递,那么就再也不通知你,你没有办法只能一次性将所有的快递都拿走;如果你这次没有将所有的快递都拿走,当李四再次拿到需要给你派发的快递时,就会再次通知你一次

水平触发Level Triggered 工作模式

epoll默认状态下就是LT工作模式

  • 在上面的栗子中,张三的派发模式就是LT工作模式
  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回
  • 支持阻塞读写和非阻塞读写

边缘触发Edge Triggered工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

  • 在上面的栗子中,李四的派发模式就是ET工作模式
  • 当epoll检测到socket上事件就绪时, 必须立刻处理
  • ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epol
  • 只支持非阻塞的读写

epoll既可以支持LT, 也可以支持ET

对比LT和ET

LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完;ET的高效不仅仅体现在通知机制上,为了尽快让上层将数据取走,TCP可以给发送方提供一个更大的窗口大小,让对方更新出更大的滑动窗口,提高底层的数据发送效率

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的

epoll示例: epoll服务器(ET模式)

Reactor

Reactor=IO+协议定制+业务处理,基于ET模式下的Reactor,可以处理所有的IO

处理几个小细节:
在epollserver接收到数据之后,并不能确定数据的完整性,因为ET保证了数据被读取完,而协议才能保证数据的完整性;因此在epollserver还需要自己的接收缓冲区以保证数据的完整性

在epollserver同样也需要发送缓冲区,服务器刚开始启动,或者很多情况下,发送事件一直都是就绪的,可以直接发送;如果用户一次并没有把所有数据发送完,还需要再次发送;同样文件描述符上的发送事件也要注册到epoll模型中,一般按需设置

接下来就编写代码

tcpserver.hpp

namespace tcpserver
{
    class Connection;
    class Tcpserver;

    static const uint16_t defaultport = 8084;
    static const int num = 64;

    using func_t = std::function<void(Connection *)>;

    class Connection
    {
    public:
        Connection(int sock,Tcpserver*tsp)
            :_sock(sock)
            ,_tsp(tsp)
        {
        }

        void Register(func_t r,func_t s,func_t e)
        {
            _recver=r;
            _sender=s;
            _excepter=e;
        }

        ~Connection()
        {

        }

        void Close()
        {
            close(_sock);
        }
    public:
        int _sock;
        std::string _inbuffer;  // 输入缓冲区
        std::string _outbuffer; // 输出缓冲区

        func_t _recver;   // 从sock中读取
        func_t _sender;   // 向sock中写入
        func_t _excepter; // 处理sockIO的异常事件

        Tcpserver *_tsp; // 回指指针
        uint64_t lasttime;
    };

    class Tcpserver
    {
    private:
        void Recver(Connection *conn)
        {
            conn->lasttime = time(nullptr);

            char buffer[1024];
            while (true)
            {
                ssize_t s = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);
                if (s > 0)
                {
                    buffer[s] = 0;
                    conn->_inbuffer += buffer; // 将读到的数据放入缓冲区中
                    logMessage(DEBUG, "\n%s", conn->_inbuffer);
                    _service(conn);
                }
                else if (s == 0)
                {
                    if (conn->_excepter)
                    {
                        conn->_excepter(conn);
                        return;
                    }
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                        break;
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        if (conn->_excepter)
                        {
                            conn->_excepter(conn);
                            return;
                        }
                    }
                }
            }
        }

        void Sender(Connection *conn)
        {
            conn->lasttime = time(nullptr);
            while (true)
            {
                ssize_t s = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
                if (s > 0)
                {
                    if (conn->_outbuffer.empty())
                        break;
                    else
                        conn->_outbuffer.erase(0, s);
                }
                else
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                        break;
                    else if (errno == EINTR)
                        continue;
                    else
                    {
                        if (conn->_excepter)
                        {
                            conn->_excepter(conn);
                            return;
                        }
                    }
                }
            }

            //如果没有发送完毕,需要对对应的sock开启写事件的关心
            //如果发送完毕,需要关闭对写事件的关心
            if(!conn->_outbuffer.empty())
                conn->_tsp->EnableReadWrite(conn,true,true);
            else
                conn->_tsp->EnableReadWrite(conn,true,false);
        }

        void Excepter(Connection*conn)
        {
            logMessage(DEBUG,"Excepter begin");
            _epoller.Control(conn->_sock,0,EPOLL_CTL_DEL);
            conn->Close();
            _connections.erase(conn->_sock);
            logMessage(DEBUG,"关闭%d 文件描述符的所有资源",conn->_sock);

            delete conn;
        }

        void Accepter(Connection *conn)
        {
            for (;;)
            {
                std::string clientip;
                uint16_t clientport;
                int err;
                int sock = _sock.Accept(&clientip, &clientport, &err);
                if (sock > 0)
                {
                    AddConnection(
                        sock, EPOLLIN | EPOLLET,
                        std::bind(&Tcpserver::Recver, this, std::placeholders::_1),
                        std::bind(&Tcpserver::Sender, this, std::placeholders::_1),
                        std::bind(&Tcpserver::Excepter, this, std::placeholders::_1));

                    logMessage(DEBUG, "get a new link,info: [%s:%d]", clientip.c_str(), clientport);
                }
                else
                {
                    if (err == EAGAIN || err == EWOULDBLOCK)
                        break;
                    else if (err == EINTR)
                        continue;
                    else
                        break;
                }
            }
        }

        void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)
        {
            // 1.首先给该sock创建connection,并初始化,并添加到_connections中
            // 将该sock设置为非阻塞
            if (events & EPOLLET)
                Util::SetNonBlock(sock);
            Connection *conn = new Connection(sock, this);
            // 2.给对应的sock设置对应的回调处理方法
            conn->Register(recver, sender, excepter);
            // 3.将sock与其要关心的事件注册到epoll中
            bool r = _epoller.AddEvent(sock, events);
            assert(r);
            (void)r;
            // 4.将kv添加到_connections中
            _connections.insert(std::pair<int, Connection *>(sock, conn));
            logMessage(DEBUG, "add new sock: %d in epoll and unordered_map", sock);
        }

        bool IsConnectionExists(int sock)
        {
            auto iter = _connections.find(sock);
            return iter != _connections.end();
        }

        void Loop(int timeout)
        {
            // 获得已就绪事件
            int n = _epoller.Wait(_revs, _num, timeout);
            for (int i = 0; i < n; i++)
            {
                int sock = _revs[i].data.fd;
                uint32_t events = _revs[i].events;

                // 将所有异常问题,全部转化为读写问题
                if (events & EPOLLERR)
                    events |= (EPOLLIN | EPOLLOUT);
                if (events & EPOLLHUP)
                    events |= (EPOLLIN | EPOLLOUT);

                // listen事件就绪
                if ((events & EPOLLIN) && IsConnectionExists(sock) && _connections[sock]->_recver)
                    _connections[sock]->_recver(_connections[sock]);

                if ((events & EPOLLOUT) && IsConnectionExists(sock) && _connections[sock]->_sender)
                    _connections[sock]->_sender(_connections[sock]);
            }
        }

    public:
        Tcpserver(func_t func, uint16_t port = defaultport)
            : _service(func), _port(port), _revs(nullptr)
        {
        }

        ~Tcpserver()
        {
            _sock.Close();
            _epoller.Close();
            if (_revs == nullptr)
                delete[] _revs;
        }

        void initserver()
        {
            // 1.创建socket
            _sock.Socket();
            _sock.Bind(_port);
            _sock.Listen();
            // 2.创建epoll模型
            _epoller.Create();
            // 先将listenock设置为非阻塞
            // 3.将目前唯一一个sock,添加到epoll模型中
            AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,
                          std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);

            _revs = new struct epoll_event[num];
            _num = num;
        }

        void EnableReadWrite(Connection *conn, bool readable, bool writeable)
        {
            uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLIN : 0) | EPOLLET;
            _epoller.Control(conn->_sock, event, EPOLL_CTL_MOD);
        }

        // 事件派发
        void Dispatcher()
        {
            int timeout = 1000;
            while (true)
            {
                Loop(timeout);
            }
        }

    private:
        uint16_t _port;
        Sock _sock;
        Epoll _epoller;
        std::unordered_map<int, Connection *> _connections;
        struct epoll_event *_revs;
        int _num;
        func_t _service;
    };
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1070552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多功能频率计周期/脉宽/占空比/频率测量verilog,视频/代码

名称&#xff1a;多功能频率计周期、脉宽、占空比、频率测量verilog 软件&#xff1a;Quartus 语言&#xff1a;Verilog 代码功能&#xff1a; 多功能频率计&#xff0c;可测量信号的周期、脉冲宽度、占空比、频率&#xff0c;语言为verilog&#xff0c;quartus软件设计仿真…

B (1089) : DS单链表--合并

Description 假定两个单链表是递增有序&#xff0c;定义并实现以下函数&#xff0c;完成两个单链表的合并&#xff0c;继续保持递增有序 int LL_merge(ListNode *La, ListNode *Lb) Input 第1行先输入n表示有n个数据&#xff0c;接着输入n个数据 第2行先输入m表示有M个数据…

扭线机控制

扭线机属于线缆加工设备&#xff0c;线缆加工设备种类非常多。有用于网线绞合的单绞&#xff0c;双绞机等&#xff0c;有关单绞机相关算法介绍&#xff0c;大家可以查看专栏相关文章&#xff0c;有详细介绍&#xff0c;常用链接如下&#xff1a; 线缆行业单绞机控制算法&#…

MySQL命令行中文乱码问题

MySQL命令行中文乱码问题&#xff1a; 命令行界面默认字符集是gbk&#xff0c;若字符集不匹配会中文乱码或无法插入中文。 解决办法&#xff1a;执行set names gbk; 验证&#xff1a; 执行命令show variables like ‘char%’;查看默认字符集。 创建数据库设置字符集utf8&…

Nginx详细学习记录

1. Nginx概述 Nginx是一个轻量级的高性能HTTP反向代理服务器&#xff0c;同时它也是一个通用类型的代理服务器&#xff0c;支持绝大部分协议&#xff0c;如TCP、UDP、SMTP、HTTPS等。 1.1 Nginx基础架构 Nginx默认采用多进程工作方式&#xff0c;Nginx启动后&#xff0c;会运行…

多线程锁-synchronized字节码分析

从字节码角度分析synchronized实现 javap -c(v附加信息) ***.class 文件反编译 synchronized同步代码块 >>>实现使用的是monitorenter和monitorexit指令 synchronized普通同步方法 >>>调用指令将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置&#xf…

HTTPS工作过程,国家为什么让http为什么要换成https,Tomcat在MAC M1电脑如何安装,Tomcat的详细介绍

目录 引言 一、HTTPS工作过程 二、Tomcat 在访达中找到下载好的Tomcat文件夹&#xff08;这个要求按顺序&#xff09; zsh: permission denied TOMCAT的各部分含义&#xff1a; 引言 在密码中一般是&#xff1a;明文密钥->密文&#xff08;加密&#xff09; &#xff…

ubuntu 安装postgresql,增加VECTOR向量数据库插件 踏坑详细流程

PGSQL安装&#xff0c;删除&#xff0c;运行&#xff0c;修改密码流程 Ubuntu18.04安装与配置postgresql含远程连接教程&#xff08;含踩坑记录&#xff09;_sudo apt-get install postgresql-CSDN博客 详细安装流程以上博客&#xff0c;自己也记录下 安装vector扩展连接 声明…

微服务学习(十):安装Maven

微服务学习&#xff08;十&#xff09;&#xff1a;安装Maven 1、下载Maven 官网下载 2、将下载后的资源包上传到服务器 3、解压资源包并安装 tar -zxvf apache-maven-3.9.5-bin.tar.gz4、配置环境变量 vi /etc/profileexport MAVEN_HOME/home/maven/apache-maven-3.9.5 …

如何部署一个高可用高并发的电商平台

假设我们已经有了一个特别大的电商平台&#xff0c;这个平台应该部署在哪里呢&#xff1f;假设我们用公有云&#xff0c;一般公有云会有多个位置&#xff0c;比如在华东、华北、华南都有。毕竟咱们的电商是要服务全国的&#xff0c;当然到处都要部署了。我们把主站点放在华东。…

汇编语言是怎么一回事?

汇编语言基础 汇编指令和机器码的区别 数据的表示 各类汇编指令 数据传送和算法运算 位运算 条件分支指令 函数调用 字符串处理 流水线和指令调度 流水线实现指令级并行 编译器指令调度 CPU乱序与投机执行 汇编器将汇编语言翻译成 CPU 可以执行的机器码&#xff0c…

【软考】9.1 顺序表/链表/栈和队列

《线性结构》 顺序存储和链表存储 每个元素最多只有一个出度和一个入度&#xff0c;表现为一条线状链表存储结构&#xff1a;每个节点有两个域&#xff0c;即数据&#xff0c;指针域&#xff08;指向下一个逻辑上相邻的节点&#xff09; 时间复杂度&#xff1a;与其数量级成正…

OpenCV防抖实践及代码解析笔记

视频防抖是指用于减少摄像机运动对最终视频的影响的一系列方法。摄像机的运动可以是平移&#xff08;比如沿着x、y、z方向上的运动&#xff09;或旋转&#xff08;偏航、俯仰、翻滚&#xff09;。 正如你在上面的图片中看到的&#xff0c;在欧几里得运动模型中&#xff0c;图像…

分布式文件系统HDFS(林子雨慕课课程)

文章目录 3. 分布式文件系统HDFS3.1 分布式文件系统HDFS简介3.2 HDFS相关概念3.3 HDFS的体系结构3.4 HDFS的存储原理3.5 HDFS数据读写3.5.1 HDFS的读数据过程3.5.2 HDFS的写数据过程 3.6 HDFS编程实战 3. 分布式文件系统HDFS 3.1 分布式文件系统HDFS简介 HDFS就是解决海量数据…

4.方法操作实例变量 对象的行为

4.1 操作对象状态的方法 同一类型的每个对象能够有不同的方法行为&#xff0c;任一类的每个实例都带有相同的方法&#xff0c;但是方法可以根据实例变量的值来表现不同的行为。 play()会播放title值表示的歌曲&#xff0c;调用某个实例的play()可能会播放“Politik”而另一个会…

第三章 Android 开发从入门到实战--简单控件

文章目录 1.文本显示1.1设置文本的内容1.2设置文本字体大小1.3设置文本的颜色 2.视图基础2.1设置视图的宽高2.2设置视图的间距2.3设置视图的对齐方式 3.常用布局3.1线性布局LinearLayout3.2相对布局RelativeLayout3.3网格布局GridLayout3.4滚动视图ScrollView 4.按钮触控4.1But…

集线器、交换机、路由器是如何转发包的

集线器、交换机、路由器是如何转发包的 集线器交换机MAC地址表的维护 路由器路由表中的信息路由器的包接收操作查询路由表确定输出端口找不到匹配路由时选择默认路由包的有效期通过分片功能拆分大网络包路由器发送操作中的一些特点 参考文档 集线器 集线器是一层&#xff08;物…

异常:找不到匹配的key exchange算法

目录 问题描述原因分析解决方案 问题描述 PC 操作系统&#xff1a;Windows 10 企业版 LTSC PC 异常软件&#xff1a;XshellPortable 4(Build 0127) PC 正常软件&#xff1a;PuTTY Release 0.74、MobaXterm_Personal_23.1 服务器操作系统&#xff1a;OpenEuler 22.03 (LTS-SP2)…

【数据结构-二叉树 九】【树的子结构】:树的子结构

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【子结构】&#xff0c;使用【二叉树】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为&…

Qt单一应用实例判断

原本项目中使用QSharedMemory的方法来判断当前是否已存在运行的实例&#xff0c;但在MacOS上&#xff0c;当程序异常崩溃后&#xff0c;QSharedMemory没有被正常销毁&#xff0c;导致应用程序无法再次被打开。 对此&#xff0c;Qt assistant中有相关说明&#xff1a; 摘抄 qt-s…