1. poll
poll也是一种多路转接的方案,解决了select的fd有上限和每次调用都要重新设置关心的fd的问题。
2. poll接口
#include
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
参数:fds:可以看成是动态数组/数组/结构体数组
nfds:数组类长度
timeout:单位ms(毫秒) >0:在timeout内阻塞,否则非阻塞
=0:非阻塞等待
<0:阻塞等待
返回值:同select
poll的作用和select一模一样:只负责等待
struct pollfd{
int fd; //文件描述符 用户->内核:要帮我关心一下fd
short events; //等待的事件 用户->内核:关心fd上的事件
short revents; //实际发生的事件 内核->用户:要关心的fd上面的events中,有哪些事件就绪了
};
输入时看:fd+events
输出时看:fd+revents
poll将输入输出进行了分离,使得不需要对参数进行重新设定
poll中的events和revents的取值:
pollfd这个数组可以由程序员自己决定,解决了selectfd有上限的问题。
3. poll代码实现
poll的代码基于前一篇文章的的select修改得来的,只修改了selectserver.hpp:
pollserver.hpp:
#pragma once
#include <string>
#include <iostream>
#include <functional>
#include "sock.hpp"
#include "log.hpp"
#include "err.hpp"
namespace poll_ns
{
static const int defaultport = 8080;
static const int num = 2048;
static const int defaultfd = -1;
using func_t = std::function<std::string (const std::string&)>;
class PollServer
{
public:
PollServer(func_t func, int port = defaultport)
: _port(port), _listenSock(-1), _func(func), _rfds(nullptr)
{
}
void initServer()
{
_listenSock = Sock::Socket();
Sock::Bind(_listenSock, _port);
Sock::Listen(_listenSock);
// logMessage(NORMAL, "creat socket..");
_rfds = new struct pollfd[num];
for (int i = 0; i < num; i++) ResetItem(i);
_rfds[0].fd = _listenSock; // 不变了
_rfds[0].events = POLLIN;
}
void Print()
{
std::cout << "fd list: ";
for (int i = 0; i < num; i++)
{
if (_rfds[i].fd != defaultfd)
std::cout << _rfds[i].fd << " ";
}
std::cout << std::endl;
}
void ResetItem(int i)
{
_rfds[i].fd = defaultfd;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
void Accepter(int listenSock)
{
// 走到这里accept不会阻塞 listensock套接字已经就绪了
string clientIp;
uint16_t clientPort = 0;
int sock = Sock::Accept(listenSock, &clientIp, &clientPort);
if (sock < 0)
return;
logMessage(NORMAL, "accept success [%s:%d]", clientIp.c_str(), clientPort);
// sock 我们能直接recv/read吗?--不能 整个代码 只有select有资格检测事件是否就绪
// 将新的sock交给select
// 将新的sock托管给select的本质,将sock添加到_fdArray数组中
int i;
for (i = 0; i < num; i++)
{
if (_rfds[i].fd != defaultfd)
continue;
else
break;
}
if (i == num)
{
logMessage(WARNING, "server is full, please wait!");
close(sock);
}
else
{
_rfds[i] .fd= sock; // 将新创建的sock设置到rfds中
_rfds[i].events = POLLIN;
_rfds[i].revents = 0;
}
Print();
}
void Recver(int pos)
{
// 1.读取
// 这样读取有问题!不能保证是否读取到一个完整的报文
char buffer[1024];
ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 这里在进行recv'时,不会被阻塞,因为走到这里时文件描述符已经就绪了
if (s > 0)
{
buffer[s] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0)
{
close(_rfds[pos].fd);
ResetItem(pos);
logMessage(NORMAL, "client quit");
return;
}
else
{
close(_rfds[pos].fd);
ResetItem(pos);
logMessage(ERROR, "client quit: %s", strerror(errno));
return;
}
// 2.处理request
std::string response = _func(buffer);
// 3.返回response
// write
write(_rfds[pos].fd, response.c_str(), response.size());
}
// handler event 中 不仅仅是有一个fd就绪,可能有多个
// 我们的select只处理了read
void HandlerReadEvent()
{
for (int i = 0; i < num; i++)
{
// 过滤掉非法的fd
if (_rfds[i].fd == defaultfd)
continue;
if(!(_rfds[i].revents & POLLIN)) continue;
// 下面的为正常的fd
// 正常的fd不一定就绪
// 目前一定是listen套接字
if (_rfds[i].fd == _listenSock && (_rfds[i].revents & POLLIN))
Accepter(_listenSock);
else if(_rfds[i].revents & POLLIN)
Recver(i);
else
{
}
}
}
void start()
{
int timeout = -1;
for (;;)
{
int n = poll(_rfds, num, timeout);
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "poll error, code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 说明有时间就绪了,目前只有一个监听事件就绪
logMessage(NORMAL, "get a new link...");
HandlerReadEvent();
break;
}
}
}
~PollServer()
{
if (_listenSock < 0)
close(_listenSock);
if (_rfds)
delete[] _rfds;
}
private:
int _port;
int _listenSock;
struct pollfd* _rfds;
func_t _func;
};
}
得到了与select相同的实验结果,并且结局了select的两个缺点:
poll的缺点主要是遍历的问题:
poll中监听的文件描述符数目增多时 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符. 次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中. 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
4. epoll
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法.
5. epoll接口
#include <sys/epoll.h>
int epoll_create(int size); -- 创建一个epoll模型,size>0
返回值:成功则返回一个文件描述符,失败返回-1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);参数:epfd:为epoll_create的返回值
op:增、改、删:fd对应的事件的类型 EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL
fd:用户告诉内核:你要帮我关心fd上的event事件
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;//保存触发事件的某个文件描述符相关的数据
struct epoll_event {
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
其中events表示感兴趣的事件和被触发的事件,可能的取值为:
EPOLLIN:表示对应的文件描述符可以读;
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数可读;
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: ET的epoll工作模式;
int eoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
返回值:>0:有几个fd就绪 ==0:超时返回 <0:调用失败
参数:epfd:epoll_create的返回值
events:输出型参数,数组,内核告诉用户:哪些关心的fd事件就绪
maxevents:events的最大容量
timeout:>0: ms ==0:非阻塞等待 <0: 阻塞等待
6. epoll底层原理
红黑树的节点表示:用户告诉内核,哪些sock上的哪些events需要OS关心
还有一个就绪队列,双向链表,每个节点表示:内核告诉用户,哪些sock上的events已经就绪了
就绪的过程就相当于是从红黑树节点转移到就绪队列节点的过程,两种数据结构使用的是同一个节点,只需更该指针即可
每一个节点对应的是一个fd,底层有对应的struct file对象,其中有一个void* private_data字段,指向一个回调函数,这个回调函数的作用就是转移节点的,从而不需要遍历树来确定某个fd对应的事件是否就绪。
epoll模型:
epoll_create:创建epoll模型
epoll_ctl:向红黑树中增、删、改
epoll_wait:找到对应的epoll莫i选哪个,拿到就绪队列中的就绪事件(就绪队列中的事件已经就绪了);不需要遍历检测(检测事件就绪),只需要遍历拷贝
epoll_wait将所有就绪的事件按照顺序放入用户传入的数组中,有几个就绪事件则返回几。就绪队列中数据很多时,一次拿不完也没事(队列先进先出)。epoll不需要自己维护辅助数组,换为了OS维护的红黑树。
7. epoll demo
这里只写epollserver,其余的代码模块与select和poll中的相同,代码如下:
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/epoll.h>
#include <functional>
#include "err.h"
#include "log.hpp"
#include "sock.hpp"
namespace epoll_ns
{
class EpollServer
{
static const int defaultPort = 8081;
static const int size = 128;
static const int defaultValue = -1;
static const int defaultnum = 64;
using func_t = std::function<std::string (const string&)>;
public:
EpollServer(func_t func, uint16_t port = defaultPort, int num = defaultnum)
:_func(func)
,_num(num)
,_port(port)
{
}
void initServer()
{
// 1.创建socket
_listenSock = Sock::Socket();
Sock::Bind(_listenSock, _port);
Sock::Listen(_listenSock);
// 2.创建epoll模型
_epfd = epoll_create(size);
if(_epfd < 0)
{
logMessage(FATAL, "epoll create error: %s", strerror(errno));
exit(EPOLL_CREATE_ERR);
}
// 3.添加listensock到epoll中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _listenSock; // 当事件就绪 被重新捞取上来的时候,我们要知道是哪一个fd就绪了
epoll_ctl(_epfd, EPOLL_CTL_ADD, _listenSock, &ev);
// 4.申请就绪时间的空间
_revs = new struct epoll_event[_num];
logMessage(NORMAL, "init server success!");
}
void Print(int readyNum)
{
std::cout << "fd list: ";
for(int i = 0; i < readyNum; i++)
{
std::cout << _revs[i].data.fd << " ";
}
std::cout << std::endl;
}
void HandlerEvent(int readyNum)
{
logMessage(DEBUG, "Handler in");
for(int i = 0; i < readyNum; i++)
{
uint32_t events = _revs[i].events;
int sock = _revs[i].data.fd;
Print(readyNum);
if(sock == _listenSock && (events & EPOLLIN))
{
// _listensock的读事件就绪,获取新连接
std::string clientip;
uint16_t clientport;
int fd = Sock::Accept(sock, &clientip, &clientport);
if(fd < 0)
{
logMessage(WARNING, "accept error");
continue;
}
// 获取新的fd成功,可以直接读取吗?不能直接读取,可能会阻塞,将其放入epoll中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
}
else if(events & EPOLLIN)
{
// 普通的读事件就绪
// 读取是有问题的 没有协议定制
logMessage(DEBUG, "recv start");
char buffer[1024];
// 把本轮读完也不一定读到一个完整的请求
int n = recv(sock, buffer, sizeof(buffer)-1, 0);
if(n > 0)
{
buffer[n] = 0;
logMessage(DEBUG, "client# %s", buffer);
std::string response = _func(buffer);
send(sock, response.c_str(), response.size(), 0);
}
else if(n == 0)
{
// 建议先从epoll溢出,才close 文件描述符
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(NORMAL, "client quit");
}
else
{
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));
}
}
else if(events & EPOLLOUT)
{}
}
logMessage(DEBUG, "Handler out");
}
void start()
{
int timeout = -1;
for(;;)
{
int n = epoll_wait(_epfd, _revs, _num, timeout);
switch(n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
break;
default:
logMessage(NORMAL, "have event ready");
HandlerEvent(n); // 有n个事件就绪
break;
}
}
}
~EpollServer()
{
if(_listenSock != defaultValue) close(_listenSock);
if(_epfd != defaultValue) close(_epfd);
if(_revs) delete[] _revs;
}
private:
uint16_t _port;
int _listenSock;
int _epfd;
struct epoll_event *_revs;
int _num;
func_t _func;
};
}
得到的实验结果也是与预期相符的:
8. 再谈epoll原理 (LT/ET)
什么叫做事件就绪:底层的IO条件满足了,可以进行某种IO行为了。
select/poll/epoll 为等待 :IO就绪事件的通知机制
通知机制的策略,epoll的不同工作模式:
- Level Triggered(LT):水平触发,只要底层有数据epoll就会一直通知用户要读取数据
- Edge Triggered(ET):边缘触发,底层如果数据没有被读完,epoll不在通知用户,除非底 层数据变化的时候(数据增多),才会再一次通知一次
select/poll/epoll默认工作模式为LT
EP模式只有数据从无到有/从有到多的时候,才会通知上层且只通知一次。使得程序员将本轮就绪的数据全部读取到上层,那么是如何吧本次就绪的底层数据全部读完的呢?-- 循环读取,直到读不到数据;一般的fd是阻塞式的,但是ET模式下对应的fd必须是非阻塞式的,因为若是我们读到最后一次底层没有数据时,阻塞式的fd会导致程序在读取时阻塞。
在epoll采用ET(Edge Triggered)工作模式时,如果一次没有读完底层来的数据,epoll_wait将不会再次触发事件,直到数据完全被读取或者关闭连接。这意味着,如果数据处理不完全,代码可能会阻塞在epoll_wait上,等待更多数据。
LT的fd可以是阻塞/非阻塞的。LT模式下也可以模仿ET的工作方式。并不能说LT/ET谁的工作模式更高效,要结合特定的场景来看。
一般情况下,ET的高效不只体现在通知机制上,还有会尽快让上层把数据取走。-- 当上层把数据取走后TCP可以更新出一个更大的滑动窗口,提高底层的发送效率,更阿红的利用诸如TCP应答机制等策略。 -- TCP中PSH的作用就是让底层数据就绪,再让上层知道。