高级IO(select、poll、epoll)

news2024/12/22 18:28:11

在介绍本文之前,先提出一个问题

什么是IO?

等+数据拷贝
1.等 - IO事件就绪(检测功能成分)
2.数据拷贝
高效的IO就是:单位时间,等的比重越小,IO的效率越高

五种IO模型

IO模型:

阻塞式IO:钓鱼中,一直盯着鱼竿,直到鱼上钩就钓【自己等(阻塞),自己钓】
非阻塞IO:钓鱼中,看一会手机,再看看鱼上钩没,上钩就钓【自己等(非阻塞 / 轮询),自己钓】
信号驱动式IO:在鱼竿上放个铃铛,听到铃铛上,说明上钩,直接钓,其它时间一直看手机【没有直接等,自己钓】
多路转接(多路复用):一次带来几百个鱼竿,一起钓,哪个上钩就钓哪个【自己等(一次检测多个),自己钓】
异步IO:让别人帮自己钓鱼,但是别人钓鱼的成果要给自己【没有自己等,没有自己钓,但是要拿结果】

5中IO模型中,效率最高的是多路转接.

除了异步IO外,其它4个都是同步IO。同步IO和异步IO的区别就是:有没有参与IO细节,参与了的就是同步IO,没参与就是异步IO

正常使用中,90%是:阻塞IO,部分是非阻塞和多路转接

阻塞IO

在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.

阻塞IO是最常见的IO模型

在这里插入图片描述

非阻塞IO

如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码

非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用

在这里插入图片描述

信号驱动IO

内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作

在这里插入图片描述

IO多路转接

虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件

描述符的就绪状态

在这里插入图片描述

异步IO

由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)

在这里插入图片描述

任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少

代码编写

阻塞IO

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
int main()
{
    char buffer[1024];
    while(1)
    {
        ssize_t s=read(0,buffer,sizeof(buffer)-1);
        if(s>0)
        {
            buffer[s]=0;
            std::cout << "echo# " << buffer << std::endl;
        }
        else if(s==0)
        {
            std::cout << "read end" << std::endl;
        }
        else
        {
            
        }
    }
    return 0;
}

这时它会卡住,只有你输入才会有反应:

在这里插入图片描述

输入后:

在这里插入图片描述

非阻塞IO

一个文件描述符, 默认都是阻塞IO.

fcntl

int fcntl(int fd, int cmd, ... /* arg */ )

第一个参数fd:要设置哪个文件描述符
第二个参数cmd:要做什么
第三个参数arg:要设置什么状态

fcntl函数五种功能

复制一个现有的描述符(cmd=F_DUPFD) .
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW.

util.hpp

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
void SetNonBlock(int fd)
{
    int flag=fcntl(fd,F_GETFL);
    if(flag<0)
    {
        std::cout << "fctnl failed" << std::endl;
        return;
    }
    int n=fcntl(fd,F_SETFL,flag|O_NONBLOCK);
}

使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图).
然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数

test.cpp

#include"util.hpp"
#include <string.h>
void print()
{
    std::cout << "我在空闲时间做其他事情" << std::endl;
}
int main()
{
    char buffer[1024];
    SetNonBlock(0);
    while(1)
    {
        ssize_t s=read(0,buffer,sizeof(buffer)-1);
        if(s>0)
        {
            buffer[s-1]=0;
            std::cout << "echo# " << buffer << std::endl;
        }
        else if(s==0)
        {
            std::cout << "read end" << std::endl;
            break;
        }
        else
        {
            //1.当我不在输入的时候,底层没有数据,会返回错误!但是!这并不是错误,只是以错误的形式返回了
            // std::cout << "s: " << s << " errno: " << strerror(errno) << std::endl;
            //查看man手册 man 2 read,EAGAIN || EWOULDBLOCK 不算
            //EINTR被信号中断
            if(errno==EAGAIN)
            {
                std::cout << "我没有错,只是没有数据" << std::endl;
                print();
            }
            else if(errno==EINTR)
            {
                continue;
            }
            else
            {
                std::cout << "出错" << std::endl;
                break;
            }
        }
        // sleep(1);
    }
    return 0;
}

在这里插入图片描述

多路转接—select

文件描述符状态的变化:1. 可读不可读的变化 2. 可写不可写的变化 3. 异常不异常的变化

因此,等fd,其实就是在等事件:1. 读事件就绪 2. 写事件就绪 3. 异常事件

系统提供select函数来实现多路复用输入/输出模型

select只负责一件事情:等(等待文件描述符事件变化)

select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select函数:

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout)

第一个参数nfds:所要等的众多的文件描述符中最大的文件描述符+1.

第二、三、四个参数readfds、writedfs、exceptfds是读文件描述符集.

输入时:用户告诉OS,要注意我设置的多个fd中的读事件是否就绪.
输出时:OS告诉用户,你让我注意的多个fd中,有哪些已经就绪.

fd_set是位图结构的.
以读事件readfds为例:

