Linux学习之高级IO

news2024/11/26 4:36:44

之前的内容我们基本掌握了基础IO,如套接字,文件描述符,重定向,缓冲区等知识都是文的基本认识,而高级IO则是指更加高效的IO。

对于应用层,在读写的时候,本质就是把数据写给OS,若一方为空,就会进行阻塞式等待,读写的本质也就是数据的拷贝。

所以高效的 IO就需要在多线程的情况下,单位时间的的拷贝的数据更多,等待的比重越小。

目录

五种IO模型

非阻塞式IO

多路复用(多路转接)

IO多路转接之select

初识select

 select函数原型

IO多路转接之epoll

1.认识有关epoll的接口

2.epoll原理

3.编写epoll

4.epoll的工作模式

注意:

读写的改进 reactor


五种IO模型

 当前有五种IO模型:

我们以钓鱼佬钓鱼为例:(鱼竿:文件描述符) (上钩:读写就绪)(鱼:数据)

1.钓鱼佬张三,在完成打窝,之后选择好地点就开始钓鱼,在等待的过程中,雷打不动,任何人都干扰不到他,他死死的盯着鱼漂,一有动静,钩被咬了就提竿上鱼。这种等待的方式我们称为阻塞式IO,这也是大部分常用的读写接口的方式

2.钓鱼佬李四是一个资深的钓鱼佬,在完成准备工作时,李四就先观察了鱼漂,发现没动静,就拿起了手机刷视频,看一会儿之后再检查鱼漂发现上钩了,于是提杆上鱼。李四每隔一段时间进行检查,如果满足条件就读写,反之就干自己的事。这种方式称为非阻塞式IO--非阻塞轮询

3.钓鱼佬王五,拥有较为先进的鱼竿,完成准备工作后,直接就把鱼竿插入地上,一心一意的干其它事,鱼竿上有报警装置,一旦上鱼,立刻发送警报告知王五,所以王五只需要看有没有报警信号。这种方式被称为信号驱动式IO

4.赵六,身家万贯的热爱钓鱼的钓鱼佬,秉持着杆多鱼多的准则,直接准备了一卡车鱼竿,依次完成准备工作,入水后准备开钓,由于鱼竿数量多,赵六在岸上来回检查看哪只鱼竿上货了。这种方式称为多路复用(多路转接)。

5.田七,以吃鱼为爱好的世界五百强上市公司CEO,司机小王拉着田七来钓鱼,刚准备钓鱼,由于公司业务繁忙,对钓鱼不是很感冒,田七又回到了公司处理,且告知小王车上装备齐全,你在这里钓鱼,掉满了之后再给我打电话,我来接你。对于田七来说,这种方式称为异步IO

首先我们来看看阻塞式与非阻塞式 IO:

两者在效率上基本一样(有人觉得非阻塞式效率会高一点,这指的是非阻塞可以去干别的事,但是对于IO效率(等+拷贝)是一样的),区别是等的方式不同,一个一直等,一个间隔等一会儿。

再看同步与异步:

同步:参与等或者参数与了拷贝,都可以是同步IO。异步:两者都没参与,只拿数据。

那么综上:哪一种效率更高呢?

实际上就是田七,多路复用IO。我们学习的重点也就是多路复用.

非阻塞式IO

