目录
I/O多路转接 - poll
poll 函数
poll 服务器
poll 服务器
poll 的优点
poll 的缺点
I/O 多路转接 - epoll
epoll 的相关系统调用
epoll_create 函数
epoll_ctl 函数
epoll_wait 函数
epoll 工作原理
epoll 服务器
编辑 epoll 的优点(与 select 的缺点对应)
select、poll 和 epoll 的不同之处
epoll 的工作方式
水平触发(Level Triggered,LT)
边缘触发(Edge Triggered,ET)
ET 工作模式下如何进行读写
I/O多路转接 - poll
• poll 系统调用也可以让程序同时监视多个文件描述符上的事件是否就绪,和 select 的定位是一样的,适用场景也是一样的。
poll 函数
参数说明:
• fds:表示监视的结构列表,每一个元素包含三个部分内容,文件描述符,监视的事件集合,就绪的事件集合
• nfds:表示 fds 数组的长度
• timeout:表示poll 函数的超时事件,单位是毫秒(ms)
timeout 的取值:
• -1:poll 调用后进行阻塞等待,直到某个被监视的文件描述符上的某个事件就绪
• 0:poll 调用后进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,poll 检测后立马返回
• 特定的时间值:poll 调用后在指定的时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则在该时间后 poll 进行超时返回
poll 返回值说明:
• 如果函数调用成功,则返回有事件就绪的文件描述符的个数
• 如果 timeout 时间耗尽,则返回0
• 如果函数调用失败,则返回 -1,同时错误码被设置,可能会被设置为如下:
( • EFAULT:fds 数组不包含在调用程序的地址空间中
• EINTR:此调用被信号所中断
• EINVAL:nfds 值超过RLIMIT_NOFILE 值
• ENOMEM:核心内存不足 )
struct pollfd 结构:
• fd:特定的文件描述符,若设置为负值,则忽略events字段,并且revents字段返回0
• events:需要监视的文件描述符上的哪些事件
• revents:poll 函数返回时告知用户该文件描述符上的哪些事件已经就绪
events 和 revents 的取值:
事件 | 描述 | 是否可作为输入 | 是否可作为输出 |
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起,比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
这些取值实际上都是以宏的方式进行定义的,二进制序列当中只要一个比特位是1,且1的位置各不相同:
• 在调用 poll 函数之前,将要监视的事件设置进入到 events 成员中
• poll 函数返回后,可以通过运算符检测在 revents 成员中是否包含特定的事件,得知对应的描述符的特定事件是否就绪
poll 服务器
该服务器也只是读取客户端发来的数据然后进行打印即可,成员变量需要包含监听套接字和端口号这两个,服务器绑定时将IP地址设置为 INADDR_ANY(之前对Sock操作等进行了封装)。
• 在初始化 poll 服务器的时候,依次进行套接字的创建,绑定,监听等工作
• 析构函数中,可以选择调用 close 函数对监听套接字等进行关闭
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <functional>
#include <poll.h>
#include "sock.hpp"
class pollServer
{
public:
pollServer(func_t f, int port)
: _func(f), _listensock(-1), _port(port)
{
}
~pollServer()
{
if (_listensock)
{
close(_listensock);
}
if (_rfds)
{
delete[] _rfds;
}
}
private:
int _port;
int _listensock;
};
初始化及运行服务器
• 在 poll 服务器运行之前,先初始化服务器,定义一个 fds 结构体数组,该数组中的每个位置都是一个struct pollfd 结构,先将每个位置初始化为无效,并将监听套接字添加到 fds 数组中,服务器开始运行时,只需要监视监听套接字的读事件即可。
• 运行服务器,不断调用 poll 函数监视读事件是否就绪,如果 poll 函数的返回值大于0,则说明 poll 函数调用成功,此时已经有文件描述符的读事件就绪了,接下来就是对就绪事件的处理逻辑;返回值等于0,说明 timeout 时间耗尽,超时了,继续进行下一次的 poll 调用即可,返回值为-1,说明 poll 调用失败,根据错误码进一步判断是否继续调用 poll 函数。
void pollInit(int pos)
{
_rfds[pos].fd = defaultnum;
_rfds[pos].events = 0;
_rfds[pos].revents = 0;
}
void InitServer()
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
_rfds = new struct pollfd[num];
for (int i = 0; i < num; i++)
{
pollInit(i);
}
// 第一个位置添加为 _listensock套接字
_rfds[0].fd = _listensock;
_rfds[0].events = POLLIN;
}
void Start()
{
// 等 + 处理数据 select返回的是fd的个数
int timeout = -1; // 毫秒
for (;;)
{
int n = poll(_rfds, num, timeout);
switch (n)
{
case 0:
// 超时
logMessage(NORMAL, "time out ...");
break;
case -1:
logMessage(WARNING, "select errno code: %d, select errno message: %s", errno, strerror(errno));
break;
default:
// 走到这里说明有fd就绪了,需要进行处理,但是只有 listensock 就绪
logMessage(NORMAL, "have event ready!\n");
// 进行业务逻辑处理
Handerevent();
break;
}
}
}
事件处理
当 poll 检测到有文件描述符的读事件就绪时,就会在其对应的 struct pollfd 结构体中的 revents 成员中添加读事件并返回,后面对就绪事件进行处理:
• 遍历 rfds 数组中的每个struct pollfd 结构,如果该结果当中的 fd 有效,且 revents 当中包含读事件,说明该文件描述符的读事件就绪,对该文件描述符进一步的判断,是监听套接字还是与客户端建立连接的套接字
• 如果是监听套接字的读事件就绪,就调用 accept 函数将底层建立好的连接获取上来,并添加到 rfds 数组中,下一次调用 poll 函数时需要监视该套接字的读事件
• 如果是与客户端建立连接对应的读事件就绪,则调用 read 函数读取客户端发来的数据,并将读到的数据在服务端进行打印
• 如果在调用 read 函数时,发现客户端将连接关闭或 read 函数失败,则 poll 服务器也直接关闭对应的连接,并将连接对应的文 rfds 数组中移除,下一次调用 poll 函数时不需要再监视该套接字的读事件
void Print()
{
std::cout << "fd list: ";
for (int i = 0; i < num; i++)
{
if (_rfds[i].fd != defaultnum)
{
std::cout << _rfds[i].fd << " ";
}
}
std::cout << std::endl;
}
void Accpeter(int listensock)
{
// 获取新连接后,直接添加进入到 _fdarray 数组中
logMessage(NORMAL, "Accpeter begin ...\n");
uint16_t clientPort = 0;
std::string clientIP;
int sock = Sock::Accpet(listensock, &clientPort, &clientIP);
if (sock < 0)
{
return;
}
int i = 0;
for (; i < num; i++)
{
if (_rfds[i].fd != defaultnum)
{
continue;
}
else
{
break;
}
}
// 找到位置
if (i == num)
{
// 说明已经满了
logMessage(WARNING, "server is full,please wait ...\n");
close(sock);
}
else
{
_rfds[i].fd = sock;
_rfds[i].events = POLLIN; // 只考虑读事件
_rfds[i].revents = 0;
}
// 进行打印
Print();
logMessage(NORMAL, "Accpeter end ...\n");
}
void Recver(int pos)
{
// 通过sock这个fd进行接受数据
logMessage(NORMAL, "Recver begin ...\n");
char buffer[1024];
ssize_t n = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 这里读取不会阻塞,只有sock就绪了,才进来
if (n > 0)
{
buffer[n - 1] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (n < 0)
{
// 读取错误
close(_rfds[pos].fd);
pollInit(pos);
logMessage(WARNING, "recv error# %s", strerror(errno));
return;
}
else // == 0
{
// client 退出了
close(_rfds[pos].fd);
pollInit(pos);
logMessage(NORMAL, "client quit,me too ...");
return;
}
// 此时数据都在buffer当中,处理 request
std::string response = _func(buffer);
write(_rfds[pos].fd, response.c_str(), response.size());
logMessage(NORMAL, "Recver end ...\n");
}
// 处理逻辑
void Handerevent()
{
// 判断是listensock,还是普通sock的
for (int i = 0; i < num; i++)
{
if (_rfds[i].fd == defaultnum)
{
continue; // 后面需要进行置空,不能break
}
if (_rfds[i].fd == _listensock && _rfds[i].revents & POLLIN)
{
// 需要进行accpet
Accpeter(_rfds[i].fd);
}
else if (_rfds[i].revents & POLLIN) // 普通读事件
{
// 其他fd 而且就绪
Recver(i);
}
else
{
}
}
}
因为 rfds 数组的大小是固定设置的,在获取新连接并添加到数组中时,可能会因为数组已满而导致添加失败,只需要将 poll 服务器获取上来的连接套接字进行关闭即可。
poll 服务器
先实例化一个 pollServer 对象,再初始化服务器,和运行服务器:
#include <iostream>
#include <string>
#include <memory>
#include "error.hpp"
#include "pollServer.hpp"
void Usage(std::string arg)
{
std::cout << "\n Usage: \n\t" << arg << " port"
<< "\n\t" << std::endl;
}
std::string transmition(const std::string &request)
{
return request;
}
// ./main 8080
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
}
std::unique_ptr<Server::pollServer> us(new Server::pollServer(transmition, atoi(argv[1])));
us->InitServer();
us->Start();
return 0;
}
在调用 poll 服务器时,此时 timeout 被设置为 -1,运行服务器,如果没有客户端发送连接请求,服务器就会调用 poll 函数后进行阻塞等待:
适用 telnet 工具当客户端进行连接 poll 服务器请求,此时 poll 函数检测到监听套接字的读事件就绪后,就立即进行业务逻辑处理:
poll 的优点
• struct pollfd 结构当中包含了 events 和 revents,相当于select 函数的输入输出型参数进行分离,不再适用 select 参数 - 值 传递的方式,接口适用更简单
• poll 并没有最大数量限制(但是数量过大后性能也会下降)
poll 的缺点
poll 中监听的文件描述符数量太多时:
• 和 select 函数一样,poll 返回后,需要轮询 pollfd 来获取就绪的描述符
• 每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中
• 同时连接的大量客户端在一起时刻,可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降
I/O 多路转接 - epoll
epoll 也是系统提供的一个多路转接接口。
• epoll 在命名上比 poll 多了一个e,这个 e 可以理解为 extend,epoll 也是为了同时监视多个文件描述符上的事件是否就绪而对 poll 的改进。
epoll 的相关系统调用
epoll 有三个相关的系统调用,epoll_create、epoll_ctl、epoll_wait 。
epoll_create 函数
用于创建一个 epoll 模型:
• size:自从Linux 2.6.8 之后,size 参数是被忽略的,但是 size 的值必须设置为大于 0 的值
• 用完之后,必须调用 close 函数
返回值说明:
• epoll 模型创建成功返回其对应的文件描述符,否则返回 -1,同时错误码被设置。
epoll_ctl 函数
epoll_ctl 用于向指定的 epoll 模型中注册事件:
参数说明:
• epfd:指定的 epoll 模型
• op:表示具体的动作,用三个宏来表示
• fd:需要监视的文件描述符
• event:需要监视该文件描述符的哪些事件
返回值说明:
• 函数调用成功返回0,调用失败返回 -1,同时错误码被设置。
struct epoll_event 结构如下:
struct epoll_event 结构中有两个成员,第一个成员 events 表示的是需要监视的事件,第二个成员 data 是一个联合体,一般选择使用该结构中的 fd 成员,表示需要监听的文件描述符。
这些也都是宏的定义方式,二进制序列中有且只有一个比特位1,并且1的位置各不相同。
epoll_wait 函数
用于收集监视的事件中已经就绪的事件:
参数说明:
• epfd:指定的 epoll 模型
• events:内核会将已经就绪的事件拷贝到 events 数组中(events 不能是空指针,内核只负责将就绪事件拷贝到该数组中,不负责在用户态中分配内存空间)
• maxevents:events 数组中的元素个数,该值不能大于创建 epoll 模型时传入的 size 值
• timeout:表示 epoll_wait 函数的超时时间,单位是毫秒(ms)这里的 timeout 事件与 poll 一致。
返回值说明:
• 如果函数调用成功,则返回有事件就绪的文件描述符的个数
• 如果 timeout 时间耗尽,则返回0
• 如果函数调用失败,则返回 -1,同时错误码被设置,可能会被设置为如下:
epoll 工作原理
当某一个进程调用 epoll_create 函数时,Linux 会创建一个 eventpoll 的结构体,也就是 epoll 模型,eventpoll 结构体当中的成员 rbr 和 rdlist 与 epoll 的使用方式密切相关:
struct eventpoll{
...
//红黑树的根节点,这棵树中存储着所有添加到epoll中的需要监视的事件
struct rb_root rbr;
//就绪队列中则存放着将要通过epoll_wait返回给用户的满足条件的事件
struct list_head rdlist;
...
}
• 每一个epoll对象有一个独立的eventpoll结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件
• 这些事件会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间复杂度是 log N,N是树的高度)
• 而所添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,响应的事件发生时会调用这个回调方法
• 这个回调方法在内核中叫 ep_poll_callback,会将发生的事件添加到 rdlist 双链表中(就绪队列)
• 在 epoll 中,对于每一个事件都会建立一个epitem结构体
struct epitem{
struct rb_node rbn; //红黑树节点
struct list_head rdllink; //双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
ffd 记录的是指定文件描述符,event 记录的是该文件描述符对应的事件:
• 对于当中的 rbn 成员来说,ffd 与 event 的含义是,需要监视 ffd 上的 event 事件是否就绪
• 对于 rdlink 成员来说,ffd 与 event 的含义是,ffd上的event事件已经就绪
• 对于 epoll 来说,操作系统不需要主动进行事件都检测,当红黑树中监视的事件就绪时,会自动调用对应的回调方法,将就绪的事件添加到就绪队列中
• 当调用 epoll_wait 函数获取就绪事件时,只需要关注底层就绪队列是否为空,如果不为空,将就绪队列当中的就绪事件拷贝给用户即可
epoll 服务器
这里的 epoll 服务器,也只是读取客户端发来的数据并进行打印。
类中成员除了包含监听的套接字和端口号之外,还需要包含 epoll 模型对应的文件描述符,
#include <iostream>
#include <unistd.h>
#include <sys/epoll.h>
#include <functional>
#include <string>
#include "sock.hpp"
#include "error.hpp"
namespace Server
{
static const int defaultPort = 8080;
static const int size = 128;
static const int defaultnum = 64;
static const int defaultvalue = -1;
using func_t = std::function<std::string(const std::string &)>;
class epollServer
{
public:
epollServer(func_t func, uint16_t port = defaultPort, int num = defaultnum)
: _port(port), _listensock(defaultvalue), _num(num), _epfd(defaultvalue), _func(func)
{
}
~epollServer()
{
if (_listensock != defaultvalue)
{
close(_listensock);
}
if (_erfds)
{
delete[] _erfds;
}
if (_epfd != defaultvalue)
{
close(_epfd);
}
}
private:
uint16_t _port;
int _listensock;
int _epfd;
int _num; // 就绪事件的空间大小
struct epoll_event *_erfds; // 就绪事件
func_t _func;
};
}
初始化及运行服务器:
• 在运行服务器之前,需要需要依次调用封装Sock类中的函数,创建套接字,绑定和设置监听状态,并创建 epoll 模型,再调用 epoll_ctl 将监听套接字添加到 epoll 模型当中,表示服务器开始运行时,只需要监视监听套接字的读事件
• epoll 服务器不断调用 epoll_wait 函数监视是否有读事件就绪,如果 epoll_wait 函数的返回值大于0,说明已经有文件描述符的读事件就绪,接下来就是对就绪事件的逻辑操作
• epoll_wait 函数的返回值等于0,说明 timeout 时间耗尽,准备继续下一次的 epoll_wait 调用,返回值等于 -1,需要根据错误码进一步判断是否继续调用 epoll_wait 函数
void InitServer()
{
// 1.创建套接字
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
// 2.创建epoll模型
_epfd = epoll_create(size);
if (_epfd < 0)
{
logMessage(ERROR, "create epoll 模型 error");
exit(EPOLL_CREATE_ERR);
}
// 3.将listensock添加进入epoll中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _listensock;
epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);
// 申请就绪空间的大小
_erfds = new struct epoll_event[_num];
logMessage(NORMAL, "Init epoll success!");
}
void Start()
{
int timeout = -1;
for (;;)
{
int n = epoll_wait(_epfd, _erfds, _num, timeout); // 就绪事件全部存储在 _erfds 中
switch (n)
{
case 0: // 超时
logMessage(NORMAL, "time out ...\n");
break;
case -1:
logMessage(FATAL, "EPOLL_WAIT EORROR\n");
break;
default:
logMessage(NORMAL, "have a event readys\n");
Handlerevents(n);
break;
}
}
}
注意:
• 只要底层有就绪事件没有处理,epoll 会一直通知用户,本质原因是实际没有对底层就绪的数据进行读取。
事件处理:
如果底层就绪队列中有就绪事件,epoll_wait 函数会将底层就绪队列中的事件拷贝到定义的 _erfds
数组中,epoll 服务器再进行对就绪事件的处理:
• 根据 epoll_wait 的返回值,来判断操作系统向 _erfds 数组中拷贝了多少个 struct epoll_event 结构,进而对这些文件描述法上的事件进行处理
• 对于该结构当中的 events 当中包含读事件,则说明该文件描述符对应的读事件就绪,接下来还需要进一步判断该文件描述符是监听套接字,还是与客户端建立连接的套接字
• 如果是监听套接字的读事件就绪,就调用 accept 函数将底层建立好的连接获取上来,并调用 epoll_ctl 函数将获取的套接字添加到 epoll 模型中,表示下一次调用 epoll_wait 函数时需要检视该套接字
• 如果是与客户端建立的连接对应的事件就绪,则调用 recv 函数读取客户端发来的数据,并将读取到的数据在服务端打印
• 如果在调用 recv 函数时发现客户端将连接关闭,或者 recv 函数调用失败,则 epoll 服务器也直接关闭对应的连接,则调用 epoll_ctl 函数将对应连接的文件描述符从 epoll 模型中移除,表示下一次调用 epoll_wait 函数时无需监视该套接字的读事件
void Handlerevents(int readyNum)
{
logMessage(DEBUG, "Handlerevents begin ...\n");
for (int i = 0; i < readyNum; i++)
{
uint32_t event = _erfds[i].events;
int sock = _erfds[i].data.fd;
if (sock == _listensock && (event & EPOLLIN))
{
uint16_t clientPort;
std::string clientIP;
// 监听套接字就绪
int fd = Sock::Accpet(sock, &clientPort, &clientIP);
if (fd < 0)
{
logMessage(NORMAL, "Accept fail ...\n");
continue;
}
// 将fd添加到 epoll 模型中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
}
else if (event & EPOLLIN)
{
// 普通事件就绪
char buffer[2048];
int n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
logMessage(DEBUG, "client#: %s\n", buffer);
std::string response = _func(buffer);
send(sock, response.c_str(), response.size(), 0);
}
else if (n == 0)
{
// 客户端退出 先将sock从epoll中移除,再关闭sock
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(NORMAL, "client quit,me too ...\n");
}
else
{
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(WARNING, "recv error ...\n");
}
}
}
}
在调用 epoll_wait 函数时,将 timeout 值设置为了 -1,运行服务器后,如果没有客户端发来的连接请求,服务器就会在 epoll_wait 函数后进行阻塞等待:
使用 telnet 工具连接 epoll 服务器后,epoll 服务器调用 epoll_wait 函数在检测到监听套接字的读事件就绪后,就会调用 accept 函数获取建立好的连接:
使用 ls /proc/PID/fd 命令,查看当前 epoll 服务器文件描述的使用情况,文件描述符0、1、2 是默认打开的分别是 标准输入,标准输出,标准错误,3号文件描述符是监听套接字,4号文件描述符是服务器创建的 epoll 模型,5号文件描述是 telnet 工具建立连接的客户端:
epoll 的优点(与 select 的缺点对应)
• 接口使用起来更方便,虽然被拆分成了三个函数,但使用起来更方便高效,不需要每次循环设置关注的关键描述符,也做到了输入输出参数分离开
• 数据拷贝轻量:只在新增监视事件都时候,调用 epoll_ctl 时将数据从用户拷贝到内核,而 select 和 poll 每次都需要重新将需要监视的事件从用户拷贝到内核。此外,调用 epoll_wait 获取就绪事件时,只会拷贝就绪的事件,不会进行不必须的拷贝操作
• 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作的时间复杂度是O(1),即使文件描述符数量很多,效率也不会受到影响
• 没有数量限制:监视的文件描述符数量无上限,只要内存允许,就可以一直向红黑数中新增节点
注意:
网上说epoll 使用了内存映射机制?
• 内存映射机制:内核直接将就绪队列通过 mmap 的方式映射到用户态,避免了拷贝内存这样的额外性能开销
• 这种说法是不准确的,实际操作系统并没有做任何的映射机制,操作系统是不相信任何人的,不会让用户进程直接访问到内核的数据,用户只能通过系统调用来获取内核的数据
• 用户要获取内核当中的数据,势必需要将内核的数据拷贝到用户空间
select、poll 和 epoll 的不同之处
• 在使用 select 和 poll 时,都需要借助第三方数组来维护之前的文件描述符以及需要监视的事件,这个第三方数据时用户自己维护的,对用户的增删改操作都是用户自己进行
• epoll 的使用,不需要用户自己维护第三方数据,底层的红黑树就充当了这个数组的功能,该红黑树的增删改操作都是内核来维护的,用户只需要调用 epoll_ctl 函数让内核对红黑树进行对应的操作即可
• 使用多路转接接口时,数据流都有两个方向,一个是告知内核,一个是告知用户,select 和 poll 将这两个事件都交给了同一个函数来完成,而 epoll 在接口层面上将这两件事情分离了,调用 epoll_ctl 完成用户告知内核,epoll_wait 完成内核告知用户
epoll 的工作方式
epoll 有两种工作模式,一种是水平触发工作模式,一种是边缘触发工作模式:
水平触发(Level Triggered,LT)
• 只要底层有事件就绪,epoll 就会一直通知用户,epoll 默认状态下就是 LT 的工作模式(select 和 poll 也是)
• 当 epoll 检测底层读事件就绪时,可以不立即处理,或者只处理一部分,因为只要底层数据没有处理完,下一次 epoll 还会通知用户事件就行
• 支持阻塞读写和非阻塞读写
边缘触发(Edge Triggered,ET)
• 只有底层就绪事件数量由无到有或由有到多发生变化时,从会通知用户
• epoll 检测到底层读事件就绪时,必须立即进行处理,而且必须全部处理完毕,有可能此后底层再也没有事件就绪,epoll 就再也不会通知用户进行事件处理,此时相当于没有处理完的数据丢失了
• ET 模式下 epoll 通知用户的次数比LT模式下少,因此ET的性能比LT性能高,Nginx 就是采用ET模式使用的 epoll
• 只支持非阻塞读写
如果将 epoll 服务器修改为 ET 工作模式,就需要在初始化服务器添加事件时设置 EPOLLET 选项:
void InitServer()
{
// 1.创建套接字
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
// 2.创建epoll模型
_epfd = epoll_create(size);
if (_epfd < 0)
{
logMessage(ERROR, "create epoll 模型 error");
exit(EPOLL_CREATE_ERR);
}
// 3.将listensock添加进入epoll中
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; //添加 EPOLLET 选项
ev.data.fd = _listensock;
epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);
// 申请就绪空间的大小
_erfds = new struct epoll_event[_num];
logMessage(NORMAL, "Init epoll success!");
}
此时,注释掉事件处理逻辑函数,并运行服务器,此时因为服务器工作模式是 ET 的,所以只通知用户及时取走数据一次:
ET 工作模式下如何进行读写
在ET工作模式下,当写事件必须一次向发送缓冲区写满,否则可能再也没有机会进行读写,读事件前面以及说过了,必须一次性读取完。
• 当底层读事件就绪时,循环调用 recv 函数进行读取,直到某次调用 recv 读取时,实际读取到的字节数小于期望读取到字节数,说明本次底层数据已经读取完毕
• 有可能最后一次调用 recv 函数读取时,刚好读取的字节数和期望的字节数相等,此时底层数据也读取完毕,如果再调用 recv 函数读取,就会因为底层没有数据而被阻塞住
• 如果服务器是单进程的,recv 被阻塞住,此时该数据再也就绪,相当于服务器挂掉了,因此 ET 工作模式下循环调用 recv 函数进行读取时,必须将对应的文件描述设置为非阻塞
• 调用 send 函数写数据时,也是一样的原理,必须将对应的文件描述符设置为非阻塞(必须的必)