1.比特位的位置代表fd的编号”是否“的概念
2.输入时(用户告诉内核):要注意我设置的多个fd"是否"中的事件是否就绪.
3.输出时(内核告诉用户):用户曾经让我注意的多个fd中,有哪些事件已经就绪

比如readfds的位图结构为0110 1010(输入时,用户给OS的),这里需要OS注意4个fd,然后输出时readfds的位图结构为0000 0010,这就就说明OS注意的4个fd中,只有第7位的fd事件已就绪.

输入和输出用的是同一张位图.

第五个参数timeout为结构timeval,用来设置select()的等待时间.

这里等待的策略问题也可以分为:① 阻塞等待 ② 非阻塞等待 ③ 设定deadline

这个timeout就是设置一个deadline(一个时间),如果阻塞,超时就会立马返回,如果在时间之前完成,就返回剩余的时间值

NULL:则表示select()没有timeout, select将一直被阻塞,直到某个文件描述符上发生了事件;
0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生, select将超时返回、

返回值int:

  1. 0 :有几个fd就绪了

  1. == 0:timeout
  2. <0:报错了

编写一段select代码,只实现读

sock.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class Sock
{
    const static int backlog = 5;
    public:
    static int Socket()
    {
        // 1. 创建socket文件套接字对象
        int sock = socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0)
        {
            std::cout << "create socket error" << std::endl;
            exit(-1);
        }
        std::cout<< "create socket success: " << sock << std::endl;

        int opt = 1;
        //防止bind失败
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
        return sock;
    }
    static void Bind(int sock, int port)
    {
        // 2. bind绑定自己的网络信息
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            std::cout <<  "bind socket error" << std::endl;
            exit(-1);
        }
        std::cout << "bind socket success" << std::endl;
    }
    static void Listen(int sock)
    {
        // 3. 设置socket 为监听状态
        if (listen(sock, backlog) < 0) // 第二个参数backlog后面在填这个坑
        {
            std::cout << "listen socket error" << std::endl;
            exit(-1);
        }
        std::cout << "listen socket success" << std::endl;
    }
    static int Accept(int listensock, std::string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock = accept(listensock, (struct sockaddr *)&peer, &len);
        if (sock < 0)
            std::cout <<  "accept error, next" << std::endl;
        else
        {
            std::cout << "accept a new link success, get new sock: " << sock << std::endl; 
            *clientip = inet_ntoa(peer.sin_addr);
            *clientport = ntohs(peer.sin_port);
        }

        return sock;
    }
};

selectserver.hpp

#pragma once
#include "sock.hpp"
namespace select_ns
{
    // 因为是位图所有要*8
    static const int fdnum = sizeof(fd_set) * 8;
    static const int port = 8081;
    static const int defaultfd = -1;
    class SelectServer
    {
    public:
        SelectServer()
            : _port(port), _listensock(-1), fdarray(nullptr)
        {
        }
        void InitServer()
        {
            _listensock = Sock::Socket();
            Sock::Bind(_listensock, port);
            Sock::Listen(_listensock);
            fdarray = new int[fdnum];
            for (int i = 0; i < fdnum; ++i)
            {
                fdarray[i] = defaultfd;
            }
            fdarray[0] = _listensock;
        }
        void Print()
        {
            std::cout << "fd list: ";
            for (int i = 0; i < fdnum; ++i)
            {
                if (fdarray[i] != defaultfd)
                    std::cout << fdarray[i] << " ";
            }
            std::cout << std::endl;
        }
        void Accepter(int listensock)
        {
            // 目前一定是listensock,只有一个
            // 走到这里,accept函数,不会阻塞
            std::string clientip;
            uint16_t clientport = 0;
            int sock = Sock::Accept(_listensock, &clientip, &clientport);
            if (sock < 0)
            {
                return;
            }
            std::cout << "accept success,clientip: " << clientip.c_str() << " clientport: " << clientport;
            int i = 0;
            for (; i < fdnum; ++i)
            {
                if (fdarray[i] != defaultfd)
                    continue;
                else
                    break;
            }
            if (i == fdnum)
            {
                std::cout << "server if full,please wait" << std::endl;
                close(sock);
            }
            else
            {
                fdarray[i] = sock;
            }
            Print();
        }
        void Recver(int sock,int pos)
        {
            char buffer[1024];
            //这样读取是有问题的,因为没有保证读到的是否是一个完整的报文
            //但是我们这里这样写
            ssize_t s=recv(sock,buffer,sizeof(buffer)-1,0);
            if(s>0)
            {
                buffer[s]=0;
                std::cout << "client say #" << buffer << std::endl;
            }
            //对方关闭文件描述符
            else if(s==0)
            {
                close(sock);
                //下次不让操作系统去关心他了
                fdarray[pos]=defaultfd;
                return ;
            }
            else
            {
                close(sock);
                fdarray[pos]=defaultfd;
                return ;
            }
            //处理reques
            std::string resp=buffer;
            write(sock,resp.c_str(),resp.size());
        }
        void HandlerEvent(fd_set &rfds)
        {
            for(int i=0;i<fdnum;++i)
            {
                //过滤非法fd
                if(fdarray[i]==defaultfd)
                {
                    continue;
                }
                //正常的fd
                //在FD_ISSET里面才是就绪的fd
                if(fdarray[i]==_listensock&&FD_ISSET(fdarray[i],&rfds))
                {
                    Accepter(_listensock);
                }
                else if(FD_ISSET(fdarray[i],&rfds))
                {
                    Recver(fdarray[i],i);
                }
                else
                {

                }
            }
        }
        void start()
        {
            for (;;)
            {
                fd_set rfds;
                FD_ZERO(&rfds);
                int maxfd = fdarray[0];
                for (int i = 0; i < fdnum; ++i)
                {
                    if (fdarray[i] == defaultfd)
                        continue;
                    FD_SET(fdarray[i], &rfds);
                    if (maxfd < fdarray[i])
                        maxfd = fdarray[i];
                }
                struct timeval timeout = {1, 0};
                int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
                switch (n)
                {
                case 0:
                    std::cout << "timeout.." << std::endl;
                    break;
                case -1:
                    std::cout << "select err,code:" << errno << " err string " << strerror(errno);
                default:
                    // 说明有事件就绪了,目前只有一个监听事件就绪了
                    std::cout << "get a new link...";
                    HandlerEvent(rfds);
                    break;
                }
            }
        }