fcntl
一个文件描述符 , 默认都是阻塞 IO,通过函数fcntl可以修改文件描述符的属性。
函数原型如下
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的 cmd 的值不同 , 后面追加的参数也不相同 .
fcntl 函数有 5 种功能 :
复制一个现有的描述符( cmd=F_DUPFD .
获得 / 设置文件描述符标记 (cmd=F_GETFD F_SETFD).
获得 / 设置文件状态标记 (cmd=F_GETFL F_SETFL).
获得 / 设置异步 I/O 所有权 (cmd=F_GETOWN F_SETOWN).
获得 / 设置记录锁 (cmd=F_GETLK,F_SETLK F_SETLKW).
我们此处只是用第三种功能 , 获取 / 设置文件状态标记 , 就可以将一个文件描述符设置为非阻塞
实现函数 SetNoBlock
基于 fcntl, 我们实现一个 SetNoBlock 函数 , 将文件描述符设置为非阻塞 .
void SetNoBlock(int fd) { 
 int fl = fcntl(fd, F_GETFL); 
 if (fl < 0) { 
 perror("fcntl");
 return; 
 }
 fcntl(fd, F_SETFL, fl | O_NONBLOCK); 
}
使用 F_GETFL 将当前的文件描述符的属性取出来 ( 这是一个位图 ).
然后再使用 F_SETFL 将文件描述符设置回去 . 设置回去的同时 , 加上一个 O_NONBLOCK 参数

了解到这两个主要的接口,我们就可以简单的实现一下非阻塞:

#include<iostream>
#include<string.h>
#include<unistd.h>
#include<fcntl.h>
#include<cstdio>
using namespace std;



//默认阻塞,我们设置为非阻塞
void setNonBlock(int fd)
{
  int f1=fcntl(fd,F_GETFL);
  if(f1<0)
  {
    cerr<<"get failed"<<endl;
  }
  //设置标记位,实际上是在原基础上新增一个标记位
  fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//以非阻塞式打开
  cout<<"set O_NONBLOCK succed"<<endl;
}



int main()
{
  char buffer[1024];//缓冲区
  while(true)
  {
    printf("please enter#");
    fflush(stdout);
    setNonBlock(0);
    sleep(1);
    ssize_t size=read(0,buffer,sizeof(buffer)-1);//从标准输入中读取
    if(size>0)
    {
      buffer[size-1]=0;
      cout<<"读取到数据 :"<<buffer<<endl;
    }else if(size==0)
    {
      cout<<"read done"<<endl;
      break;
    }else{
      cerr<<errno<<":"<<strerror(errno)<<endl;
    }
  }

  return 0;
}

一般阻塞式,系统就会等待着我们输入数据后,才进行下一步工作(这里是打印输入的字符)。

我们设置为非阻塞式有几点需要注意:

1.设置为非阻塞,系统不会等待着我们输出,才打印,而是以读取到数据<0打印错误信息。

2,出错非两种情况:第一个就是fd真的异常 ,第二个就是底层还没就绪,你就返回了。(这里是第二种)。

3.非阻塞也能获取到数据并打印,不过给你就绪的时间非常短。

对于第二点,如何区分是还没就绪还是fd异常,可以通过打印错误码而知晓。

多路复用(多路转接)

实际上对于阻塞式,非阻塞式,非阻塞式轮询这些等待和拷贝一个函数就会搞定了,但是对于多路转接,因为要提高IO效率,因此等待与拷贝是分开的的,多路复用的本质就是非阻塞轮询,因此我们需要将非阻塞和轮询逐一实现。

IO多路转接之select

初识select

系统提供 select 函数来实现多路复用输入 / 输出模型 .
select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的 ;
程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

 select函数原型

 select的函数原型如下: #include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

其中有三个参数类型为fd_set,位图类型,为输入输出型参数。

为输入型参数时:用户告诉内核,我给一些fd,你帮我注意观察一些fd的读事件,如果读事件就绪,你就告诉我

为输出型参数时:内核告诉用户,你给我传来的fd,我已经帮你知晓了有哪些已经就绪(进行位图修改),你可以读了。

至于为什么用位图,因为位图本质就是设置某个数的第几位为0/1,举例:如果此时内核发现0号和1号文件被描述符已经就绪,就会把第0位和第1位比特位置为1, (比特位的位置代表对应文件描述符,从右向左)。比特位的内容就可以表示内核是否要关心。

作为输入型位图:

比特位的位置代表文件描述符

比特位的内容代表是否内核需要关心

 作为输出型位图:

比特位的位置还是文件描述符

比特位的内容代表用户关心的fd的读事件就绪了

于是就提供了一系列位图的接口用来修改为位图。

  其中timeout为一个结构体,里面有两个成员:一个代表秒,一个代表微秒。、

如果设置了timeout,在该时间内如果有已经就绪的,立即返回,之后的timeout表示剩下的时间。

如果没设置,只要有一个就绪了,就返回。

select的返回值:

  1. 返回值

    • 正常情况下,返回已准备好的文件描述符的个数。
    • 经过超时时长后仍无设备准备好,返回值为 0。
    • 如果 select 被某个信号中断,返回 -1 并设置 errno 为 EINTR
    • 如果出错,返回 -1 并设置相应的 errno

对于服务端来说,使用多路复用可以提高IO效率,当没有用访问服务端使,此时select的个数为零,但一有客户端访问,就会有对应的文件描述符(接收缓冲区)被创建,并且添加进行状态检测。

select.hpp

#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>

const int max_fd = (sizeof(fd_set)) * 8; // 8个位图最多存储的bit位
static const uint32_t defaultport = 8080;
class SelectServer
{
public:
  SelectServer(const uint32_t Port = defaultport) : port(Port)
  {
    // 对辅助数组初始化为-1
    for (int i = 0; i < 1024; i++)
    {
      fd_arry[i] = defaultfd;
    }
  }
  ~SelectServer()
  {
    _listenfd.Close();
  }
  bool Init()
  {
    _listenfd.Createsockfd();
    _listenfd.Bind(port);
    _listenfd.Listen();
    return true;
  }
  void Accept()
  {
     // 连接就绪,获取新连接可以通信了
        std::string clientip;
        uint16_t clientport;
        int sockfd = _listenfd.Accept(&clientip, &clientport); // 就绪了,不在阻塞
        if (sockfd < 0)
          return;
        std::cout << "获取连接成功"
                  << "描述符:" << sockfd << std::endl;
        // 获取之后不能进行读取,而是设置select,不然还是会阻塞在select中
        int pos = 1;
        for (; pos < max_fd; pos++)
        {
          if (fd_arry[pos] != defaultfd)
          {
            continue;
          }
          else
          {
            break;
          }
        }
        if (pos == max_fd)
        {
          std::cout << "can not handle" << std::endl;
          close(sockfd); // 无法处理就关掉接收的文件描述符
        }
        else
        {
          // 找到了空闲位置,此时放到fd_arry里面管理
          fd_arry[pos] = sockfd;
        }

  }
  void recver(int fd,int i)
  {
        // 不在位图中代表的是接收后新的fd
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
          std::cout << "read meesage:" << buffer << std::endl;
        }else if(n==0)
        {
          std::cout<<"client closed" << std::endl;
          //如果为空,说明要关闭连接了
          close(fd);
          fd_arry[i]=defaultfd;

        }else{
          std::cout<<"read warning"<<std::endl;
          close(fd);
          fd_arry[i]=defaultfd;
        }
  }
  void Disapacher(fd_set fds)
  {
    // 为了确定就绪的文件描述符。需要遍历整个数组
    for (int i = 0; i < max_fd; i++)
    {
      int fd = fd_arry[i];
      if (fd == defaultfd)
      {
        continue;
      }
      // 先判断文件描述符是否在位图中
      if (fd == _listenfd.Fd() && FD_ISSET(fd, &fds)) // 判断是否是位图中的,是就代表就绪
      {
       Accept();
      }
      else
      {
        recver(fd,i);
      }
    }
  }
  void Start()
  {
    int listensock = _listenfd.Fd(); // 获取套接字的文件描述符
    fd_arry[0] = listensock;
    fd_set rfds;                     // 读的文件描述符位图
    FD_ZERO(&rfds);                  // 先进行清空
    struct timeval timeout = {10, 0}; // 设置时间戳,每隔五秒检验一次
    int fdmax = fd_arry[0];
    while (true)
    {
      for (int i = 0; i < max_fd; i++)
      {
        if (fd_arry[i] == defaultfd)
        {
          continue;
        }
        else
        {
          // 如果不为-1,说明该位置有要检测的文件描述符
          FD_SET(fd_arry[i], &rfds); // 将指定的文件描述符添加进来(交给select进行检测)
          // 同时找出最大的文件描述符
          if (fd_arry[i] > fdmax)
          {
            fdmax = fd_arry[i];
            std::cout << "max has updated:" << fdmax << std::endl;
          }
        }
      }

      // 这里不能直接进行accept,accept的本质就是检测并获取listenfd上面的事件,accept只能阻塞等一个
      // 因为我们要实现多路复用,所以这里是要去等待多个套接字并检测他们的状态,使用select
      int n = select(fdmax + 1, &rfds, nullptr, nullptr, &timeout); // 通过内核的select帮我进行检测,刚开始启动只有一个listenfd
      switch (n)
      {
      case 0:
        std::cout << "time out" << std::endl;
        break;
      case -1:
        std::cout << "select error" << std::endl;
      default:
        // 有链接了,如果对链接不处理,select会一直通知你,需要处理并设置位图
        // 新连接到来我们认为是读事件,
        Disapacher(rfds);//名为事件派发器
        break;
      }
    }
  }

private:
  Sock _listenfd;
  uint32_t port;
  int fd_arry[max_fd]; // 辅助数组,统计所有描述符
};

 main.cc

