文章目录
- IO 多路转接之 epoll
- 1、IO 多路转接之 poll
- 1.1、poll 函数
- 1.2、poll 函数返回值
- 1.3、Socket 就绪条件
- 1.3.1、读就绪
- 1.3.2、写就绪
- 1.3.3、异常就绪
- 1.4、poll 的优点
- 1.5、poll 的缺点
- 1.6、poll 改写 select
- 2、IO 多路转接之 epoll
- 2.1、epoll 函数
- 2.2、epoll_create
- 2.3、epoll_ctl
- 2.4、epoll_wait
- 2.5、epoll 工作原理
- 2.6、水平触发 Level Triggered 工作模式(默认模式 LT)
- 2.7、边缘触发 Edge Triggered 工作模式
- 2.8、LT 和 ET 对比
- 2.9、理解 ET 模式和非阻塞文件描述符
- 2.10、epoll 使用场景
- 2.11、epoll 示例
- 2.11、LT 模式的 epoll 服务器响应程序
- 2.12、ET 模式的 epoll 服务器响应程序
IO 多路转接之 epoll
1、IO 多路转接之 poll
1.1、poll 函数
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数:
- fds:fds 是一个 poll 函数监听的结构列表,每一个元素中,包含了三部分内容:文件描述符,监听的事件集合,返回的事件集合(监听到的)。
struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ };
nfds:监控的文件描述符个数
timeout:单位是毫秒
其中,events 和 revents 的取值可以是下面表格中的值:
事件 描述 是否可作为输入 是否可作为输出 POLLIN
数据(包括普通数据和优先数据)可读 是 是 POLLRDNORM
普通数据可读 是 是 POLLRDBAND
优先级带数据可读(Linux 不支持) 是 是 POLLPRI
高优先级数据可读,例如 TCP 带外数据 是 是 POLLOUT
数据(包括普通数据和优先数据)可写 是 是 POLLWRNORM
普通数据可写 是 是 POLLWRBAND
优先级带数据可写 是 是 POLLRDHUP
TCP 连接被对方关闭,或者对方关闭了写操作。它由 GNU 引入 是 是 POLLERR
错误 否 是 POLLHUP
挂起。比如管道的写端关闭后,该端描述符上将收到 POLLHUP 事件 否 是 POLLNVAL
文件描述符没有打开 否 是
1.2、poll 函数返回值
>0
表示等待某些事件成功。=0
表示超时。<0
表示错误。错误信息会设置在 errno 中,错误情况类别和 select 一样。
1.3、Socket 就绪条件
1.3.1、读就绪
- socket 内核中,接收缓冲区中的字节数,大于等于低水位标记 SO_RCVLOWAT (有足够的数据可以读的意思吧),此时可以无阻塞的读该文件描述符,并且返回值大于 0
- socket TCP 通信中,对端关闭连接,此时对该 socket 读,则返回 0
- 监听的 socket 上有新的连接请求
- socket 上有未处理的错误
1.3.2、写就绪
socket 内核中,发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小),大于等于低水位标记 SO_SNDLOWAT,此时可以无阻塞的写,并且返回值大于 0
socket 的写操作被关闭(close 或者 shutdown),对一个写操作被关闭的 socket进行写操作, 会触发 SIGPIPE 信号
socket 使用非阻塞 connect 连接成功或失败之后
socket 上有未读取的错误
1.3.3、异常就绪
socket 上收到带外数据。
关于带外数据,和 TCP 紧急模式相关(回忆 TCP 协议头中,有一个紧急指针的字段)。
这个数据需要紧急处理。
1.4、poll 的优点
不同于 select 使用三个位图来表示三个 fd_set 的方式,poll 使用一个 pollfd 的指针实现。
- pollfd 结构包含了要监视的 event 和发生的 event,不再使用 select“参数-值”传递的方式
- 接口使用比 select 更方便
- poll 并没有最大数量限制 (但是数量过大后性能也是会下降)
1.5、poll 的缺点
poll 中监听的文件描述符数目增多时:
- 和 select 函数一样,poll 返回后,需要轮询 pollfd 来获取就绪的描述符
- 每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降
1.6、poll 改写 select
前面有提到 select 的缺点:
文件描述符限制:
select
有一个限制,即它最多只能监控 1024 个文件描述符(在一些系统中,可以通过重新编译内核修改这个值)。这是因为select
使用的是一个固定大小的位掩码来表示文件描述符。静态数组:文件描述符通过
fd_set
结构表示,这个结构是一个固定大小的位数组,每次调用select
后需要重新设置。效率低下:随着监控的文件描述符增多,
select
需要遍历整个集合来检查状态变化,性能较差。尤其是当大量文件描述符中只有少数活跃时,这种线性扫描的方式会非常低效。poll 对 select 进行了改进。
poll 对 select 的主要改进总结:
- 文件描述符限制:
poll
取消了select
中的 1024 文件描述符限制,可以处理任意数量的文件描述符。- 无需重置集合:在
select
中,每次调用后都需要重置fd_set
,而在poll
中不需要这样做。- 性能提升:
poll
在实现上更为高效,特别是文件描述符数量较多的情况下,poll
的结构体数组相比select
的位掩码结构更加灵活,减少了不必要的遍历。改进后的主要文件代码:
SelectServer.hpp
->PollServer.hpp
:#pragma once #include <iostream> #include <memory> #include <poll.h> #include <sys/time.h> #include <string> #include "Socket.hpp" using namespace socket_ns; class PollServer { const static int defaultfd = -1; const static int N = 1024; const static int timeout = -1; // 负数阻塞式等待,整数等待的毫秒值 public: PollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()) { InetAddr client("0", port); _listensock->BuildListenSocket(client); for (int i = 0; i < N; ++i) { _fds[i].fd = defaultfd; _fds[i].revents = 0; _fds[i].events = 0; } _fds[0].fd = _listensock->SockFd(); // 第一个肯定是listensock的文件描述符 _fds[0].events = POLLIN; // 对读事件关心 } void AcceptClient() { InetAddr clientaddr; socket_sptr sockefd = _listensock->Accepter(&clientaddr); int fd = sockefd->SockFd(); if (fd >= 0) { LOG(DEBUG, "Get new Link ,sockefd is :%d ,client info : %s:%d", fd, clientaddr.Ip().c_str(), clientaddr.Port()); } // 把新到的文件描述符交给select托管,使用辅助数组 int pos = 1; for (; pos < N; pos++) { if (_fds[pos].fd == defaultfd) break; } if (pos == N) { // 满了 ::close(fd); // 这里就是比select更好,可以扩容,也可以直接关闭文件描述符 LOG(WARNING, "server full ..."); return; } else { _fds[pos].fd = fd; // 添加新文件描述符 _fds[pos].events = POLLIN;// 对读事件关心 _fds[pos].revents = 0; LOG(WARNING, "%d sockfd add to select array", fd); } LOG(DEBUG, "cur fdarr[] fd list : %s", RfdsToStr().c_str()); } void ServiceIO(int pos) { char buff[1024]; ssize_t n = ::recv(_fds[pos].fd, buff, sizeof(buff) - 1, 0); if (n > 0) { buff[n] = 0; LOG(DEBUG, "client # %s", buff); std::string message = "Server Echo# "; message += buff; ::send(_fds[pos].fd, message.c_str(), message.size(), 0); } else if (n == 0) { LOG(DEBUG, "%d socket closed!", _fds[pos].fd); ::close(_fds[pos].fd); // 有用户退出,把该文件描述符重置为默认值 _fds[pos].fd = defaultfd; _fds[pos].events = 0; _fds[pos].revents = 0; LOG(DEBUG, "cur fdarr[] fd list : %s", RfdsToStr().c_str()); } else { LOG(DEBUG, "%d recv error!", _fds[pos].fd); ::close(_fds[pos].fd); _fds[pos].fd = defaultfd; _fds[pos].events = 0; _fds[pos].revents = 0; LOG(DEBUG, "cur fdarr[] fd list : %s", RfdsToStr().c_str()); } } void HandlerRead() { for (int i = 0; i < N; i++) { if (_fds[i].fd == defaultfd) continue; if (_fds[i].revents & POLLIN) // 读事件就绪 { if (_fds[i].fd == _listensock->SockFd()) // listensock { AcceptClient(); } else // 真正的读事件就绪 { // socket读事件就绪 ServiceIO(i); } } else if(_fds[i].revents & POLLOUT) { // 写事件就绪,后面epoll再做 } } } void Loop() { while (true) { int n = poll(_fds, N, timeout); if (n > 0) { // 处理读文件描述符 HandlerRead(); } else if (n == 0) { // 时间到了 LOG(DEBUG, "time out ..."); } else { // 错误 LOG(FATAL, "select error ..."); } } } std::string RfdsToStr() { std::string rfdstr; for (int i = 0; i < N; ++i) { if (_fds[i].fd != defaultfd) { rfdstr += std::to_string(_fds[i].fd); rfdstr += " "; } } return rfdstr; } ~PollServer() {} private: uint16_t _port; std::unique_ptr<TcpSocket> _listensock; struct pollfd _fds[N]; // 可以设置成容量满自动扩容模式 };
poll 服务器响应程序整体代码
尽管
poll
在很多方面改进了select
,但它在某些场景下依然存在效率问题(例如大量空闲文件描述符时依然需要线性扫描)。为此,Linux 后来引入了更高效的机制,例如epoll
。
2、IO 多路转接之 epoll
2.1、epoll 函数
按照 man 手册的说法:是为处理大批量句柄而作了改进的 poll。
它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它几乎具备了之前所说的一切优点,被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法。
The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them. The epoll API can be used either as an edge-triggered or a level-triggered interface and scales well to large numbers of watched file descriptors.
下面介绍 epoll 相关的系统调用。
2.2、epoll_create
#include <sys/epoll.h> int epoll_create(int size);
创建一个 epoll 的句柄
自从 linux2.6.8 之后,size 参数是被忽略的
用完之后, 必须调用 close()关闭
2.3、epoll_ctl
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->SockFd(), &ev);
epoll 的事件注册函数
它不同于 select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型
第一个参数是 epoll_create()的返回值(epoll 的句柄)
第二个参数表示动作,用三个宏来表示(EPOLL_CTL_ADD,EPOLL_CTL_MOD,EPOLL_CTL_DEL)
第三个参数是需要监听的 fd
第四个参数是告诉内核需要监听什么事
struct epoll_event 结构如下:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ } __EPOLL_PACKED;
events 可以是以下几个宏的集合:
- EPOLLIN:表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭)
- EPOLLOUT:表示对应的文件描述符可以写
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)
- EPOLLERR:表示对应的文件描述符发生错误
- EPOLLHUP:表示对应的文件描述符被挂断
- EPOLLET:将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。
2.4、epoll_wait
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
收集在 epoll 监控的事件中已经到来的事件
- 参数 events 是分配好的 epoll_event 结构体数组
- epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)
- maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建epoll_create()时的 size
- 参数 timeout 是超时时间 (毫秒,0 会立即返回,-1 是永久阻塞)
- 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回 0 表示已超时,返回小于 0 表示函数失败
2.5、epoll 工作原理
当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。
struct eventpoll { struct rb_root rbr; // 红黑树根节点 struct rb_root wbr; // 写事件的红黑树 struct list_head rdllist; // 读等待队列 struct list_head wrdllist; // 写等待队列 struct list_head active; // 活动事件列表 struct mutex mtx; // 互斥锁 wait_queue_head_t wait; // 等待队列 int epfd; // 文件描述符 int maxevents; // 最大事件数 // 其他字段... };
- 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件
- 这些事件都会挂载在红黑树中,如此重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 logn,其中 n 为树的高度)
- 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法
- 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中
- 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体
2.6、水平触发 Level Triggered 工作模式(默认模式 LT)
当 epoll 检测到 socket 上事件就绪的时候,可以不立刻进行处理,或者只处理一部分
如上面的例子,由于只读了 1K 数据,缓冲区中还剩 1K 数据,在第二次调用 epoll_wait 时,epoll_wait 仍然会立刻返回并通知 socket 读事件就绪,直到缓冲区上所有的数据都被处理完,epoll_wait 才不会立刻返回(也就是只要有事件就绪,没有处理完就回一只返回)。
支持阻塞读写和非阻塞读写
2.7、边缘触发 Edge Triggered 工作模式
如果我们在第 1 步将 socket 添加到 epoll 描述符的时候使用了 EPOLLET 标志,epoll 进入 ET 工作模式
- 当 epoll 检测到 socket 上事件就绪时,必须立刻处理
- 如上面的例子,虽然只读了 1K 的数据,缓冲区还剩 1K 的数据,在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了
- 也就是说,ET 模式下,文件描述符上的事件就绪后,只有一次处理机会(倒逼程序员必须处理完这轮数据)
- 原因就是如果在ET模式下,上一次的事件未处理完,且之后没有新事件到来,那么
epoll_wait
将不会返回这些未处理的事件,导致它们一直处于未处理状态,如果有新事件到来,还是会处理掉上一次的事件- ET 的性能比 LT 性能更高( epoll_wait 返回的次数少了很多)
- Nginx 默认采用 ET 模式使用 epoll
- 只支持非阻塞的读写
2.8、LT 和 ET 对比
LT 是 epoll 的默认行为
- 使用 ET 能够减少 epoll 触发的次数,但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。相当于一个文件描述符就绪之后,不会反复被提示就绪,看起来就比 LT 更高效一些。
- 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是一样的
- 另一方面,ET 的代码复杂程度更高了
2.9、理解 ET 模式和非阻塞文件描述符
使用 ET 模式的 epoll,需要将文件描述设置为非阻塞。
这个不是接口上的要求,而是 “工程实践” 上的要求。
假设这样的场景:
服务器接收到一个 10k 的请求,会向客户端返回一个应答数据。
如果客户端收不到应答,不会发送第二个 10k 请求。
如果服务端写的代码是阻塞式的 read,并且一次只 read 1k 数据的话(read 不能保证一次就把所有的数据都读出来,参考 man 手册的说明,可能被信号打断),剩下的 9k 数据就会待在缓冲区中
此时由于 epoll 是 ET 模式,并不会认为文件描述符读就绪。epoll_wait 就不会再次返回,剩下的 9k 数据会一直在缓冲区中,直到下一次客户端再给服务器写数据,epoll_wait 才能返回
但是问题来了:
- 服务器只读到 1k 个数据,要 10k 读完才会给客户端返回响应数据
- 客户端要读到服务器的响应,才会发送下一个请求
- 客户端发送了下一个请求,epoll_wait 才会返回,才能去读缓冲区中剩余的数据
所以,为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完),于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来。
而如果是 LT 没这个问题,只要缓冲区中的数据没读完,就能够让 epoll_wait 返回文件描述符读就绪。
2.10、epoll 使用场景
epoll 的高性能,是有一定的特定场景的。
如果场景选择的不适宜,epoll 的性能可能适得其反。
对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用 epoll。
- 例如,典型的一个需要处理上万个客户端的服务器,例如各种互联网 APP 的入口服务器,这样的服务器就很适合 epoll
- 如果只是系统内部,服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用epoll 就并不合适
- 具体要根据需求和场景特点来决定使用哪种 IO 模型
2.11、epoll 示例
2.11、LT 模式的 epoll 服务器响应程序
相对于前面的 select、poll 服务器响应程序,就修改了对应的
Server.hpp
文件(对应的Main.cc
也需要改一下<>中的类型)。LT 模式的 epoll 服务器响应程序
EpollServer.hpp
文件:主要修改的代码文件#pragma once #include <iostream> #include <memory> #include <sys/epoll.h> #include <sys/time.h> #include <string> #include "Socket.hpp" using namespace socket_ns; class EpollServer { const static int defaultfd = -1; const static int N = 64; const static int timeout = -1; // 负数阻塞式等待,整数等待的毫秒值 public: EpollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()), _epfd(defaultfd) { InetAddr client("0", port); _listensock->BuildListenSocket(client); memset(_events, 0, sizeof(_events)); _epfd = epoll_create(128); if (_epfd < 0) { LOG(FATAL, "epoll create error..."); exit(-1); } LOG(DEBUG, "epoll create sucess, epoll fd : %d", _epfd); struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = _listensock->SockFd(); // _events[0].events = EPOLLIN; // _events[0].data.fd = _listensock->SockFd(); epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->SockFd(), &ev); } void AcceptClient() { InetAddr clientaddr; socket_sptr sockefd = _listensock->Accepter(&clientaddr); int fd = sockefd->SockFd(); if(fd < 0) return; if (fd >= 0) { LOG(DEBUG, "Get new Link ,sockefd is :%d ,client info : %s:%d", fd, clientaddr.Ip().c_str(), clientaddr.Port()); } // 把新到的文件描述符交给select托管,使用辅助数组 struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev); LOG(DEBUG, "%d sockfd add to epoll rbtree", fd); } void ServiceIO(int fd) { char buff[1024]; ssize_t n = ::recv(fd, buff, sizeof(buff) - 1, 0); if (n > 0) { buff[n] = 0; LOG(DEBUG, "client # %s", buff); std::string message = "Server Echo# "; message += buff; ::send(fd, message.c_str(), message.size(), 0); } else if (n == 0) { LOG(DEBUG, "%d socket closed!", fd); epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr); // 得保证要删除的fd是合法的 ::close(fd); // 有用户退出,把该文件描述符重置为默认值 } else { LOG(DEBUG, "%d recv error!", fd); epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr); ::close(fd); // 有用户退出,把该文件描述符重置为默认值 } } void HandlerRead(int num) { for (int i = 0; i < num; i++) { uint32_t events = _events[i].events; int sockfd = _events[i].data.fd; if (events & EPOLLIN) // 读事件就绪 { if (sockfd == _listensock->SockFd()) // listensock { AcceptClient(); } else // 真正的读事件就绪 { // socket读事件就绪 ServiceIO(sockfd); } } else if (events & EPOLLOUT) { // 写事件就绪,后面epoll再做 } } } void Loop() { while (true) { int n = epoll_wait(_epfd, _events, N, timeout); // 返回请求I/O文件描述符的个数 switch (n) { case -1: // 错误 LOG(FATAL, "epoll wait error ..."); break; case 0: // 时间到了 LOG(DEBUG, "time out ..."); break; default: // 处理读文件描述符 HandlerRead(n); break; } } } ~EpollServer() { ::close(_listensock->SockFd()); if (_epfd >= 0) ::close(_epfd); } private: uint16_t _port; std::unique_ptr<TcpSocket> _listensock; int _epfd; struct epoll_event _events[N]; };
2.12、ET 模式的 epoll 服务器响应程序
后面博客 Reactor 反应堆模式下我们再写。
OKOK,IO 多路转接之 epoll 就到这里,如果你对Linux和C++也感兴趣的话,可以看看我的主页哦。下面是我的github主页,里面记录了我的学习代码和leetcode的一些题的题解,有兴趣的可以看看。
Xpccccc的github主页