    private:
        int _port;
        int _listensock;
        int *fdarray;
    };
}

test.cpp

#include"selectserver.hpp"
#include<memory>
using namespace std;
int main(int argc,char* argv[])
{
    unique_ptr<select_ns::SelectServer> svr(new select_ns::SelectServer());
    svr->InitServer();
    svr->start();
    return 0;
}

在这里插入图片描述

在这里插入图片描述

select编码特征:

select之前要进行所有参数的重置,之后,要遍历所有的合法fd进行事件检测
select需要用户自己维护第三方数组,来保存所有的合法fd,方便select进行批量处理
一旦特定的fd事件就绪,本次读取或者写入不会被阻塞

select优缺点:

1.优点:
占用资源少,并且高效(对比之前的多进程、多线程)
2.缺点
每一次都要进行大量的重置工作,效率比较低
每一次能够坚持的fd数量是有上限的
每一次都需要内核到用户,用户到内核传递位置参数,出现较为大量的数据拷贝工作
select编码特别不方便,需要用户自己维护数组
select底层需要同步遍历的方式,检测所有需要检测的fd(传入最大maxfd + 1)

多路转接—poll

poll函数

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
        int fd;               /* file descriptor */
        short events;   /* requested events */
        short revents;  /* returned events */
};

第一个参数fds是struct pollfd*(指针类型,可以当作一个数组来看)类型的,这个结构体里包括所要等的文件描述符fd,请求事件events(用户告诉内核),请求的哪些事件就绪revent(内核告诉用户). 针对事件,poll将事件进行了拆分,输入和输出进行了分离. 因此,传入的相当于一个数组,数组中的每一个元素告诉内核一个文件描述符fd的哪一个事件要关心.(每一个fd,都可能有不同的事件要关心,或者发生).。

第二个参数nfds表示fds数组的长度。

第三个参数timeout表示poll函数的超时事件,单位是ms(timeout是输入型参数、单位是ms,select的timeout是输入输出型,单位是s).timeout设置为1000如果没有链接到来是每1s检测一次,timeout为0是一直在检测,是非阻塞式的检测,timeout为-1则不检测,代表的是永久阻塞。

events和revents的取值:

在这里插入图片描述

这里重点是POLLIN和POLLOUT,一个是读一个是写,这些都是宏,如果想要读和写当然是可以用 异或| 来连接的。

返回值:

返回值小于0, 表示出错;
返回值等于0, 表示poll函数等待超时;
返回值大于0, 表示poll由于监听的文件描述符就绪而返回

总结一下:可以用一个结构体来表示一个文件描述符所对应的events(用户告诉内核)、revents(内核告诉用户)相关的事件,用指针(代表数组)可以让poll关心多个文件描述符,每一个文件描述符的输入输出事件都可以使用不同的位图来表征,所以select上的参数都可以使用poll来取代了。select需要一个第三方数组,而poll则不需要了,poll可以把这个结构体数组作为全局数组,就可以让poll进行事件监听,又可以让新链接把文件描述符加进来。如果将来有一个文件描述符不关心事件了,就可以将event清空,并且将fd设置为-1,poll在底层就自动不会去关心了。
使用一下poll:

PollServe.hpp:

#pragma once
#include "sock.hpp"
#include <poll.h>
namespace poll_ns
{
    static const int num = 2048;
    static const int port = 8081;
    static const int defaultfd = -1;
    class pollServer
    {
    public:
        pollServer()
            : _port(port), _listensock(-1),_rfds(nullptr)
        {
        }
        void InitServer()
        {
            _listensock = Sock::Socket();
            Sock::Bind(_listensock, port);
            Sock::Listen(_listensock);
            _rfds = new struct pollfd[num];
            for (int i = 0; i < num; ++i)
            {
                ResetItem(i);   
            }
            _rfds[0].fd=_listensock;
            _rfds[0].events=POLLIN;
            _rfds[0].revents=0;
        }
        void ResetItem(int i)
        {
            _rfds[i].fd=defaultfd;
            _rfds[i].events=0;
            _rfds[i].revents=0;
        }
        void Print()
        {
            std::cout << "fd list: ";
            for (int i = 0; i < num; ++i)
            {
                if (_rfds[i].fd != defaultfd)
                    std::cout << _rfds[i].fd << " ";
            }
            std::cout << std::endl;
        }
        void Accepter(int listensock)
        {
            // 目前一定是listensock,只有一个
            // 走到这里,accept函数,不会阻塞
            std::string clientip;
            uint16_t clientport = 0;
            int sock = Sock::Accept(_listensock, &clientip, &clientport);
            if (sock < 0)
            {
                return;
            }
            std::cout << "accept success,clientip: " << clientip.c_str() << " clientport: " << clientport;
            int i = 0;
            for (; i < num; ++i)
            {
                if (_rfds[i].fd != defaultfd)
                    continue;
                else
                    break;
            }
            if (i == num)
            {
                std::cout << "server if full,please wait" << std::endl;
                close(sock);
            }
            else
            {
                _rfds[i].fd = sock;
                _rfds[i].events=POLLIN;
                _rfds[i].revents=0;
            }
            Print();
        }
        void Recver(int pos)
        {
            char buffer[1024];
            //这样读取是有问题的,因为没有保证读到的是否是一个完整的报文
            //但是我们这里这样写
            ssize_t s=recv(_rfds[pos].fd,buffer,sizeof(buffer)-1,0);
            if(s>0)
            {
                buffer[s]=0;
                std::cout << "client say #" << buffer << std::endl;
            }
            //对方关闭文件描述符
            else if(s==0)
            {
                close(_rfds[pos].fd);
                //下次不让操作系统去关心他了
                ResetItem(pos);
                return ;
            }
            else
            {
                close(_rfds[pos].fd);
                ResetItem(pos);
                return ;
            }
            //处理reques
            std::string resp=buffer;
            write(_rfds[pos].fd,resp.c_str(),resp.size());
        }
        void HandlerEvent()
        {
            for(int i=0;i<num;++i)
            {
                //过滤非法fd
                if(_rfds[i].fd==defaultfd)
                {
                    continue;
                }
                if(!(_rfds[i].events&POLLIN)) continue;
                //正常的fd
                //在FD_ISSET里面才是就绪的fd
                if((_rfds[i].revents&POLLIN)&&_rfds[i].fd==_listensock) 
                {
                    Accepter(_listensock);
                }
                else if(_rfds[i].revents&POLLIN)
                {
                    Recver(i);
                }
                else
                {

                }
            }
        }
        void start()
        {
            int timeout=-1;
            for (;;)
            {
               int n=poll(_rfds,num,timeout);
                switch (n)
                {
                case 0:
                    std::cout << "timeout.." << std::endl;
                    break;
                case -1:
                    std::cout << "select err,code:" << errno << " err string " << strerror(errno);
                default:
                    // 说明有事件就绪了,目前只有一个监听事件就绪了
                    std::cout << "get a new link...";
                    HandlerEvent();
                    break;
                }
            }
        }

    private:
        int _port;
        int _listensock;
        struct pollfd* _rfds;
    };
}

实验现象和select一致,这里不在演示

poll去掉了select的一些缺点,优点更多了.

优点:
1.poll的输入和输出进行了分离,pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便
2.poll没有了最大数量限制 (但是数量过大后性能也是会下降)
缺点:
3.和select函数一样, poll返回后,需要轮询pollfd来获取就绪的描述符
4.每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中
5.同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降

多路转接—epoll

学了select和poll,目前多路转接还存在这一些问题:

  1. select、poll都是基于多个fd进行遍历检测,来识别事件,链接多的时候,一定会引起遍历周期的增加。
  2. 对应事件(用户告诉内核,内核通知用户)需要使用的数据结构(数组),需要由程序员自己维护。

因此,为了解决这些问题,epoll出现了。虽然epoll和poll的名称很像,但是实际上是完全不同的。

epoll几乎解决了多路转接方案的所有缺点,具备之前所说的一切优点,是现在性能最好的多路IO就绪通知方法。

epoll接口

不同与上面两个,epoll有3个接口。但是无论有多少个接口,核心工作都是:只负责等(包括:1. 用户告诉内核 2. 内核告诉用户)。

1.epoll_create()

int epoll_create(int size);

size参数不重要,现在一般设置为特定的一个值即可,128、256或者512等。

重点是返回值,如果成功就返回一个文件描述符。会创建出一个epoll模型。

2.epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

完成用户告诉内核的工作。

第一个参数:输入创建成功epoll模型之后,所返回的文件描述符

第二个参数:要操作的方式(增加ADD,修改MOD,删除DEL )

第三个参数:需要监听的fd

第四个参数:用户告诉内核要监听的事件

总结:对epoll模型中特定的文件描述符所要关心的事件进行操作(增加、修改、删除)。