#include"SelectSever.hpp"
#include<memory>
#include<iostream>


void usehelp(char*s)
{
  printf("use correct code:%s port[1024]\n",s);
}
int main(int argc,char*argv[])
{

  std::unique_ptr<SelectServer>  server(new(SelectServer));
  server->Init();
  server->Start();
  return 0;
}

编写select的思路:

1.首先创建,绑定,监听套接字,之后创建文件描述符数组对所有文件描述符管理。

2.插入最开始的listenfd到数组里,遍历数组找出最大的文件描述符,设置位图用来标识就绪的文件描述符,之后select数组最大fd+1文。

2.select之后判断是否有新链接,有新的连接(这里我们默认是读事件)就会有新的文件描述符,对事件进行处理。

3.处理时,数组里的文件描述符分两种,一种使位图中的就绪的fd,一种是就绪后进行accept的新的fd,因此对整个数组进行判断,是位图中的,就行新接受新连接,是accept的进行读事件,反之是-1,continue。

但是select是有很明显的缺陷:

1.等待的fd是有上限的--1024

2.输入输出参数较多,数据拷贝频率高

3.输入输出参数多,每一次都对需要关心的fd重置以达到复用

4.管理数组fd,有太多次进行遍历

为了解决种种情况,我们使用了下一种接口(poll):

poll 函数接口
说明
fds 是一个 poll 函数监听的结构列表 . 每一个元素中 , 包含了三部分内容 : 文件描述符 , 监听的事件集合 , 返 回的事件集合.
nfds 表示 fds 数组的长度 .
timeout 表示 poll 函数的超时时间 , 单位是毫秒 (ms).
返回结果
返回值小于 0, 表示出错 ;
返回值等于 0, 表示 poll 函数等待超时 ;
返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回

这里我们介绍一下,重点放在select_poll上。

IO多路转接之epoll

epoll的说明是:为了处理大批量句柄而改进的poll,

1.认识有关epoll的接口

epoll_create 创建一个epoll模型

 epoll_wait  等待多长时间进行一次事件检查 返回就绪的fd与event

 返回值位已经准备就绪的个数。对于event的宏表示的就绪时间,类型如下:

EPOLLIN : 表示对应的文件描述符可以读 ( 包括对端 SOCKET 正常关闭 );
EPOLLOUT : 表示对应的文件描述符可以写 ;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 ( 这里应该表示有带外数据到来 );
EPOLLERR : 表示对应的文件描述符发生错误 ;
EPOLLHUP : 表示对应的文件描述符被挂断 ;
EPOLLET : EPOLL 设为边缘触发 (Edge Triggered) 模式 , 这是相对于水平触发 (Level Triggered) 来说的 .
EPOLLONESHOT :只监听一次事件 , 当监听完这次事件之后 , 如果还需要继续监听这个 socket 的话 , 需要
再次把这个 socket 加入到 EPOLL 队列里 .

