目录
前言
五种IO模型
非阻塞IO
IO多路转接 --- select
一个简单的select服务器
HandlerEvent
socket就绪条件
select的特点
select缺点
IO多路转接 --- poll
poll的优点
poll的缺点
IO多路转接 --- epoll
epoll工作原理
epoll的优点
基于epoll封装的服务器
epoll工作方式
LT工作模式
ET工作模式
在C++中,多路复用(Multiplexing)是一种技术,用于同时监视多个输入/输出流,并在其中任何一个流准备好进行读取或写入操作时进行响应。
前言
以读写为例,调用read,但是底层没有数据,read就是进入阻塞状态。在调用write的时候,其实本质就是把数据从用户层写给OS---本质就是拷贝函数。
什么叫做高校的IO呢? 单位时间内,IO过程中,等的比重越小,IO效率越高。几乎所有的提高IO效率的策略,本质就是这个。
线程同步和同步IO没有关系
五种IO模型
- 阻塞IO
在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式
- 非阻塞IO
如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用
- 信号驱动
内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
- IO多路转接
虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件
描述符的就绪状态.
- 异步IO
由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).
任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少
一个钓鱼的例子:
- 张三是一个刚学会钓鱼的新手,做完准备工作后,张三开始钓鱼了,钓鱼的过程中,张三手死死握着鱼竿,眼盯着湖面,只要鱼鳔不动,张三不动,就算有人喊张三,张三也不搭理。如果有鱼咬钩了,张三才会收杆。
张三等待鱼上钩的过程,本质就是在等待,检测,有人喊张三,张三也不搭理,我们把这种钓鱼方式称为阻塞式 - 李四是一个钓鱼三四年的中手,做完准备工作后,开始钓鱼了,李四钓鱼跟张三钓鱼不同,李四不会死死的等待鱼竿动,而是把鱼竿放一边,开始干其他的事情,当有鱼上钩的时候,李四才会收杆。
这个过程,李四检测鱼竿有没有动,没动,就做其他事情,并不会因为鱼竿没动,就死死盯着鱼竿,在这期间还能干其他事情。我们把这种钓鱼方式称为非阻塞轮询 - 王五是一个钓鱼七八年的老手,做完准备工作后,开始钓鱼,王五的鱼竿顶部有一个铃铛,他把鱼竿插在地上,然后就不管了,躺下来就开始玩手机,有铃铛响了开始收杆。
铃铛响了,王五才会动,我们把这种钓鱼方式称为信号驱动式IO - 赵六是一个有钱人,他来的时候把带了很多鱼竿,赵六把鱼竿都插在岸上,开始钓鱼,赵六开始查询,看哪个鱼竿动了,就收杆。以周期性的方式去检测。我们把这种钓鱼方式称为多路复用/多路转接。
这四个人,哪个人的钓鱼效率最高?毫无疑问,肯定是赵六的钓鱼效率最高。因为这么多鱼竿都是并行的。
- 田七比赵六还有钱,他去钓鱼的时候,带了一个助理,田七在钓鱼的时候突然要回公司开会,就让助理去钓鱼了。田七就走了,助理在钓鱼,田七没有参与钓鱼,更类似于钓鱼行为的发起者,他要的是鱼,并不是结果。
这种钓鱼行为是异步IO。小王是操作系统,当有数据的时候,操作系统会去提醒田七。
阻塞式IO和非阻塞式IO有什么区别?
这两种IO方式本质上是没有什么区别的,IO = 等 + 拷贝,他两个都在等,只是等待的方式不同。
同步IO的本质就是有没有参与IO,
异步IO的本质就是不参与IO,只是发起IO,最后拿结果就行了。
那么这五种IO方式哪一种效率方式这么高?多路复用和非阻塞。
非阻塞IO
fcntl
一个文件描述符,默认都是阻塞IO。
函数原型如下
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的cmd的值不同,后面追加的参数也不同。
fcntl函数有五种功能
复制一个现有的描述符(cmd=F_DUPFD).
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
函数返回值取决于cmd
成功:
若cmd为F_DUPFD,返回一个新的文件描述符
若cmd为F_GETFL,返回文件描述符的flags值
若cmd为F_SETFL,返回0
失败返回-1,并设置errno值
fcntl函数常用操作
1. 赋值一个新的文件描述符
int newfd = fcntl(fd, F_DUPFD, 0);
2. 获取文件的属性标志
int flag = fcntl(fd, F_GETFL, 0);
3. 设置文件状态标志
flag = flag | OAPPEND;
fcntl(fd, F_SETFL, flag);
4. 常用的属性标志
O_APPEND --- 设置文件打开为末尾添加
O_NONBLOCK --- 设置打开的文件描述符为非阻塞
用第三中功能,获取/设置文件状态标记,就可以将一个文件描述符设置为非阻塞
#include <iostream>
#include <unistd.h>
using std::cout;
using std::cin;
using std::endl;
int main()
{
char buffer[1024];
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << buffer << endl;
}
else if (n == 0)
{
cout << "n == 0" << endl;
break;
}
else
{
cout << "n < 0" << endl;
break;
}
}
return 0;
}
当程序执行的时候,系统无法从键盘上得到资源,就会进入阻塞状态。
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
using std::cout;
using std::cin;
using std::endl;
void setFileBlock(int fd)
{
int f = fcntl(fd, F_GETFL);
if (f < 0)
{
cout << "fcntl error" << endl;
return ;
}
fcntl(fd, F_SETFL, f | FNONBLOCK);
}
int main()
{
setFileBlock(0);
char buffer[1024];
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << buffer << endl;
}
else if (n == 0)
{
cout << "n == 0" << endl;
break;
}
else
{
cout << "n < 0" << endl;
break;
}
}
return 0;
}
设置为非阻塞之后,在运行程序,程序会直接退出。把文件状态设置为非阻塞,执行到read之后,因为没有接收到键盘资源,而代码会继续往下执行,这就导致了n接收到read的值为-1.
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
using std::cout;
using std::cin;
using std::endl;
void setFileBlock(int fd)
{
int f = fcntl(fd, F_GETFL);
if (f < 0)
{
cout << "fcntl error" << endl;
return ;
}
fcntl(fd, F_SETFL, f | FNONBLOCK);
}
int main()
{
setFileBlock(0);
char buffer[1024];
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << buffer << endl;
}
else if (n == 0)
{
cout << "n == 0" << endl;
break;
}
else
{
// #define EWOULDBLOCK EAGAIN /* Operation would block */
if (errno == EWOULDBLOCK)
{
cout << "没有键盘资源" << endl;
sleep(1);
}
else
{
cout << "n < 0" << endl;
break;
}
}
}
return 0;
}
将文件状态设置为非阻塞之后,如果底层fd数据没有就绪,read等IO函数返回值会以出错的形式返回。但这并不是真的出错,只是因为底层没有就绪。如果区分这个状态?同故宫errno区分,当read出错的时候,会把errno置为某一个值。通过errno == EWOULDBLOCK来进行区分即可。
IO多路转接 --- select
任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少.
系统提供select函数来实现多路复用输入/输出模型。
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的。程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。
函数原型
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
fd_set:文件描述符集合 --- 本质是一个位图
参数:
参数nfds:需要监视的最大文件描述符值 + 1;
readfds:读集合,是一个传入传出参数。
传入:指的是告诉内核哪些文件描述符需要监控
传出:指的是内核告诉应用程序哪些文件描述符发生了变化
writefds:写文件描述符集合(传入传出参数);
exceptfds:输入输出参数,一般表示异常事件
timeout:超时时间:
NULL:表示永久阻塞,直到有事件发生
0:表示不阻塞,不管有没有事件发生,都会立刻返回。
>0:表示阻塞的时长,若没有超过时长,则一直阻塞,若超过时长,则立刻返回。
返回值:成功返回发生变化的文件描述符个数。
当有错误发生时,则返回-1,错误原因存于errno,此时参数readfds,writefds,exceptfds和
timeout的值变为不可预测。
错误值可能为:
EBADF 文件描述词为无效的或该文件已关闭
EINTR 此调用被信号所中断
EINVAL 参数n 为负值。
ENOMEM 核心内存不足
关于fd_set结构
相关的fd_set接口
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
关于timeval结构
timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0。
一个简单的select服务器
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log1.hpp"
Log lg;
const int backlog = 10;
enum
{
SocketErr = 2,
BindErr,
ListenErr,
};
class Sock
{
public:
Sock()
{}
~Sock()
{}
int Socket()
{
_listensocket = socket(AF_INET, SOCK_STREAM, 0);
if (_listensocket < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
}
void Bind(int listensock, uint16_t port)
{
struct sockaddr_in serv;
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_port = htons(port);
serv.sin_addr.s_addr = INADDR_ANY;
if (bind(listensock, (const sockaddr*)&serv, sizeof(serv)) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen(int listensock)
{
if (listen(listensock, backlog) < 0)
{
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(int listensock, std::string& ip, uint16_t& port)
{
struct sockaddr_in serv;
bzero(&serv, sizeof(serv));
socklen_t len = sizeof(serv);
int sockfd = accept(listensock, (struct sockaddr*)&serv, &len);
if (sockfd < 0)
{
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &serv.sin_addr.s_addr, ipstr, sizeof(ipstr));
ip = ipstr;
port = ntohs(serv.sin_port);
return sockfd;
}
bool Connect(int listensock, const std::string &ip, const uint16_t& port)
{
struct sockaddr_in serv;
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &serv.sin_addr.s_addr);
int n = connect(listensock, (const struct sockaddr*)&serv, sizeof(serv));
if (n < 0)
{
std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
return false;
}
return true;
}
void Close(int listensock)
{
close(listensock);
}
int Fd()
{
return _listensocket;
}
private:
int _listensocket;
};
#include <iostream>
#include <sys/select.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include "Socket.hpp"
extern Log lg;
const uint16_t port = 9999;
class HTTPSelect {
public:
HTTPSelect():_port(port)
{}
bool Init()
{
Sock s;
_listensocket = s.Socket();
s.Bind(_listensocket, _port);
s.Listen(_listensocket);
int opt = 1;
return true;
}
void Start()
{
for (;;)
{
fd_set readf;
FD_ZERO(&readf);
FD_SET(_listensocket, &readf);
struct timeval timeout = {5, 0};
int n = select(_listensocket + 1, &readf, nullptr, nullptr, &timeout);
if (n < 0)
{
lg(Debug, "select error %d : %s", errno, strerror(errno));
}
else if (n == 0)
{
std::cout << "n < 0" << std::endl;
}
else
{
lg(Info, "get a link");
}
}
}
~HTTPSelect()
{
close(_listensocket);
}
private:
int _listensocket;
uint16_t _port;
std::string _ip;
};
#include "HTTPSelect.hpp"
#include <memory>
int main()
{
std::unique_ptr<HTTPSelect> s(new HTTPSelect());
s->Init();
s->Start();
return 0;
}
当程序执行的时候,如果不连接,select会一直去进行监视。
当连接服务器的时候。
会不停的打印得到一个链接。这是因为select在监视到有新链接来的时候,如果你没有对这个通知做处理,会不停的通知你来了一个新链接。接下来对HTTPSelect.hpp进行修改,把新链接进行处理。
HandlerEvent
select就绪之后,可以直接使用read进行读取数据吗?不可以,read是一个阻塞函数,当链接进来进行通信的时候,调用到read一直没有收到数据,就会阻塞在这里,这个程序只是一个单进程的。一旦被阻塞,HandlerEvent就不会返回。IO事件有没有就绪,我们是不知道的,但是select知道。这里可以想办法把sock文件描述符设置到select里面。
这里可以搞一个辅助数组,select中的fd_set是一张位图,位图是有大小的,在我的linux中算出来的大小是1024。将辅助数组的大小就是fd_set的大小,把文件描述符放入到这个数组当中,当有新链接到来,就加入到数组,当有链接关闭,就将数组中对应的值设为-1.如何分别accept和socket的文件描述符?
if (fd == _listensocket)
{}
else
{}
可以通过这样的形式去判断。
见完整代码。
#include <iostream>
#include <sys/select.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include "Socket.hpp"
extern Log lg;
const uint16_t port = 9000;
const int fd_num_max = sizeof(fd_set) * 8;
const int defaultfd = -1;
class HTTPSelect
{
public:
HTTPSelect() : _port(port)
{
for (int i = 0; i < fd_num_max; i++)
{
fd_array[i] = defaultfd;
}
}
bool Init()
{
_listensocket = s.Socket();
s.Bind(_listensocket, _port);
s.Listen(_listensocket);
int opt = 1;
setsockopt(_listensocket, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));
return true;
}
void HandlerEvent(fd_set *readf)
{
for (int i = 0; i < fd_num_max; i++)
{
int fd = fd_array[i];
if (fd == defaultfd)
continue;
if (FD_ISSET(fd, readf))
{
if (fd == _listensocket)
{
std::string clientip;
uint16_t clientport;
int sockfd = s.Accept(_listensocket, clientip, clientport);
lg(Info, "accept success, %s:%d , sock fd : %d", clientip.c_str(), clientport, sockfd);
int pos = 0;
for (; pos < fd_num_max; pos++)
{
if (fd_array[pos] == defaultfd)
break;
}
if (pos == fd_num_max)
{
lg(Info, "server is full, close %d now", sockfd);
close(sockfd);
}
else
{
fd_array[pos] = sockfd;
PrintFd();
}
}
else
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n == 0)
{
lg(Info, "client quit, me too, close fd is : %d", fd_array[i]);
close(fd_array[i]);
fd_array[i] = defaultfd;
}
else if (n < 0)
{
lg(Warning, "recv error : fd is : %d ", fd_array[i]);
close(fd_array[i]);
fd_array[i] = defaultfd;
}
else
{
buffer[n] = 0;
std::cout << "get a msg : " << buffer << std::endl;
}
}
}
}
}
void PrintFd()
{
for (int i = 0; i < fd_num_max; i++)
{
if (fd_array[i] == defaultfd)
continue;
std::cout << fd_array[i] << " ";
}
std::cout << std::endl;
}
void Start()
{
fd_array[0] = _listensocket;
for (;;)
{
fd_set readf;
FD_ZERO(&readf);
int maxfd = fd_array[0];
for (int i = 0; i < fd_num_max; i++)
{
if (fd_array[i] == defaultfd)
continue;
FD_SET(fd_array[i], &readf);
if (maxfd < fd_array[i])
maxfd = fd_array[i];
}
PrintFd();
struct timeval timeout = {5, 0};
int n = select(maxfd + 1, &readf, nullptr, nullptr, &timeout);
if (n < 0)
{
lg(Debug, "select error %d : %s", errno, strerror(errno));
}
else if (n == 0)
{
std::cout << "n < 0" << std::endl;
}
else
{
lg(Info, "get a link");
HandlerEvent(&readf);
}
}
}
~HTTPSelect()
{
close(_listensocket);
}
private:
Sock s;
int _listensocket;
uint16_t _port;
int fd_array[fd_num_max];
};
socket就绪条件
读就绪
socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
监听的socket上有新的连接请求;
socket上有未处理的错误;
写就绪
socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记
SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
socket使用非阻塞connect连接成功或失败之后;
socket上有未读取的错误;
select的特点
可监控的文件描述符个数取决与sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)=512,每bit表示一个文件描述符,则我服务器上支持的最大文件描述符是512*8=4096.
将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd,
一是用于再select 返回后,array作为源数据和fd_set进行FD_ISSET判断。
二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数 。
select缺点
每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
select支持的文件描述符数量太小
IO多路转接 --- poll
poll函数接口
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// pollfd结构
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
跟select类似,监控多路IO,但poll不能跨平台。
参数:
fds:传入传出参数,实际上是一个结构体数组,包含了三部分内容:文件描述符,监听事件集合
返回的事件集合。
fds.fd:要监控的文件描述符
fds.events:
POLLIN->读事件
POLLOUT->写事件
fds.revent:返回的事件
nfds:数据实际有效内容的个数
timeout:超时时间,单位是毫秒。
timeout:
=0:不阻塞,立刻返回
-1:表示一直阻塞,直到有事件发生
>0:表示阻塞时长,在时长范围内若有事件发生会立刻返回
如果超过了时长也会立刻返回
函数返回值:
>0:发生变化的文件描述符个数
=0:没有文件描述符发生变化
-1:表示异常
events和revents的取值。
poll的优点
pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便.
poll并没有最大数量限制 (但是数量过大后性能也是会下降)
poll的缺点
poll中监听的文件描述符数目增多时
和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
IO多路转接 --- epoll
epoll是为了处理大批量句柄而做了改进的poll
epoll有3个相关的系统调用
#include <sys/epoll.h>
int epoll_create(int size);
创建一个epoll的句柄
自从linux2.6.8之后,size参数是被忽略的,用完之后必须调用close()关闭
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数
他不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型.
第一个参数epoll_create()的返回值(epoll的句柄)
第二个参数表示动作,用三个宏来表示
EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
EPOLL_CTL_DEL :从epfd中删除一个fd;
第三个参数是要监听的fd
第四个参数是告诉内核需要监听什么事
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集epoll监控的事件中已经发送的事件
参数events是分配好的epoll_event结构体数组.
epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个
events数组中,不会去帮助我们在用户态中分配内存).
maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create时的size
参数timeout时超时事件(毫秒,0会立刻返回,-1是永久阻塞).
如果函数调用成功,返回对应IO上已准备好的文件描述符数目,如果返回0表示已经超时,
返回小于0表示函数失败
struct epoll_event结构
events可以是以下几个宏的集合
EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要
再次把这个socket加入到EPOLL队列里
epoll工作原理
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关.
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度).
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.
这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中.
在epoll中,对于每一个事件,都会建立一个结构体
struct epitem {
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1).
epoll的使用过程就是三部曲:
调用epoll_create创建一个epoll句柄;
调用epoll_ctl, 将要监控的文件描述符进行注册;
调用epoll_wait, 等待文件描述符就绪;
epoll的优点
接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,
epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
没有数量限制: 文件描述符数目无上限
基于epoll封装的服务器
// HttpEpoll.hpp
#pragma once
#include <memory>
#include <iostream>
#include <unistd.h>
#include "Socket.hpp"
#include "Epoll.hpp"
extern Log lg;
#define EPOLL_IN EPOLLIN
#define EPOLL_OUT EPOLLOUT
// epoll_ctl中的一些选项
#define ADD EPOLL_CTL_ADD
#define DEL EPOLL_CTL_DEL
#define MOD EPOLL_CTL_MOD
class HttpEpoll {
public:
// 指针和引用类型必须在初始化列表中进行初始化。
HttpEpoll(uint16_t port):_port(port),sock_ptr(new Sock()),epoll_ptr(new Epoll())
{}
void Init()
{
// Socket
_listensocket = sock_ptr->Socket();
sock_ptr->Bind(_listensocket, _port);
sock_ptr->Listen(_listensocket);
}
void Start()
{
struct epoll_event ev[64];
epoll_ptr->EpollUpdate(ADD, sock_ptr->Fd(), EPOLL_IN);
for (;;)
{
int n = epoll_ptr->EpollWait(ev, sizeof(ev));
if (n < 0)
{
lg(Warning, "EpollWait error : %s", strerror(errno));
}
else if (n == 0)
{
lg(Info, "EpollWait overtime : %s", strerror(errno));
sleep(1);
}
else
{
lg(Info, "EpollWait is success : %d", n);
}
}
}
~HttpEpoll()
{
close(_listensocket);
}
private:
std::shared_ptr<Epoll> epoll_ptr;
std::shared_ptr<Sock> sock_ptr;
int _listensocket;
uint16_t _port;
};
// nocopy.hpp
#pragma once
class noncopy {
public:
noncopy(){}
noncopy(const noncopy&) = delete;
const noncopy& operator=(const noncopy&) = delete;
};
// Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log1.hpp"
Log lg;
const int backlog = 10;
enum
{
SocketErr = 2,
BindErr,
ListenErr,
};
class Sock
{
public:
Sock()
{}
~Sock()
{}
int Socket()
{
_listensocket = socket(AF_INET, SOCK_STREAM, 0);
if (_listensocket < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
}
void Bind(int listensock, uint16_t port)
{
struct sockaddr_in serv;
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_port = htons(port);
serv.sin_addr.s_addr = INADDR_ANY;
if (bind(listensock, (const sockaddr*)&serv, sizeof(serv)) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen(int listensock)
{
if (listen(listensock, backlog) < 0)
{
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(int listensock, std::string& ip, uint16_t& port)
{
struct sockaddr_in serv;
bzero(&serv, sizeof(serv));
socklen_t len = sizeof(serv);
int sockfd = accept(listensock, (struct sockaddr*)&serv, &len);
if (sockfd < 0)
{
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &serv.sin_addr.s_addr, ipstr, sizeof(ipstr));
ip = ipstr;
port = ntohs(serv.sin_port);
return sockfd;
}
bool Connect(int listensock, const std::string &ip, const uint16_t& port)
{
struct sockaddr_in serv;
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &serv.sin_addr.s_addr);
int n = connect(listensock, (const struct sockaddr*)&serv, sizeof(serv));
if (n < 0)
{
std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
return false;
}
return true;
}
void Close(int listensock)
{
close(listensock);
}
int Fd()
{
return _listensocket;
}
private:
int _listensocket;
};
// Epoll.hpp
#pragma once
#include "nocopy.hpp"
#include <cstring>
#include <sys/epoll.h>
#include "log1.hpp"
#define EPOLL_SIZE 10
extern Log lg;
class Epoll : public noncopy {
public:
Epoll() {
// 创建epoll句柄,并在创建之后做出判断
_epoll_fd = epoll_create(EPOLL_SIZE);
if (_epoll_fd < 0)
{
lg(Warning, "epoll_create error : %s ", strerror(errno));
}
else
{
lg(Info, "epoll_create is successful, fd : %d " ,_epoll_fd);
}
}
// epoll的事件注册
int EpollUpdate(int op, int sockfd, uint32_t event)
{
// 当事件为删除事件的时候,就不需要关心events了
int n;
if (op == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epoll_fd, op, sockfd, nullptr);
if (n < 0)
lg(Info, "epoll_ctl delete error : %s", strerror(errno));
else
lg(Info, "epoll_ctl %d success, sockfd : %d ", op,n);
}
else
{
struct epoll_event ev;
ev.data.fd = sockfd;
ev.events = event;
n = epoll_ctl(_epoll_fd, op, sockfd, &ev);
if (n < 0)
lg(Info, "epoll_ctl %d error : %s", op,strerror(errno));
else
lg(Info, "epoll_ctl %d success, sockfd : %d ", op,n);
}
return n;
}
int EpollWait(struct epoll_event *events, int maxevents)
{
// 收集epoll监控的事件中已经发送的事件
int n = epoll_wait(_epoll_fd, events, maxevents, 0);
return n;
}
~Epoll()
{
if (_epoll_fd >= 0)
{
// 句柄需要关闭
close(_epoll_fd);
}
}
private:
int _epoll_fd;
};
// Main.cc
#include "HttpEpoll.hpp"
int main()
{
std::unique_ptr<HttpEpoll> ep_ptr (new HttpEpoll(9999));
ep_ptr->Init();
ep_ptr->Start();
return 0;
}
执行程序
刚开始没有任何事件就绪。通过telnet进行一个连接。
epoll服务器就会疯狂打印有事件来临的信息,这个特征跟select和poll一样,当有事件就绪的时候,就会不停的通知。编写一个HandlerEvent来处理就绪的事件。
#pragma once
#include <memory>
#include <iostream>
#include <unistd.h>
#include "Socket.hpp"
#include "Epoll.hpp"
extern Log lg;
#define EPOLL_IN EPOLLIN
#define EPOLL_OUT EPOLLOUT
// epoll_ctl中的一些选项
#define ADD EPOLL_CTL_ADD
#define DEL EPOLL_CTL_DEL
#define MOD EPOLL_CTL_MOD
class HttpEpoll
{
public:
// 指针和引用类型必须在初始化列表中进行初始化。
HttpEpoll(uint16_t port) : _port(port), sock_ptr(new Sock()), epoll_ptr(new Epoll())
{}
void Init()
{
// Socket
_listensocket = sock_ptr->Socket();
sock_ptr->Bind(_listensocket, _port);
sock_ptr->Listen(_listensocket);
}
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sockfd = sock_ptr->Accept(sock_ptr->Fd(), clientip, clientport);
if (sockfd > 0)
{
epoll_ptr->EpollUpdate(ADD, sockfd, EPOLL_IN);
}
}
void Recver(int fd)
{
char buffer[1024];
int n = read(fd, buffer, sizeof(buffer) - 1);
if (n < 0)
{
lg(Warning, "read is fail : %s", strerror(errno));
epoll_ptr->EpollUpdate(DEL, fd, 0);
close(fd);
}
else if (n == 0)
{
lg(Info, "read is fail : %s", strerror(errno));
epoll_ptr->EpollUpdate(DEL, fd, 0);
close(fd);
}
else
{
buffer[n] = 0;
std::cout << "get a msg : " << buffer << std::endl;
std::string echo_string = "server echo $ ";
echo_string += buffer;
write(fd, echo_string.c_str(), echo_string.size());
}
}
void HandlerEvent(struct epoll_event ev[], int num)
{
// 如果epoll_wait返回的值为socket
for (int i = 0; i < num; i++)
{
int fd = ev[i].data.fd;
uint32_t events = ev[i].events;
// 区分事件
if (events & EPOLL_IN)
{
if (fd == sock_ptr->Fd())
{
Accepter();
}
else
{
Recver(fd);
}
}
else if (events & EPOLL_OUT)
{
}
else
{
}
}
}
void Start()
{
struct epoll_event ev[64];
epoll_ptr->EpollUpdate(ADD, sock_ptr->Fd(), EPOLL_IN);
for (;;)
{
int n = epoll_ptr->EpollWait(ev, 64);
if (n < 0)
{
lg(Warning, "EpollWait error : %s", strerror(errno));
}
else if (n == 0)
{
lg(Info, "EpollWait overtime : %s", strerror(errno));
sleep(1);
}
else
{
lg(Info, "EpollWait is success : %d", ev[0].data.fd);
HandlerEvent(ev, n);
}
}
}
~HttpEpoll()
{
close(_listensocket);
}
private:
std::shared_ptr<Epoll> epoll_ptr;
std::shared_ptr<Sock> sock_ptr;
int _listensocket;
uint16_t _port;
};
epoll工作方式
epoll有两种工作方式 - 水平触发(LT)和边缘触发(ET)
LT工作模式
LT工作模式也是epoll默认模式:LT模式。事件到来,但是上层不处理,高电平,一直有效。
- 当epoll检测到socket上事件就绪的时候,可以不立刻进行处理,或者只处理一部分。
- 一次性读取数据读不完,在第二次调用epoll_wait时,epoll_wait仍会立刻返回并通知socket读事件就绪。
- 直到缓冲区上所有的数据都被处理完,epoll_wait才不会立刻返回。
- 支持阻塞读写和非阻塞读写。
ET工作模式
如果我们在第一步将socket添加到epoll描述符的时候,使用EPOLLET标志,epoll进入ET工作模式。
- 当epoll检测到socket上事件就绪时,必须立刻处理。
- 读取数据的时候,一次性读取不完,在第二次调用epoll_wait的时候,epoll_wait不会再返回了。
- ET模式下,文件描述符上的事件就绪后,只有一次处理机会
- ET的性能比LT性能更高(epoll_wait返回的次数少了很多),Nginx默认采用ET模式的epoll
- 只支持非阻塞的读写
select和poll其实也是工作在LT模式下,epoll既可以支持LT,也可以支持ET。
因为ET的工作模式,这就会逼着程序员在每次通知的时候,必须在本轮把数据全部取走,循环读取。当数据读完的时候。
LT是epoll的默认行为,使用ET能够减少epoll触发的次数,相对于一个文件描述符就绪之后,不会反复被提示就绪,看起来就比LT更高效一些,但是在LT情况下如果也能 做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是一样的。另一方面,ET的代码复杂程度提高了。