第二个参数的取值:

EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
EPOLL_CTL_DEL :从epfd中删除一个fd

第四个参数的类型struct epoll_event:

struct epoll_event{
        uint32_t events;       /* Epoll events */
        epoll_data_t data;   /* User data variable */
}

调用epoll_ctl的时候,向epoll模型中添加对应的文件描述符及其关心事件时,这个events代表的是用户告诉内核,当在epoll内返回的时候,拿到的事件是内核告诉用户。在接口上做了分离。

typedef union epoll_data
{
        void *ptr;
        int fd;
        uint32_t u32;
        uint64_t u64;
}epoll_data_t;

3.epoll_wait

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

完成内核告诉用户的工作

第一个参数:输入创建成功epoll模型之后,所返回的文件描述符

第二个参数:输出型参数,请求的哪些事件就绪**(内核告诉用户)**

第三个参数:告知这个events有多大

第四个参数:与poll中的timeout完全相同

从epoll模型中提取events,将已经就绪的事件返回。

工作原理

OS如何知道网络中的数据到来了呢?

网卡先得到数据,然后会向CPU发送硬件中断,调用OS预设的中断函数,负责从外设进行数据拷贝,从外设拷贝到内核缓冲区中。

如何理解数据在硬件上流动?

电信号的流动,高电压流到低电压,可以把高电压当作1,低电压当作0。

epoll模型

epoll存在一个回调机制,会针对特定的一个或者多个fd,设定对应的回调机制:当fd缓冲区中有数据的时候,进行回调。

只要调用了这个函数(拷贝函数),底层有数据到来的时候,fd就不需要再轮询检测底层是否有数据,只要设定好回调,就会自动通知。

回调函数作用:

获取就绪的fd
获取就绪的事件是什么
构建queue_node节点
将节点链入就绪队

创建epoll模型的时候会创建一颗红黑树,是一个空树,节点字段:

用户向关心哪一个文件描述上的什么事件就由这个字段决定。

struct rb_struct
{
        int fd;          // 哪一个文件描述符由fd决定
        int events;  // 什么事件由events决定(events是位图结构,与poll中的一样)
}

因此这个字段是为了解决用户告诉内核的问题。

调用epoll_ctl这个函数本质就是在这个红黑树中新插入对应的节点。OS维护了这个红黑树,那么当用户历史上注册过很多的文件描述符和事件,OS就可以知道用户关心哪个文件描述符的哪些事件。

key: fd、sockfd

创建epoll模型还会创建一个队列结构,是一个空队列,节点字段:

内核中,在红黑树中,哪些fd上的哪些事件已经就绪由这个字段决定。

struct queue_node
{
        int fd;
        int revents;
}

因此这个字段是为了解决内核告诉用户的问题

所以上面的三个函数的作用分别是:

epoll_create: 创建整个epoll模型:建立好回调机制,创建一颗空的红黑树,创建一个就绪队列

epoll_ctl: **操作红黑树:**向红黑树中新增节点、查找红黑树节点找到后修改、删除红黑树中节点(这个红黑树等价于poll中的数组)

epoll_wait: **检测就绪队列:**检测就绪队列是否为空,不为空就直接从就绪队列中获取节点

一旦建立epoll模型,本质上,哪些fd上的哪些event就绪,整个过程用户是不用关心的,都是OS自动做的。

那么epoll为什么高效呢?

  1. 采用红黑树来做管理(epoll_ctl的数据结构是红黑树,针对的场景是用户告诉内核的问题,红黑树节点是没有上限的,所以插入的文件描述符也是没有上限的。红黑树的管理成本非常低)
  2. 采用回调机制,彻底解放OS,不需要OS主动轮询,大大提高了检测效率(OS在拷贝函数时,只要有数据,就通过回调的方式获取对应的就绪文件描述符、就绪事件,构建就绪队列,链入到就绪队列,不需要OS在底层遍历每一个文件描述符了,只会为就绪的节点提供服务,未就绪的节点不需要管,就不再需要O(N)的时间遍历了)
  3. epoll_wait调用的时候不再需要进行遍历了,直接在就绪队列中取节点即可。(就绪队列有数据时,就会可能告诉用户有就绪的数据了。有人拿节点。有人放节点,这就是生产者消费者模型,但是是不需要加锁了,epoll的代码考虑了线程安全的问题,epoll底层是多线程、多进程安全的)

epoll.hpp