epoll_ctl  作用是对系统的文件描述符的事件增加,删除,修改。即对文件描述符做管理

 第一个参数就是epoll_create的返回值,op代表三个选项(增加,修改,删除),第三个代表那个fd对应的哪个事件。

对于epoll的使用时要使用这三个系统接口的。

2.epoll原理

当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成 员与epoll 的使用方式密切相关
每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来 的事件.
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来 ( 红黑树的插 入时间效率是lgn ,其中 n 为树的高度 ).
而所有添加到 epoll 中的事件都会与设备 ( 网卡 ) 驱动程序建立回调关系,也就是说,当响应的事件发生时 会调用这个回调方法.
这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中 .
epoll 中,对于每一个事件,都会建立一个 epitem结构体

实际上epoll和poll是两个不同的模型,没太多关系,epoll是专门为用户设计的一个底层事件调度的模型。

3.编写epoll

对于select来说,我们使用一个系统调用接口加上文件描述符数组来进行非阻塞式的等待。

那么对于epoll来说就是通过三个系统调用接口,1.创建模型,3.进行非阻塞式等待(时间自己设置),2.使用epoll_ctl管理事件(本质上时使用红黑树的节点绑定事件进行高效的管理)。

每一个事件和她的描述符都被封装在一个结构体epoll_event当中,之后通过一个数组管理这里epol_event。

第一个不太完美的版本的eopp如下:

epoler.hpp

//封装epoll接口
#pragma once
#include <sys/epoll.h>
#include <unistd.h>
#include<iostream>
#include<string.h>
#include"Nocopy.hpp"

uint32_t EVENT_IN=(EPOLLIN); //读事件
uint32_t EVENT_OUT=(EPOLLOUT);//写事件

//因为我们不想该类被拷贝,所以继承不被拷贝的类
class Epoll :public nocopy
{
    public:
    static const int size=128;
       Epoll():_timeout(3000)     //设置检测时间3s一次,若为零就是非阻塞等待
       {
         //创建epoll模型
         _epfd=epoll_create(size);
         if(_epfd==-1)
         {
            std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;
         }else
         {
            std::cout<<"create succed ,epfd:"<<_epfd<<std::endl;
         }

       }
       //使用epoll接口进行等待
       int Epollwait(struct epoll_event events[],int num) //输入参数events表示就序的事件 num表示个数
       {
         int ret=epoll_wait(_epfd,events,num,_timeout);//就绪了就从这里拿
       }

       int Epollctl(int oper,int sock,uint32_t event)
       {
         //向指定模型根据oper添加修改删除事件
         if(oper==EPOLL_CTL_DEL)
         {
            //这里对删除事件进行单独处理
            int ret=epoll_ctl(_epfd,oper,sock,nullptr);
         }else
         {
            //注册事件
            struct epoll_event ev; //创建事件结构体并设置
            ev.events=event;//设置事件
            ev.data.fd=sock;//设置fd
            int ret=epoll_ctl(_epfd,oper,sock,&ev);//进行什么操作呢  本质就是向红黑树新增节点,节点绑定事件
            if(ret==-1)
            {
               std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;
            }else
            {
               std::cout<<"add succed ,epfd:"<<_epfd<<std::endl;

            }
            return ret;
         }
       }
       ~Epoll()
       {
        if(_epfd==-1)
        {
         close(_epfd);
         std::cout<<"close succed ,epfd:"<<_epfd<<std::endl;
        }
       }
    private:
        int _epfd;
        int _timeout; 
};

epollserver.hpp

#pragma once
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"
#include<iostream>
#include<memory>
#include<sys/epoll.h>

//设置你要关心的时事件

const uint16_t defalutport=8080;
class EpollServer :public nocopy
{
     public:
         static const int num=64;//每次从就绪队列中取64个就绪事件
        EpollServer(uint16_t port=defalutport):_port(port),_listensock(new Sock),_epoller(new Epoll)
        {
        }
        bool Init()
        {
         _listensock->Createsockfd();
         _listensock->Bind(_port);
         _listensock->Listen();
         return true;
        }
        void Start()
        {
          //将listensock添加到epoll中检测你关心的读写事件,这里就是添加
        _epoller->Epollctl(EPOLL_CTL_ADD,_listensock->Fd(),EVENT_IN);
        
         struct epoll_event evns[num];//num代表最多就绪的个数,若果等待的个数太多,就会分配批次取wait
         while(true)
         {
          int n=_epoller->Epollwait(evns,num); //返回值代表就绪的个数
          if(n>0)
          {
            //在事件插入之后,有事件就绪了
            std::cout<<"检测对应的fd的事件,fd:"<<evns[0].data.fd<<std::endl; 
            //既然就绪,那就开始处理事件
            HandlerEvent(evns,n);

          }else if(n==0)
          {
            std::cout<<"time out"<<std::endl;
          }else
          {
            std::cout<<"wait error"<<std::endl;
          }
         }

        }
        void Accepter()
        {
          std::string clientip;
                uint16_t clientport;
               int sock =_listensock->Accept(&clientip,&clientport);
               if(sock>0)
               {
                //获取到新连接再插入到红黑树当中
                _epoller->Epollctl(EPOLL_CTL_ADD,sock,EVENT_IN);
               }else
               {
                  std::cout<<"accept error"<<std::endl;
               }
        }
        void Dispatcher(int fd)
        {
          char buffer[1024];
          // 已经获取新连接,开始read
          int n = read(fd, buffer, sizeof(buffer) - 1);
          if (n > 0)
          {
            buffer[n] = 0;
            std::cout << "读取到数据:" << buffer << std::endl;
            //读完之后写回给客户端
            std::string echo_string="server get message";
            std::string content=echo_string+std::string(buffer);
            write(fd,content.c_str(),content.size());

          }else if(n==0)
          {
            std::cout << "client close:"<< std::endl;
            //删除对应的文件描述符 
            _epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fd
            close(fd);
          }else
          {
            std::cout << "read error!!" << std::endl;
            _epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fd
            close(fd);
          }
        }

