目录
五种IO模型
阻塞IO
非阻塞IO
信号驱动IO
IO多路转接
异步IO
小结
同步通信 vs 异步通信(synchronous communication/ asynchronous communication)
同步和异步关注的是消息通信机制
阻塞 vs 非阻塞
其他高级IO
非阻塞IO
fcntl
实现函数SetNoBlock
I/O多路转接之select
理解select执行过程
socket就绪条件
读就绪
写就绪
select的特点
select缺点
select使用示例: 检测标准输入输出
select使用示例
I/O多路转接之poll
poll函数接口
返回结果
poll示例: 使用poll监控标准输入
I/O多路转接之epoll
epoll初识
epoll的相关系统调用
epoll_create
epoll_ctl
epoll_wait
底层机制
epoll为什么高效?
epoll工作原理
epoll的优点(和 select 的缺点对应)
epoll工作方式
水平触发Level Triggered 工作模式
边缘触发Edge Triggered工作模式
对比LT和ET
理解ET模式和非阻塞文件描述符
epoll的使用场景
epoll中的惊群问题
epoll示例: epoll服务器(LT模式)
tcp_epoll_server.hpp
epoll示例: epoll服务器(ET模式)
tcp_socket.hpp
tcp_epoll_server.hpp
高级IO:之前的基础io对应的系统(文件描述符),属于是本地通信。而高级io是一些接口,即在网络通信中如何写出高效的网络服务器代码。
多路转接:linux有三种方案(select,poll,epoll)
再一个是多了个设计模式:reactor模式,基于多路转接,写出比较好的代码。
五种IO模型
当我们进行io的场景当中,如果对应的进程曾经参与过io的具体过程,那么他就叫做同步的过程,如果一个进程他在io的场景当中,没有参与io的细节,我们就称为异步io。(所以同步io和异步io差别就是有没有参与io细节)
阻塞IO
非阻塞IO
如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码
非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符 , 这个过程称为 轮询 . 这对 CPU 来说是较大的浪费 , 一般只有特定场景下才使用信号驱动IO
内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
IO多路转接
虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态异步IO
由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)
小结
任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少异步io和信号的方式的区别:
异步io:等待数据和拷贝数据进程他都不关心,等你拷贝完成了再告诉进程,然后进程再回调去处理数据。
信号:是等的时候不管,但是数据准备好了进程要去拿
同步通信 vs 异步通信(synchronous communication/ asynchronous communication)
同步和异步关注的是消息通信机制
- 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果.
- 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用.
另外 , 我们回忆在讲多进程多线程的时候 , 也提到同步和互斥 . 这里的同步通信和进程之间的同步是完全不想干的概念
- 进程/线程同步也是进程/线程之间直接的制约关系.
- 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候.
阻塞 vs 非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
- 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回.
- 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程
其他高级IO
非阻塞 IO ,纪录锁,系统 V 流机制, I/O 多路转接(也叫 I/O 多路复用) ,readv 和 writev 函数以及存储映射IO( mmap ),这些统称为高级 IO我们此处重点讨论的是 I/O 多路转接
非阻塞IO
fcntl
一个文件描述符 , 默认都是阻塞 IO函数原型如下#include <unistd.h>#include <fcntl.h>int fcntl(int fd, int cmd, ... /* arg */ );传入的 cmd 的值不同 , 后面追加的参数也不相同fcntl函数有5种功能:
- 复制一个现有的描述符(cmd=F_DUPFD).
- 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
- 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
- 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
- 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
我们此处只是用第三种功能 , 获取 / 设置文件状态标记 , 就可以将一个文件描述符设置为非阻塞实现函数SetNoBlock
于 fcntl, 我们实现一个 SetNoBlock 函数 , 将文件描述符设置为非阻塞void SetNoBlock(int fd) { int fl = fcntl(fd, F_GETFL); if (fl < 0) { perror("fcntl"); return; } fcntl(fd, F_SETFL, fl | O_NONBLOCK); }
- 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图).
- 然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
I/O多路转接之select
系统提供 select 函数来实现多路复用输入 / 输出模型select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的 ;程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变select的函数原型如下: #include <sys/select.h>int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);注意他是用的同一张位图,这是他的问题,也是我们写代码的时候要注意的
PS:为什么是max_fd+1 因为底层是按数组的方式去遍历的,左闭右开。select:time(最后一个参数),设置时间就是隔一段时间就timeout一次(如果没有就绪)。time设置为0就是,非阻塞等待。设置为nullptr就是永久阻塞等待(如果所等待的都没有就绪)。
PS:多路转接就是为了去取代多进程多线程的。
然后是SelectServer的一堆代码
select:批量设置,批量监听,批量处理。1 多路转接服务器就是为了取代多进程多线程版本的服务器的。
2 select是输入输出都是用的同一个数组参数解释 :
- 参数nfds是需要监视的最大的文件描述符值+1;
- rdset,wrset,exset分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合;
- 参数timeout为结构timeval,用来设置select()的等待时间.
参数timeout取值:
- NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件;
- 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
- 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回。
关于 fd_set 结构其实这个结构就是一个整数数组, 更严格的说, 是一个 "位图". 使用位图中对应的位来表示要监视的文件描述符
提供了一组操作fd_set的接口, 来比较方便的操作位图
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组 set 中相关 fd 的位int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组 set 中相关 fd 的位是否为真void FD_SET(int fd, fd_set *set); // 用来设置描述词组 set 中相关 fd 的位void FD_ZERO(fd_set *set); // 用来清除描述词组 set 的全部位关于timeval结构
timeval 结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0。函数返回值:
- 执行成功则返回文件描述词状态已改变的个数
- 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回
- 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds,writefds, exceptfds和timeout的值变成不可预测。
错误值可能为:
EBADF 文件描述词为无效的或该文件已关闭EINTR 此调用被信号所中断EINVAL 参数 n 为负值。ENOMEM 核心内存不足常见的程序片段如下 :fs_set readset ;FD_SET(fd,&readset);select(fd+1,&readset,NULL,NULL,NULL);if(FD_ISSET(fd,readset)){……}理解select执行过程
理解 select 模型的关键在于理解 fd_set, 为说明方便,取 fd_set 长度为 1 字节, fd_set 中的每一 bit 可以对应一个文件描述符fd 。则 1 字节长的 fd_set 最大可以对应 8 个 fd* (1)执行 fd_set set; FD_ZERO(&set); 则 set 用位表示是 0000,0000 。 * (2)若 fd = 5, 执行 FD_SET(fd,&set); 后set 变为 0001,0000( 第 5 位置为 1) * (3)若再加入 fd = 2 , fd=1, 则 set 变为 0001,0011 * (4)执行 select(6,&set,0,0,0)阻塞等待 * (5)若 fd=1,fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011。注意:没有事件发生的 fd=5 被清空socket就绪条件
读就绪
- socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
- socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
- 监听的socket上有新的连接请求;
- socket上有未处理的错误;
写就绪
- socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
- socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
- socket使用非阻塞connect连接成功或失败之后;
- socket上有未读取的错误;
select的特点
- 可监控的文件描述符个数取决与sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)=512,每bit表示一个文件描述符,则我服务器上支持的最大文件描述符是512*8=4096.
- 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的
- fd一是用于再select 返回后,array作为源数据和fd_set进行FD_ISSET判断。 二是select
- 返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数。
select缺点
- 每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
- 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
- select支持的文件描述符数量太小
select使用示例: 检测标准输入输出
#include <stdio.h> #include <unistd.h> #include <sys/select.h> int main() { fd_set read_fds; FD_ZERO(&read_fds); FD_SET(0, &read_fds); for (;;) { printf("> "); fflush(stdout); int ret = select(1, &read_fds, NULL, NULL, NULL); if (ret < 0) { perror("select"); continue; } if (FD_ISSET(0, &read_fds)) { char buf[1024] = {0}; read(0, buf, sizeof(buf) - 1); printf("input: %s", buf); } else { printf("error! invaild fd\n"); continue; } FD_ZERO(&read_fds); FD_SET(0, &read_fds); } return 0; }
说明:当只检测文件描述符 0 (标准输入)时,因为输入条件只有在你有输入信息的时候,才成立,所以如果一直不输入,就会产生超时信息select使用示例
使用 select 实现字典服务器tcp_select_server.hpp#pragma once #include <vector> #include <unordered_map> #include <functional> #include <sys/select.h> #include "tcp_socket.hpp" // 必要的调试函数 inline void PrintFdSet(fd_set* fds, int max_fd) { printf("select fds: "); for (int i = 0; i < max_fd + 1; ++i) { if (!FD_ISSET(i, fds)) { continue; } printf("%d ", i); } printf("\n"); } typedef std::function<void (const std::string& req, std::string* resp)> Handler; // 把 Select 封装成一个类. 这个类虽然保存很多 TcpSocket 对象指针, 但是不管理内存 class Selector { public: Selector() { // [注意!] 初始化千万别忘了!! max_fd_ = 0; FD_ZERO(&read_fds_); } bool Add(const TcpSocket& sock) { int fd = sock.GetFd(); printf("[Selector::Add] %d\n", fd); if (fd_map_.find(fd) != fd_map_.end()) { printf("Add failed! fd has in Selector!\n"); return false; } fd_map_[fd] = sock; FD_SET(fd, &read_fds_); if (fd > max_fd_) { max_fd_ = fd; } return true; } bool Del(const TcpSocket& sock) { int fd = sock.GetFd(); printf("[Selector::Del] %d\n", fd); if (fd_map_.find(fd) == fd_map_.end()) { printf("Del failed! fd has not in Selector!\n"); return false; } fd_map_.erase(fd); FD_CLR(fd, &read_fds_); // 重新找到最大的文件描述符, 从右往左找比较快 for (int i = max_fd_; i >= 0; --i) { if (!FD_ISSET(i, &read_fds_)) { continue; } max_fd_ = i; break; } return true; } // 返回读就绪的文件描述符集 bool Wait(std::vector<TcpSocket>* output) { output->clear(); // [注意] 此处必须要创建一个临时变量, 否则原来的结果会被覆盖掉 fd_set tmp = read_fds_; // DEBUG PrintFdSet(&tmp, max_fd_); int nfds = select(max_fd_ + 1, &tmp, NULL, NULL, NULL); if (nfds < 0) { perror("select"); return false; } // [注意!] 此处的循环条件必须是 i < max_fd_ + 1 for (int i = 0; i < max_fd_ + 1; ++i) { if (!FD_ISSET(i, &tmp)) { continue; } output->push_back(fd_map_[i]); } return true; } private: fd_set read_fds_; int max_fd_; // 文件描述符和 socket 对象的映射关系 std::unordered_map<int, TcpSocket> fd_map_; }; class TcpSelectServer { public: TcpSelectServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) const { // 1. 创建 socket TcpSocket listen_sock; bool ret = listen_sock.Socket(); if (!ret) { return false; } // 2. 绑定端口号 ret = listen_sock.Bind(ip_, port_); if (!ret) { return false; } // 3. 进行监听 ret = listen_sock.Listen(5); if (!ret) { return false; } // 4. 创建 Selector 对象 Selector selector; selector.Add(listen_sock); // 5. 进入事件循环 for (;;) { std::vector<TcpSocket> output; bool ret = selector.Wait(&output); if (!ret) { continue; } // 6. 根据就绪的文件描述符的差别, 决定后续的处理逻辑 for (size_t i = 0; i < output.size(); ++i) { if (output[i].GetFd() == listen_sock.GetFd()) { // 如果就绪的文件描述符是 listen_sock, 就执行 accept, 并加入到 select 中 TcpSocket new_sock; listen_sock.Accept(&new_sock, NULL, NULL); selector.Add(new_sock); } else { // 如果就绪的文件描述符是 new_sock, 就进行一次请求的处理 std::string req, resp; bool ret = output[i].Recv(&req); if (!ret) { selector.Del(output[i]); // [注意!] 需要关闭 socket output[i].Close(); continue; } // 调用业务函数计算响应 handler(req, &resp); // 将结果写回到客户端 output[i].Send(resp); } } // end for } // end for (;;) return true; } private: std::string ip_; uint16_t port_; };
I/O多路转接之poll
poll函数接口
#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);// pollfd 结构struct pollfd {int fd; /* file descriptor */short events; /* requested events */short revents; /* returned events */};
参数说明
- fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
- nfds表示fds数组的长度.
- timeout表示poll函数的超时时间, 单位是毫秒(ms)
events和revents的取值:
返回结果
- 返回值小于0, 表示出错;
- 返回值等于0, 表示poll函数等待超时;
- 返回值大于0, 表示poll由于监听的文件描述符就绪而返回.
socket就绪条件
同select
poll的优点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.
- pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便
- poll并没有最大数量限制 (但是数量过大后性能也是会下降)
poll的缺点
poll 中监听的文件描述符数目增多时
- 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
poll示例: 使用poll监控标准输入
#include <poll.h> #include <unistd.h> #include <stdio.h> int main() { struct pollfd poll_fd; poll_fd.fd = 0; poll_fd.events = POLLIN; for (;;) { int ret = poll(&poll_fd, 1, 1000); if (ret < 0) { perror("poll"); continue; } if (ret == 0) { printf("poll timeout\n"); continue; } if (poll_fd.revents == POLLIN) { char buf[1024] = {0}; read(0, buf, sizeof(buf) - 1); printf("stdin:%s", buf); } } }
I/O多路转接之epoll
epoll初识
按照 man 手册的说法 : 是为处理大批量句柄而作了改进的 poll .它是在 2.5.44 内核中被引进的 (epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点,被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法 .epoll的相关系统调用
epoll_create:成功返回一个文件描述符--创建了一个epoll模型
epoll_ctl:可以向我们的epoll模型中,增加/删除/修改指定文件描述符的事件(解决的就是用户告诉内核的问题)epoll_wait:多路转接的核心工作就是等待,那么怎么等待就是这个函数。
epoll_create
int epoll_create(int size);创建一个epoll的句柄.自从 linux2.6.8 之后, size 参数是被忽略的 .用完之后 , 必须调用 close() 关闭 .epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epoll的事件注册函数
- 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
- 第一个参数是epoll_create()的返回值(epoll的句柄).
- 第二个参数表示动作,用三个宏来表示.
- 第三个参数是需要监听的fd.
- 第四个参数是告诉内核需要监听什么事
第二个参数的取值 :
- EPOLL_CTL_ADD :注册新的fd到epfd中;
- EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
- EPOLL_CTL_DEL :从epfd中删除一个fd;
struct epoll_event结构如下:
events可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里
epoll_wait
可以从特定的epoll模型当中,去提取就绪的事件(后面两参数是输出型参数,可以把就绪的事件进行返回)(第三个就是设置epoll的等待方式,还是那三种)(解决的就是内核告诉用户的问题)
poll是在数据上做分离,epoll就是接口和数据做了分离(要等待的和已经就绪的)(这两个函数调用的时候,他们的eopll_event里面的events是不一样的!)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发送的事件
- 参数events是分配好的epoll_event结构体数组.
- epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存).
- maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size.参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞).
- 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败
底层机制
在红黑树创建节点的时候,底层就自动创建了回调函数,有数据准备好了就会发生回调告诉OS可以返回了,他就会把要返回的加载到就绪队列。
我们将红黑树,就绪队列和底层的回调机制,称为epoll模型。
(**epoll_create**就是创建一个epoll模型,返回一个文件描述符。即在底层建立好回调机制,创建一个空的红黑树,创建一个就绪队列。这些都可以打包成一个数据结构,连接到我们的文件系统当中(struct files,然后设置其他的参数,比如文件类型,指向的这颗红黑树模型),然后用这个文件描述符就可以找到这个文件模型)
(**epoll_ctl**就是添加红黑树(key:fd)节点/查找节点/修改节点/删除节点。这个红黑树就是用户告诉内核,哪些文件描述符的哪些事件内核需要关心)
(**epoll_wait**一旦建立epoll模型,本质上哪些fd上的哪些event就绪,整个过程用户是不用担心的(因为底层有回调(关于回调函数:1 获取就绪的fd 2 获取就绪的事件是什么 3 构建queue_node节点 4 将节点链接进入就绪队列))。所以这个函数的作用就是把底层就绪队列的数据拿上来就可以了,即他是一个等(就绪队列是否为空)+拷贝的函数)epoll为什么高效?
1 创建epoll模型之后,用epll_ctl来管理文件描述符及其事件,epoll_ctl针对的数据结构是红黑树,针对的场景是用户告诉内核的问题,因为红黑树节点是没有上限的,所以文件描述符的节点也没有上限
2 彻底解放OS,不用OS去遍历每一个文件描述符了,而是OS在创建epoll模型时,自动在底层处理拷贝函数时,设置回调机制,当底层有数据了就通过回调的机制,获取就绪文件描述符,获取就绪事件,构建就绪节点,连接到就绪队列里面统一由回调函数自动完成,因为是回调策略,所以不需要OS主动轮询,大大提高了检测的效率,也就是说OS现在只为就绪的节点提供服务,及不需要以O(N)的方式去遍历了。
3 epoll_wait获取节点也不需要遍历了,只需要从就绪队列里面直接拿节点就可以了(内核告诉用户)epoll工作原理
当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与epoll 的使用方式密切相关struct eventpoll{..../* 红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件 */struct rb_root rbr;/* 双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件 */struct list_head rdlist;....};
- 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.
- 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度).
- 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.
- 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中.
- 在epoll中,对于每一个事件,都会建立一个epitem结构体
struct epitem{struct rb_node rbn;// 红黑树节点struct list_head rdllink;// 双向链表节点struct epoll_filefd ffd; // 事件句柄信息struct eventpoll *ep; // 指向其所属的 eventpoll 对象struct epoll_event event; // 期待发生的事件类型}
- 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
- 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)
总结一下, epoll的使用过程就是三部曲:
- 调用epoll_create创建一个epoll句柄;
- 调用epoll_ctl, 将要监控的文件描述符进行注册;
- 调用epoll_wait, 等待文件描述符就绪;
epoll的优点(和 select 的缺点对应)
- 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
- 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
- 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
- 没有数量限制: 文件描述符数目无上限
epoll工作方式
epoll 有 2 种工作方式 - 水平触发 (LT) 和边缘触发 (ET)水平触发Level Triggered 工作模式
假如有这样一个例子:
我们已经把一个 tcp socket 添加到 epoll 描述符这个时候 socket 的另一端被写入了 2KB 的数据调用 epoll_wait ,并且它会返回 . 说明它已经准备好读取操作然后调用 read, 只读取了 1KB 的数据继续调用 epoll_wait......epoll 默认状态下就是 LT 工作模式 .
- 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
- 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪.
- 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
- 支持阻塞读写和非阻塞读写
边缘触发Edge Triggered工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式
- 当epoll检测到socket上事件就绪时, 必须立刻处理.
- 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了.
- 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
- ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.
- 只支持非阻塞的读写
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET对比LT和ET
LT 是 epoll 的默认行为 . 使用 ET 能够减少 epoll 触发的次数 . 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完相当于一个文件描述符就绪之后 , 不会反复被提示就绪 , 看起来就比 LT 更高效一些 . 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话 , 其实性能也是一样的另一方面 , ET 的代码复杂程度更高了理解ET模式和非阻塞文件描述符
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞 . 这个不是接口上的要求 , 而是 " 工程实践 " 上的要求 .假设这样的场景: 服务器接受到一个 10k 的请求 , 会向客户端返回一个应答数据 . 如果客户端收不到应答 , 不会发送第二个10k 请求如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话 (read 不能保证一次就把所有的数据都读出来 ,参考 man 手册的说明 , 可能被信号打断 ), 剩下的 9k 数据就会待在缓冲区中此时由于 epoll 是 ET 模式 , 并不会认为文件描述符读就绪 . epoll_wait 就不会再次返回 . 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据 . epoll_wait 才能返回但是问题来了
- 服务器只读到1k个数据, 要10k读完才会给客户端返回响应数据.
- 客户端要读到服务器的响应, 才会发送下一个请求
- 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据
所以 , 为了解决上述问题 ( 阻塞 read 不一定能一下把完整的请求读完 ), 于是就可以使用非阻塞轮训的方式来读缓冲区 ,保证一定能把完整的请求都读出来而如果是LT没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件描述符读就绪
epoll的使用场景
epoll的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll的性能可能适得其反
对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll.
例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网APP的入口服务器, 这样的服务器就很适合epoll
如果只是系统内部 , 服务器和服务器之间进行通信 , 只有少数的几个连接 , 这种情况下用 epoll 就并不合适 . 具体要根据需求和场景特点来决定使用哪种IO 模型epoll中的惊群问题
惊群问题有些面试官可能会问到参考 http://blog.csdn.net/fsmiy/article/details/36873357epoll示例: epoll服务器(LT模式)
tcp_epoll_server.hpp
/// // 封装一个 Epoll 服务器, 只考虑读就绪的情况 /// #pragma once #include <vector> #include <functional> #include <sys/epoll.h> #include "tcp_socket.hpp" typedef std::function<void (const std::string&, std::string* resp)> Handler; class Epoll { public: Epoll() { epoll_fd_ = epoll_create(10); } ~Epoll() { close(epoll_fd_); } bool Add(const TcpSocket& sock) const { int fd = sock.GetFd(); printf("[Epoll Add] fd = %d\n", fd); epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev); if (ret < 0) { perror("epoll_ctl ADD"); return false; } return true; } bool Del(const TcpSocket& sock) const { int fd = sock.GetFd(); printf("[Epoll Del] fd = %d\n", fd); int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL); if (ret < 0) { perror("epoll_ctl DEL"); return false; } return true; } bool Wait(std::vector<TcpSocket>* output) const { output->clear(); epoll_event events[1000]; int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1); if (nfds < 0) { perror("epoll_wait"); return false; } // [注意!] 此处必须是循环到 nfds, 不能多循环 for (int i = 0; i < nfds; ++i) { TcpSocket sock(events[i].data.fd); output->push_back(sock); } return true; } private: int epoll_fd_; }; class TcpEpollServer { public: TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) { // 1. 创建 socket TcpSocket listen_sock; CHECK_RET(listen_sock.Socket()); // 2. 绑定 CHECK_RET(listen_sock.Bind(ip_, port_)); // 3. 监听 CHECK_RET(listen_sock.Listen(5)); // 4. 创建 Epoll 对象, 并将 listen_sock 加入进去 Epoll epoll; epoll.Add(listen_sock); // 5. 进入事件循环 for (;;) { // 6. 进行 epoll_wait std::vector<TcpSocket> output; if (!epoll.Wait(&output)) { continue; } // 7. 根据就绪的文件描述符的种类决定如何处理 for (size_t i = 0; i < output.size(); ++i) { if (output[i].GetFd() == listen_sock.GetFd()) { // 如果是 listen_sock, 就调用 accept TcpSocket new_sock; listen_sock.Accept(&new_sock); epoll.Add(new_sock); } else { // 如果是 new_sock, 就进行一次读写 std::string req, resp; bool ret = output[i].Recv(&req); if (!ret) { // [注意!!] 需要把不用的 socket 关闭 // 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了 epoll.Del(output[i]); output[i].Close(); continue; } handler(req, &resp); output[i].Send(resp); } // end for } // end for (;;) } return true; } private: std::string ip_; uint16_t port_; };
epoll示例: epoll服务器(ET模式)
基于 LT 版本稍加修改即可1. 修改 tcp_socket.hpp, 新增非阻塞读和非阻塞写接口2. 对于 accept 返回的 new_sock 加上 EPOLLET 这样的选项注意 : 此代码暂时未考虑 listen_sock ET 的情况 . 如果将 listen_sock 设为 ET, 则需要非阻塞轮询的方式 accept. 否则会导致同一时刻大量的客户端同时连接的时候, 只能 accept 一次的问题tcp_socket.hpp
// 以下代码添加在 TcpSocket 类中 // 非阻塞 IO 接口 bool SetNoBlock() { int fl = fcntl(fd_, F_GETFL); if (fl < 0) { perror("fcntl F_GETFL"); return false; } int ret = fcntl(fd_, F_SETFL, fl | O_NONBLOCK); if (ret < 0) { perror("fcntl F_SETFL"); return false; } return true; } bool RecvNoBlock(std::string* buf) const { // 对于非阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误 // 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试 // 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环 // 这种写法其实不算特别严谨(没有考虑粘包问题) buf->clear(); char tmp[1024 * 10] = {0}; for (;;) { ssize_t read_size = recv(fd_, tmp, sizeof(tmp) - 1, 0); if (read_size < 0) { if (errno == EWOULDBLOCK || errno == EAGAIN) { continue; } perror("recv"); return false; } if (read_size == 0) { // 对端关闭, 返回 false return false; } tmp[read_size] = '\0'; *buf += tmp; if (read_size < (ssize_t)sizeof(tmp) - 1) { break; } } return true; } bool SendNoBlock(const std::string& buf) const { // 对于非阻塞 IO 的写入, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况 // 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗 // 而要进行重试 ssize_t cur_pos = 0; // 记录当前写到的位置 ssize_t left_size = buf.size(); for (;;) { ssize_t write_size = send(fd_, buf.data() + cur_pos, left_size, 0); if (write_size < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // 重试写入 continue; } return false; } cur_pos += write_size; left_size -= write_size; // 这个条件说明写完需要的数据了 if (left_size <= 0) { break; } } return true; }
tcp_epoll_server.hpp
/// // 封装一个 Epoll ET 服务器 // 修改点: // 1. 对于 new sock, 加上 EPOLLET 标记 // 2. 修改 TcpSocket 支持非阻塞读写 // [注意!] listen_sock 如果设置成 ET, 就需要非阻塞调用 accept 了 // 稍微麻烦一点, 此处暂时不实现 /// #pragma once #include <vector> #include <functional> #include <sys/epoll.h> #include "tcp_socket.hpp" typedef std::function<void (const std::string&, std::string* resp)> Handler; class Epoll { public: Epoll() { epoll_fd_ = epoll_create(10); } ~Epoll() { close(epoll_fd_); } bool Add(const TcpSocket& sock, bool epoll_et = false) const { int fd = sock.GetFd(); printf("[Epoll Add] fd = %d\n", fd); epoll_event ev; ev.data.fd = fd; if (epoll_et) { ev.events = EPOLLIN | EPOLLET; } else { ev.events = EPOLLIN; } int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev); if (ret < 0) { perror("epoll_ctl ADD"); return false; } return true; } bool Del(const TcpSocket& sock) const { int fd = sock.GetFd(); printf("[Epoll Del] fd = %d\n", fd); int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL); if (ret < 0) { perror("epoll_ctl DEL"); return false; } return true; } bool Wait(std::vector<TcpSocket>* output) const { output->clear(); epoll_event events[1000]; int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1); if (nfds < 0) { perror("epoll_wait"); return false; } // [注意!] 此处必须是循环到 nfds, 不能多循环 for (int i = 0; i < nfds; ++i) { TcpSocket sock(events[i].data.fd); output->push_back(sock); } return true; } private: int epoll_fd_; }; class TcpEpollServer { public: TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) { // 1. 创建 socket TcpSocket listen_sock; CHECK_RET(listen_sock.Socket()); // 2. 绑定 CHECK_RET(listen_sock.Bind(ip_, port_)); // 3. 监听 CHECK_RET(listen_sock.Listen(5)); // 4. 创建 Epoll 对象, 并将 listen_sock 加入进去 Epoll epoll; epoll.Add(listen_sock); // 5. 进入事件循环 for (;;) { // 6. 进行 epoll_wait std::vector<TcpSocket> output; if (!epoll.Wait(&output)) { continue; } // 7. 根据就绪的文件描述符的种类决定如何处理 for (size_t i = 0; i < output.size(); ++i) { if (output[i].GetFd() == listen_sock.GetFd()) { // 如果是 listen_sock, 就调用 accept TcpSocket new_sock; listen_sock.Accept(&new_sock); epoll.Add(new_sock, true); } else { // 如果是 new_sock, 就进行一次读写 std::string req, resp; bool ret = output[i].RecvNoBlock(&req); if (!ret) { // [注意!!] 需要把不用的 socket 关闭 // 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了 epoll.Del(output[i]); output[i].Close(); continue; } handler(req, &resp); output[i].SendNoBlock(resp); printf("[client %d] req: %s, resp: %s\n", output[i].GetFd(), req.c_str(), resp.c_str()); } // end for } // end for (;;) } return true; } private: std::string ip_; uint16_t port_; };