#pragma once
#include "sock.hpp"
#include <sys/epoll.h>
namespace epoll_ns
{
    static const int num = 2048;
    static const int port = 8081;
    static const int defaultfd = -1;
    static const int size = 128;
    class epollServer
    {
    public:
        epollServer()
            : _port(port), _listensock(-1), _epfd(defaultfd), _revs(nullptr)
        {
        }
        ~epollServer()
        {
            if (_listensock != defaultfd)
                close(_listensock);
            if (_epfd != defaultfd)
                close(_listensock);
            if (_revs != nullptr)
                delete[] _revs;
        }
        void InitServer()
        {
            _listensock = Sock::Socket();
            Sock::Bind(_listensock, port);
            Sock::Listen(_listensock);
            //创建epoll模型
            _epfd = epoll_create(size);
            if (_epfd < 0)
            {
                std::cout << "epoll create error" << std::endl;
            }
            //将listensock添加到epoll中
            struct epoll_event event;
            //当时间就绪,被捞取上来的时候,要知道哪一个时间就绪了
            event.data.fd = _listensock;
            event.events = EPOLLIN;
            epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &event);
            //申请就绪事件的空间
            _revs = new struct epoll_event[num];
        }
        void Accepter()
        {
            // 目前一定是listensock,只有一个
            // 走到这里,accept函数,不会阻塞
            std::string clientip;
            uint16_t clientport = 0;
            int sock = Sock::Accept(_listensock, &clientip, &clientport);
            std::cout << "accept success,clientip: " << clientip.c_str() << " clientport: " << clientport;
            if (sock < 0)
            {
                return;
            }
            struct epoll_event event;
            event.data.fd = sock;
            event.events = EPOLLIN;
            epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &event);
        }
        void Recver(int sock)
        {

            char buffer[1024];
            //这样读取是有问题的,因为没有保证读到的是否是一个完整的报文
            //但是我们这里这样写
            ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);
            if (s > 0)
            {
                buffer[s] = 0;
                std::cout << "client say #" << buffer << std::endl;
            }
            //对方关闭文件描述符
            else if (s == 0)
            {

                //下次不让操作系统去关心他了
                epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
                close(sock);
                return;
            }
            else
            {
                epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
                close(sock);
                return;
            }
            std::string resp = buffer;
            //这个发送其实也有问题,因为你不知道当前的发送条件是就绪的
            send(sock, resp.c_str(), resp.size(), 0);
        }
        void HandlerEvent(int readyNum)
        {
            for (int i = 0; i < num; ++i)
            {
                int sock = _revs[i].data.fd;
                uint32_t event = _revs[i].events;
                if (sock == _listensock && (event & EPOLLIN))
                {
                    std::cout << "Accpter" << std::endl;
                    //_listensock读事件就绪,获取新连接
                    Accepter();
                }
                else if (event & EPOLLIN)
                {
                    std::cout << "Recver" << std::endl;
                    //普通读事件就绪
                    Recver(sock);
                }
                else
                {
                }
            }
        }
        void start()
        {
            int timeout = 1000;
            for (;;)
            {
                // n的个数是就绪事件的个数
                int n = epoll_wait(_epfd, _revs, num, -1);
                switch (n)
                {
                case 0:
                    std::cout << "timeout.." << std::endl;
                    break;
                case -1:
                    std::cout << "select err,code:" << errno << " err string " << strerror(errno);
                default:
                    // 说明有事件就绪了,目前只有一个监听事件就绪了
                    std::cout << "get a new link...";
                    HandlerEvent(n);
                    break;
                }
            }
        }

    private:
        int _port;
        int _listensock;
        int _epfd;
        struct epoll_event *_revs;
    };
}

在这里插入图片描述

1.这里fd为什么是5?

因为epoll模型占用了fd 4

工作方式

epoll有2种工作方式:1.水平触发(LT) 2.边缘触发(ET)

水平触发(LT):如果接收缓冲区里有数据,OS就会一直发消息通知你底层有数据

边缘触发(ET):如果接收缓冲区里有数据,OS通知你一次你没管,OS就不会再通知了

LT:只要底层有数据,就会一直通知你,这是多路转接的默认模式。程序员在上层编码的时候,可以暂时不把数据读取完毕,不用担心底层不通知你进而导致的数据丢失的问题。

ET:只有底层在从无到有、从有到多变化的时候,才会通知你。这就需要程序员一旦收到通知,就必须将自己收到的数据从内核中全部读取完成,否则可能会有数据丢失的风险。

一般来说是ET更高效,ET通知的量最小,拷贝的数据量也是最小的。本质是让上层尽快取走数据的一种机制,有更大的窗口大小。让对方不用对于滑动窗口,流量控制做太多的控制,可以尽快将数据交付,并且对方的延迟应答等其它提高效率的策略也能够在TCP层面上体现出来,进而提高效率。

我怎么知道数据被取完了呢?

read、recv不经过读取,是无法得知的,只有读取了之后才能知道是否取完了。

while(true) read(); (errno -> break)
在网络中,读取的所有文件描述符默认是阻塞的,阻塞的特点是有数据就直接常规返回,没有数据就调用read,此时服务器会因为没有数据而被阻塞,所以因为必须要循环读,直到读取出错或者全部读取完再读一次才能知道没有数据了。那么这最后一次读取,一定会导致read/recv阻塞住。

因此ET模式下,所有的fd、sock必须处于非阻塞模式。LT模式设阻塞模式不会受影响,但是也建议设置为非阻塞模式。

LT工作模式:

epoll默认状态下就是LT工作模式

当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理, 或者只处理一部分。
如果只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪。
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回。
支持阻塞读写和非阻塞读写。

ET工作模式:

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