        //这里的事件假设就两种,读 写
        void HandlerEvent( struct epoll_event evns[],int num)
        {
          for(int i=0;i<num;i++)
          {
            uint32_t event=evns[i].events;
            int fd=evns[i].data.fd; 
            //判断是哪一种事件
            if(event&EVENT_IN)
            {
              //判断fd是等待的fd,还是已经获取新连接的fd
              if(fd==_listensock->Fd())
              {
                Accepter();
              }else
              {
                Dispatcher(fd);
              }
              
            }else if(event & EVENT_OUT)
            {
              //写事件就绪
            }
            
           }
        }


        ~EpollServer()
        {
            _listensock->Close();
        }


     private:
        std::shared_ptr<Sock> _listensock;
        std::shared_ptr<Epoll> _epoller;
        uint16_t _port;

};

4.epoll的工作模式

epoll的工作模式有利各种:ET 和LT

LT(level triggered)工作模式--水平工作模式

epoll 默认状态下就是 LT 工作模式 .
1.当 epoll 检测到 socket 上事件就绪的时候 , 可以不立刻进行处理 . 或者只处理一部分 .
如上面的例子 , 由于只读了 1K 数据 , 缓冲区中还剩 1K 数据 , 在第二次调用 epoll_wait , epoll_wait 仍然会立刻返回并通知socket 读事件就绪 .
直到缓冲区上所有的数据都被处理完 , epoll_wait 才不 会立刻返回 .
2.支持阻塞读写和非阻塞读写

 通俗点说,如果上层有新的数据,你不拿走,上层就一直通知你让你取数据,只要有就拿走。

ET(Edge Triggered)工作模式  --边缘工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.

epoll 检测到 socket 上事件就绪时 , 必须立刻处理 .
如上面的例子 , 虽然只读了 1K 的数据 , 缓冲区还剩 1K 的数据 , 在第二次调用 epoll_wait 的时候 ,
epoll_wait 不会再返回了 .
也就是说 , ET 模式下 , 文件描述符上的事件就绪后 , 只有一次处理机会 .
ET 的性能比 LT 性能更高 ( epoll_wait 返回的次数少了很多 ). Nginx 默认采用 ET 模式使用 epoll.
只支持非阻塞的读写

通俗点说,只有发生变化(从没链接到有新连接,从一个变多个,从多个变没),就会进行通知取走数据,只有变化,才会拿走。

由于通知的次数少,且通知是有限的,所以一般而言ET的工作效率更高一些,但也因为之中特性,程序员必须每次把数据都取完(循环读取,直到读取出错),否则就会遗漏。

但是读完数据,我们此时的read是阻塞的,我们不能让她阻塞,这会导致服务器挂起(这是不好的),所以我们需要设置读是非阻塞的。直到非阻塞状态下还读完了,就会返回错误码。

且由于我们把缓冲区的数据都把走了,这一位则TCP通信时,窗口就更大了,一次拿去的数据也多了。

注意:

即使如此,一般情况ET比LT高效,但是你是一直比LT高效吗,而且既然你ET都能一次性非阻塞的读取完,我LT不行吗?只是我们一般去使用ET,如果可以,你也可以去使用LT修改。

读写的改进 reactor

所以我们的读写是不合理的,首先没有协议的定制,其次读取并不是非阻塞的。

这里就还是以编写计算器的客户端与服务端为例:

重要的读写部分在TcpServer.hpp中:

TcpServer.hpp

#pragma once
#include<string>
#include<iostream>
#include<memory>
#include<functional>
#include<unordered_map>
#include <arpa/inet.h>
#include"Nocopy.hpp"
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Comen.hpp"
#include"Protocol.hpp"
#include"Calculator.hpp"

class Connection;
class TcpServer;
using funct=std::function<void (std::shared_ptr<Connection>)>; //定义了返回值为void,参数为td::shared_ptr<Connection>指针的包装器
const static int buffersize=1024;
const uint16_t defaultport=8080;                                                                                                    
const int num=64; 
class Connection
{
    public:
      void SetHandle(funct &recv_cb,funct &send_cb,funct &except_cb)
      {
          _recver_callback=recv_cb;
          _send_callback=send_cb;
          _except_callback=except_cb;
      }
      void Appendinbuffer(std::string buffer)
      {
        _inbuffer+=buffer;//输入缓冲区

      }
      void Appendoubuffer(std::string buffer)
      {
        _outbuffer+=buffer;
      }
      const std::string &inbuffer()
      {
        return _inbuffer;
      }