当epoll检测到socket上事件就绪时, 必须立刻处理。
如果只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了。
也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会。
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多)。Nginx默认采用ET模式使用epoll。
只支持非阻塞的读写。
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET
LT是 epoll 的默认行为。使用 ET 能够减少 epoll 触发的次数,但是代价就是强迫着程序员一次响应就绪过程中就把所有的数据都处理完。
相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些。但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的。
另一方面, ET 的代码复杂程度更高了。

如何写入?

读写事件就绪

读:底层有数据 -> recv -> 底层没有数据,不会读取

写:底层有空间 -> send -> 底层如果没有空间,不应该再写入

默认我们只设置让epoll帮我们关心读事件,没有关心写事件
为什么没有关心写事件:因为最开始的时候,写空间一定是就绪的
但是运行中可能会存在条件不足,即写空间被写满了

写:

  1. 如果LT模式,一定是要先检测有没有对应的空间(检测方式:先打开对写事件的关心,epoll会自动进行事件派发),然后才写入。LT模式下,只要打开了写入,我们想要实现的代码就会自动进行调用sender方法,进行发送
  2. 如果是ET模式,也可以采用上面的方法,不过,一般用ET追求高效,直接发送,通过发送"是否全部发送完成",来决定是否要进行打开写事件进行关心
    a. 先发送,发完就完了
    b. 先发送,如果没有发完,打开写事件关心,让epoll自动帮我们进行发送

注意:一般写事件关心,不能常打开,一定是在需要的时候,再进行打开,不需要的时候就要关闭对写事件的关心

当我们开启对写事件关心的时候,首次或者每次打开都会自动触发一次epoll事件就绪

reactor

在这里插入图片描述

  • Reactor 对象的作用是监听和分发事件;
  • Acceptor 对象的作用是获取连接;
  • Handler 对象的作用是处理业务;

单 Reactor 单进程的方案因为全部工作都在同一个进程内完成,所以实现起来比较简单,不需要考虑进程间通信,也不用担心多进程竞争。

但是,这种方案存在 2 个缺点:

  • 第一个缺点,因为只有一个进程,无法充分利用 多核 CPU 的性能
  • 第二个缺点,Handler 对象在业务处理时,整个进程是无法处理其他连接的事件的,如果业务处理耗时比较长,那么就造成响应的延迟

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957589.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

仓库运行状况如何得知?数据挖掘是关键!

库存、订单、出入库记录、物流信息、货物状态等数据&#xff0c;是仓库管理的重要组成部分。 仓库数据的重要性 做好仓库数据管理对企业的重要性不言而喻。通过有效地管理数据&#xff0c;企业可以更好地了解市场需求和库存情况&#xff0c;快速响应市场变化&#xff0c;提高库…

iOS开发Swift-5-自动布局AutoLayout-摇骰子App

1.在iOS坐标系中&#xff0c;以向左、向下为正方向。图片以左上角为基准点。 2.打开之前的摇骰子App&#xff0c;对它的界面做一些适应所有iPhone机型的效果。 3.先对上方logo做一个y轴约束和一个宽高约束。 宽高约束&#xff1a; 水平居中&#xff1a; 对y轴进行约束。将虚线点…

【STM32单片机】STM32F103RCT6 串口1 串口2 串口3 串口4 串口5 初始化,标准库 ,支持printf

文章目录 单片机介绍引脚DMA—直接存储器访问串口 引脚串口1&#xff0c;初始化&#xff0c;发送与接收串口2&#xff0c;初始化&#xff0c;发送与接收串口3&#xff0c;初始化&#xff0c;发送与接收串口4&#xff0c;初始化&#xff0c;发送与接收串口5&#xff0c;初始化&a…

说说Flink on yarn的启动流程

分析&回答 核心流程 FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container&#xff0c;如果有&#xff0c;则上传一些flink的jar和配置文件到HDFS&#xff0c;这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。接着…

软件架构设计(一) 软件架构的概念

在讲到软件架构的概念时,首先我们要了解到,架构是在做什么样的事情,它在整个软件开发周期中所属什么样的位置。 之前学习软件工程时,我们学到了开发模型,里面涉及到需求分析,概要设计,详细设计,编码,测试。但事实上,没有提到架构这个东西。 为什么这么重要的东西没…

造测试数据

对应sql&#xff1a; from openpyxl import Workbook from faker import Faker# 创建一个Workbook对象 workbook Workbook() # 获取默认的活动工作表 sheet workbook.active# 创建一个Faker对象 fake Faker()# 写入表头 header [Name, Address, Email] sheet.append(heade…

实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警

作者 | sqlboy-yuzhenc 背景介绍 在实际应用中&#xff0c;我们经常需要将特定的任务通知给特定的人&#xff0c;虽然 Apache DolphinScheduler 在安全中心提供了告警组和告警实例&#xff0c;但是配置起来相对复杂&#xff0c;并且还需要在定时调度时指定告警组。通过这篇文…

Vert.x 源码解析(4.x)(一)——Future源码解析