      Connection(int sock,std::shared_ptr<TcpServer> tcpserver_ptr):_sock(sock),_tcpserver_callback(tcpserver_ptr)//回值指针用来传递TCPserver
      {
      }
      int getsock()
      {
        return _sock;
      }
      std::string& getinbuf()
      {
        return _inbuffer;
      }
      std::string& getoutbuf()
      {
        return _outbuffer;
      }
      ~Connection(){}
    private:
    int _sock;
    private:
    std::string _inbuffer;//输入缓冲区
    std::string _outbuffer;//输出缓冲区
    public:
    funct _recver_callback; //接收回调
    funct _send_callback;   //发送回调
    funct _except_callback; //异常回调

    //tcpserver回值指针
    std::shared_ptr<TcpServer> _tcpserver_callback;
};

class TcpServer
{
    public:
        TcpServer(funct OnMessage,uint16_t port=defaultport):_epoll_server(new Epoll),_listensocket(new Sock),_port(port),_quit(true)//回调函数用来设置处理事件
        {
            _OnMessage=OnMessage;
            _listensocket->Createsockfd();
            _listensocket->Bind(_port);
            _listensocket->Listen();
            SetNonBlock(_listensocket->Fd()); //将文件设置为非阻塞等待
            AddConnection(_listensocket->Fd(),EVENT_IN,std::bind(&TcpServer::Accepter,this,std::placeholders ::_1),nullptr,nullptr);
        }
        void Init()
        {
           
        }
        void Accepter(std::shared_ptr<Connection> connection)
        {
          //连接到来开始接受
          while(true)
          {
            struct sockaddr_in peer;
            socklen_t len=sizeof(peer);
            int fd=::accept(connection->getsock(),(sockaddr*)&peer,&len); //获取新链接
            if(fd>0)
            {
              uint16_t clientport=ntohs(peer.sin_port);
              char ipbuffer[10];
              inet_ntop(AF_INET,&peer.sin_addr,ipbuffer,sizeof(ipbuffer));
              std::string ip=std::string(ipbuffer);
              std::cout<<"get new link,ip:"<<ip<<",fd :"<<fd<<",port"<<clientport<<std::endl;
              //获取的新连接设置非阻塞,现在是ET模式
              SetNonBlock(fd);
              //之后再添加到哈希表中,epoll_event中,
               AddConnection(fd,EVENT_IN,\
                   std::bind(&TcpServer::Recver,this,std::placeholders::_1),   //根据函数地址与参数bind成一个包装器对象
                   std::bind(&TcpServer::Sender,this,std::placeholders::_1),\
                   std::bind(&TcpServer::Exepter,this,std::placeholders::_1));
            }else
            {
              //直到非阻塞被读到没有新的连接了
              if(errno==EWOULDBLOCK)
              {
                break;
              }else if(errno==EINTR)
              {
                continue;
              }else{
                //读出错
                break;
              }
              
            }
          }

        }
        void AddConnection(int sock,uint32_t event,funct recv_cb,funct send_cb,funct except_cb)
        {
            // 将listensock添加到epoll中检测你关心的读写事件,这里就是添加 并且设置为边缘工作模式
            _epoll_server->Epollctl(EPOLL_CTL_ADD, sock, event);
            //c除此之外,每一个fd都要构建成Connetcion对象,通过connect管理事件的处理
            std::shared_ptr<Connection> new_connection=std::make_shared<Connection>(sock,std::shared_ptr<TcpServer>(this));
            new_connection->SetHandle(recv_cb,send_cb,except_cb);
            //第三步,就是添加到unordered_map
            _connection_map[sock]=new_connection;
        }

        //事件管理,进行事件的处理
        void Recver(std::shared_ptr<Connection> connection)
        {
          int sock=connection->getsock();
          //ET模式读
          while(true)
          {
            char buffer[buffersize];
            memset(buffer,0,sizeof(buffer));
            ssize_t n=recv(sock,buffer,sizeof(buffer)-1,0); //这里的数据默认为字符流,如果还有二进制,需要换为vector 
            if(n>0)
            {
              //读取成功
              connection->Appendinbuffer(buffer);
            }else if(n==0)
            {
              //连接关闭
              std::cout<<"link closed !"<<std::endl;
              connection->_except_callback(connection);


            }else
            {
             //读取错误
              if(errno==EWOULDBLOCK)
              {
                break;
              }else if(errno==EINTR)
              {
                continue;
              }else{

              std::cout<<"read error!"<<std::endl;
              connection->_except_callback(connection);
              break;
              }
            }
          }
            //读完之后,将数据交给上层
            _OnMessage(connection); //协议的定制
        }
        void Sender(std::shared_ptr<Connection> connection)
        {
          while(true)
          {
            std::string &buffer=connection->getoutbuf();
            int sockfd=connection->getsock();
            ssize_t n=send(sockfd,buffer.c_str(),buffer.size(),0);
            if(n>0)
            {
              //清空缓冲区
              buffer.erase(0,n);
              if(buffer.empty())
                break;
            }else if(n==0)
            {
              return ;
            }else 
            {
              if(errno==EWOULDBLOCK)
              {
                break;
              }else if(errno==EINTR)
              {
                continue;//信号中断,跳过本次
              }else 
              {
                //发失败了,进入异常处理
                std::cout<<"send error"<<std::endl;
                connection->_except_callback(connection);
                return;
              }
            }
           if(!buffer.empty())
           {
              //对写事件的关心
              
              EnableEvent(connection->getsock(),true,true);
           }else
           {
             //关闭对写事件的处理
              EnableEvent(connection->getsock(),true,false);
           }
          }
        }
        void EnableEvent(int fd,bool readable,bool writeable )
        {
          uint32_t event=0;
          event |=((readable ? EPOLLIN:0)|(writeable ? EPOLLOUT : 0)|EPOLLET);
          _epoll_server->Epollctl(EPOLL_CTL_MOD,fd,event);//修改事件的读写,如果没发完,就继续发,通过修改事件为写
        }
        void Exepter(std::shared_ptr<Connection> connection)
        {
          //处理链接异常或着断开
          std::cout<<"事件异常,断开连接!"<<std::endl;
          //首先移除链接
          //EnableEvent(connection->getsock(),false,false);//读写都设置为False
          _epoll_server->Epollctl(EPOLL_CTL_DEL,connection->getsock(),0);//移出事件 
          //关闭异常的文件描述符
          close(connection->getsock());
          std::cout<<"已移除connection,已关闭文件描述符:"<<connection->getsock()<<std::endl;
          //把map中的也删了
        }

        bool isConnectionsafe(int fd)
        {
          auto it=_connection_map.find(fd);
          if(it==_connection_map.end())
          {
            return false;
          }
          return true;
        }
        void Disptcher(int timeout)
        {
          //进行等待就绪时间
          int n=_epoll_server->Epollwait(events,num,timeout);
          for(int i=0;i<n;i++)
          {
            uint32_t event=events[i].events;
            int sock=events[i].data.fd;
            //进行事件判断,并转化成读写事件
            if((event & EVENT_IN)&& isConnectionsafe(sock))
            {
              //读就绪
              if(_connection_map[sock]->_recver_callback)//函数存在
              {
                _connection_map[sock]->_recver_callback(_connection_map[sock]);//fd对应的connetcion信息,其中调用对应的方法,参数为connection对象
              }
            }
            if((event & EPOLLOUT)&& isConnectionsafe(sock))
            {
              //写就绪
              if(_connection_map[sock]->_recver_callback)
              {
                _connection_map[sock]->_send_callback(_connection_map[sock]);
              }
            }
            if(event & EPOLLHUP)
            {
              //读关闭
                event |=(EPOLLIN & EPOLLOUT);
            }
            if(event &EPOLLERR)
            {
              //错误
              
              event |=(EVENT_IN & EVENT_OUT);
            }
          }

        }
        void Loop()
        {
            _quit=false;
            while (!_quit)
            {
             Disptcher(3000);//3s

            }
            _quit=true;
        }
        
        ~TcpServer(){}

    private:
        std::shared_ptr<Epoll> _epoll_server;  //epoll接口
        std::unordered_map<int,std::shared_ptr<Connection>> _connection_map;  //连接池
        std::shared_ptr<Sock> _listensocket; //套接字接口
        struct epoll_event events[num];
        uint32_t _port;
        bool _quit; //是否退出

        funct _OnMessage;//上层回调处理信息

}; 

剩余代码的所在地:reactor · 但成伟/编程学习 - 码云 - 开源中国 (gitee.com)

主要难点是对事件与链接的管理,事件的执行,读写非阻塞这几点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1652292.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MFC DLL注入失败一些错误总结

使用cheat Engine为MFC窗口程序注入DLL时一定要注意&#xff0c;被注入的exe程序和注入的DLL 的绝对路径中一定不要带有中文字符&#xff0c;否则会遇到各种各样的奇怪错误&#xff0c;如下所示&#xff1a; 以下是dll绝对路径中均含有中文字符&#xff0c;会报错误&#xff…

浙大最新开源:MGMap-掩码引导学习的在线矢量化高精地图构建方法

论文标题&#xff1a; MGMap: Mask-Guided Learning for Online Vectorized HD Map Construction 论文作者&#xff1a; Xiaolu Liu, Song Wang, Wentong Li, Ruizi Yang, Junbo Chen, Jianke Zhu 作者单位&#xff1a;浙江大学&#xff0c;有鹿科技 开源地址&#xff1a;…

RUST 编程语言使构建更安全的软件变得更加容易。RUST ALL THE THINGS 需要什么?

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

极致设计!详解专业网页设计的全套步骤和流程

在当今的数字时代&#xff0c;拥有一个专业、易于使用和有吸引力的网页对任何企业或个人都至关重要。专业的网页设计是实现这一目标的关键步骤之一。本文将详细介绍专业的网页设计步骤和过程&#xff0c;以帮助您了解如何设计一个优秀的网页。 在介绍专业网页设计步骤和流程之…

温暖家居新风尚,能率壁挂炉——设计新风尚,体验再升级

随着家居品质要求的提升&#xff0c;现代人对家居的舒适性和设计感有了更高的追求。壁挂炉&#xff0c;作为现代家居中不可或缺的一部分&#xff0c;其重要性日益凸显。中国国际供热通风空调、卫浴及舒适家居系统展览会&#xff08;ISH China & CIHE&#xff09;将于2024年…

短视频矩阵系统电脑端/手机版双端加盟saas技术源头开发

短视频矩阵系统是为了帮助用户更高效地管理、编辑、发布和分析短视频内容而设计的综合性平台。这种系统通常包含电脑端和手机端两个部分&#xff0c;每个部分都有其特定的功能模型&#xff0c;以适应不同设备的操作习惯和使用场景。以下是根据搜索结果中提供的信息&#xff0c;…