目录 1. 简介 在现代的软件开发中&#xff0c;异步编程已经变得非常重要。它可以提高应用程序的并发性能&#xff0c;使应用程序能够更有效地处理大量的并行操作。Vert.x 是一个面向事件驱动、非阻塞的异步编程框架&#xff0c;它提供了丰富的工具来简化异步编程的复杂性。 如…

误删文件恢复怎么做?2023最新方法公布!

“突然感觉闯了大祸&#xff0c;在用朋友的电脑时&#xff0c;误删了一些他电脑里非常重要的文件&#xff0c;现在真的感觉很对不起&#xff0c;有什么方法可以找回这些误删的文件吗&#xff1f;非常着急&#xff0c;希望大家给我一些建议&#xff01;” 如今&#xff0c;电脑已…

Go:关于‘fresh‘ 不是内部或外部命令,也不是可运行的程序问题的解决方案

如果你使用了go get命令来安装fresh包&#xff0c;那么fresh命令可能没有被正确添加到系统的PATH环境变量中&#xff0c;需要修改你的fresh.exe的文件存放位置。 一般而言&#xff0c;你会将GO的安装文件夹Go与工作区文件夹GoProjects分开&#xff08;你的文件夹名称与我的不同…

linux刻录iso到u盘

需要的工具&#xff1a;Linux系统、U盘、ISO镜像文件。 首先在Linux系统中打开终端&#xff0c;使用dd命令&#xff0c;格式如下&#xff1a; sudo dd ifxxx.iso of/dev/sdb 命令中xxx.iso是你的ISO镜像文件的路径&#xff0c;of后面的你的U盘路径&#xff0c;一般就是/dev/sdb…

软件架构设计(二) 软件架构风格其他风格简介

架构师备战(四)-软件架构设计(二) 软件架构风格其他风格简介 架构风格其实是很重要的知识,我们先了解了基本的五种架构风格, 我们之前也提到除了这五种风格之外, 还有一些没有收录在这几种风格之内的, 这次会去做一个探索。 1、闭环控制架构风格(过程控制)【重要】 概念 …

ChatGPT Prompting开发实战(四)

一、chaining prompts应用解析及输出文本的设定 由于输入和输出都是字符串形式的自然语言&#xff0c;为了方便输入和输出信息与系统设定使用的JSON格式之间进行转换&#xff0c;接下来定义从输入字符串转为JSON list的方法&#xff1a; 定义从JSON list转为输出字符串的方法&…

基于北方苍鹰算法优化的BP神经网络(预测应用) - 附代码

基于北方苍鹰算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于北方苍鹰算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.北方苍鹰优化BP神经网络2.1 BP神经网络参数设置2.2 北方苍鹰算法应用 4.测试结果&#xff1a;5…

C#安装“Windows 窗体应用(.NET Framework)”

目录 背景: 第一步: 第二步: 第三步&#xff1a; 总结: 背景: 如下图所示:在Visual Studio Installer创建新项目的时候&#xff0c;想要添加windows窗体应用程序&#xff0c;发现里面并没有找到Windows窗体应用(.NET Framework)模板&#xff0c;快捷搜索也没有发现&#…

插入排序(Insertion Sort)

C自学精简教程 目录(必读) 插入排序 每次选择未排序子数组中的第一个元素&#xff0c;从后往前&#xff0c;插入放到已排序子数组中&#xff0c;保持子数组有序。 打扑克牌&#xff0c;起牌。 输入数据 42 20 17 13 28 14 23 15 执行过程 完整代码 #include <iostream…

springboot的mybatis问题

自动映射 在数据库列名和java类属性名相同的情况&#xff0c;mybatis会自动将数据库的值自动匹配到java类的属性当中。 java的price等变量 mysql的price等字段 mybatis会自动将数据库的值自动匹配到java类的属性当中。 开启驼峰命名 在application中配置 mybatis:type-…

【ArcGIS Pro二次开发】(64):多分式标注

在ArcGIS中有时会遇到需要二分式标注的情况&#xff0c;有时甚至是三分式、四分式。 通过输入标注表达式&#xff0c;可以做出如下的效果&#xff0c;但是代码不短&#xff0c;每次都要输一遍也挺麻烦。 网上也有一些分式标注的python工具&#xff0c;但不够直观&#xff0c;于…

Flink 如何定位反压节点?

分析&回答 Flink Web UI 自带的反压监控 —— 直接方式 Flink Web UI 的反压监控提供了 Subtask 级别的反压监控。监控的原理是通过Thread.getStackTrace() 采集在 TaskManager 上正在运行的所有线程&#xff0c;收集在缓冲区请求中阻塞的线程数&#xff08;意味着下游阻…

CXL 存储设备标签存储区(LSA)

&#x1f525;点击查看精选 CXL 系列文章&#x1f525; &#x1f525;点击进入【芯片设计验证】社区&#xff0c;查看更多精彩内容&#x1f525; &#x1f4e2; 声明&#xff1a; &#x1f96d; 作者主页&#xff1a;【MangoPapa的CSDN主页】。⚠️ 本文首发于CSDN&#xff0c…