差动绕组电流互感器过电压保护器ACTB

安科瑞薛瑶瑶18701709087/17343930412 电流互感器在运行中如果二次绕组开路或一次绕组流过异常电流&#xff0c;都会在二次侧产生数千伏甚至上万伏的过电压。这不仅会使CT和二次设备损坏&#xff0c;也严重威胁运行人员的生命安全&#xff0c;并造成重大经济损失。采用电流互感…

java项目+maven+sonarqube+githook 实现代码提交审查

java项目mavensonarqubegithook 实现代码提交审查 当在团队开发过程中, 由于开发人员技术和风格的不同,导致代码质量和风格各不相同, 为了减少合并时的工作量, 以及基础的一些bug 的避免, 可以使用 (sonarqube阿里巴巴开发规范) 实现代码质量的检测. 为了更加简便, 即在使用git…

大规模LiDAR数据处理

点云存在许多描述环境或建筑物等物体的 x、y、z 坐标。通过激光技术 (LiDAR) 获取的点云通常带有每个坐标的额外测量值和特征。例如&#xff0c;反射强度、回波次数、回波、扫描角度和 RGB 值。换句话说&#xff0c;点云本质上是大量的数据集。 在本文中&#xff0c;我们使用开…

acwing算法提高之数据结构--平衡树Treap

目录 1 介绍2 训练 1 介绍 本博客用来记录使用平衡树求解的题目。 插入、删除、查询操作的时间复杂度都是O(logN)。 动态维护一个有序序列。 2 训练 题目1&#xff1a;253普通平衡树 C代码如下&#xff0c; #include <cstdio> #include <cstring> #include …

小语言模型的潜力

想象一下这样一个世界&#xff1a;智能助手不在云端&#xff0c;而是在你的手机上&#xff0c;无缝了解你的需求并以闪电般的速度做出响应。这不是科幻小说&#xff0c;而是科幻小说。这是小语​​言模型 (SLM) 的希望&#xff0c;这是一个快速发展的领域&#xff0c;有可能改变…

Beego 使用教程 5:页面视图

beego 是一个用于Go编程语言的开源、高性能的 web 框架 beego 被用于在Go语言中企业应用程序的快速开发&#xff0c;包括RESTful API、web应用程序和后端服务。它的灵感来源于Tornado&#xff0c; Sinatra 和 Flask beego 官网&#xff1a;http://beego.gocn.vip/ 上面的 bee…

【笔试训练】day21

1.爱丽丝的人偶 题目意思就是构造一个序列&#xff0c;序列的每个元素要么比左右两个高&#xff0c;要么比左右两个低。 可以看成是一条上下波动的曲线。 我们可以模拟波动的这个过程。 假设有一个数组&#xff0c;里面元素是1-n.遍历每一个位置。用一个指针pos来表示当前检…

短视频矩阵系统源码/saas--总后台端、商户端、代理端、源头开发

短视频矩阵系统源码/saas--总后台端、商户端、代理端、源头开发 搭建短视频矩阵系统源码的交付步骤可以概括为以下几个关键环节&#xff1a; 1. **系统需求分析**&#xff1a;明确系统需要支持的功能&#xff0c;如短视频的上传、存储、播放、分享、评论、点赞等。 2. **技术选…

C++算法题 - 二叉树(2)

TOC 114. 二叉树展开为链表 LeetCode_link 给你二叉树的根结点 root &#xff0c;请你将它展开为一个单链表&#xff1a; 展开后的单链表应该同样使用 TreeNode &#xff0c;其中 right 子指针指向链表中下一个结点&#xff0c;而左子指针始终为 null 。展开后的单链表应该与…

比亚迪CAN数据实时监控分析应用数字化差异化的决策价值洞察

在当今这个信息化飞速发展的时代&#xff0c;汽车数字化转型已成为企业持续竞争力的关键。中国新能源汽车行业的领军企业——比亚迪&#xff0c;其数字化之旅充分展现了企业的创新精神和对未来的深远洞察。 比亚迪的数字化战略不是简单的技术应用&#xff0c;而是一场深刻的商…

Calendar 366 II for Mac v2.15.5激活版:智能日历管理软件

在繁忙的工作和生活中&#xff0c;如何高效管理日程成为了许多人的难题。Calendar 366 II for Mac&#xff0c;作为一款全方位的日历管理软件&#xff0c;以其独特的功能和优秀的用户体验&#xff0c;成为您的日程好帮手。 Calendar 366 II for Mac支持多种视图模式&#xff0c…

成功解决:Could not install packages due to an OSError

成功解决&#xff1a;Could not install packages due to an OSError: 错误&#xff1a; ERROR: Could not install packages due to an OSError: [Errno 2] No such file or directory: C:\Users\XIAODA~1\AppData\Local\Temp\pip-install-yeyhod79\opencv-contribpython_fb…

在线音视频下载

https://cobalt.tools/ 支持 bilibili 等网站

STM32F103学习笔记 | 7.使用寄存器点亮LED灯

int main(void) { // 分析指南者硬件原理图得知要实现点亮灯泡需要将PB0设置为低电位&#xff0c; // 查阅STM32F10x中文手册的端口配置低寄存器&#xff0c;得知一个PB有8个配置位&#xff0c;查阅手册找到了PB0的位置是3:2位置&#xff0c; // 插入未知知识&#xff1a;将端…