文章目录
- 高级IO
- 0. IO介绍
- 1. 五种IO模型
- 1.0 感性理解(故事版)
- 1.1 阻塞IO
- 1.2 非阻塞IO
- 1.3 信号驱动IO
- 1.4 IO多路转接
- 1.5 异步IO
- 2. 高级IO重要概念
- 2.1 同步通信 vs 异步通信
- 2.2 阻塞 vs 非阻塞
- 3. 其他高级IO
- 4. 非阻塞IO
- 4.1 fcntl
- 4.2 实现函数SetNoBlock
- 4.2.0 阻塞方式读取标准输入
- 4.2.1 SetNoBlock函数
- 4.2.2 轮询方式读取标准输入
- (1) 非阻塞等待
- (2) 非阻塞轮询
- 5. I/O多路转接之select
- 5.1 初识select
- 5.2 select函数原型
- 5.3 socket就绪条件
- 5.4 select的特点
- 5.5 SelectServer
- (1) SelectServer_V1
- (2) SelectServer_V2
- 5.6 select缺点
- 6. I/O多路转接之poll
- 6.1 poll函数接口
- 6.2 PollServer
- 6.3 poll的优缺点
- 7. I/O多路转接之epoll
- 7.1 epoll初识
- 7.2 epoll的相关系统调用
- (1) epoll_create
- (2) epoll_ctl
- (3) epoll_wait
- 7.3 epoll工作原理
- 7.4 epoll工作方式
- 7.4.1 水平触发Level Triggered 工作模式
- 7.4.2 边缘触发Edge Triggered工作模式
- 7.4.3 感性理解
- 7.4.4 LT VS ET
- 7.5 epoll的使用场景
- 7.6 EpollServer
- (1) EpollServer_V1
- (2) EpollServer_V2
- (3) EpollServer_V3
- Reactor介绍
高级IO
0. IO介绍
IO = 等 + 数据拷贝
要进行IO, 是要有条件的,要有数据,要有空间; 操作系统提供了 read, send, recv, write接口来进行IO
当条件满足时,就会有IO事件就绪
高效的IO: 单位时间内,等的比重越低,效率越高
1. 五种IO模型
1.0 感性理解(故事版)
现在这里有一条河,一天清晨陆陆续续有人来钓鱼。
- 张三带着他的工具来钓鱼,他钓鱼的方式是: 坐着一动不动,直至鱼咬钩他看见鱼漂动时,才会将鱼钓上来放入桶中继续钓鱼。
- 不一会,李四来了,他看见张三也在钓鱼就跟张三聊天,但是张三还是坐着一动不动不理他只顾自己钓鱼,于是他自觉无趣,他钓鱼的方式是: 将鱼竿放入水中后,去玩玩手机,喝喝茶,看看书,也不时检查鱼漂是否会动,直至鱼漂动它会将鱼钓上来,然后继续重复这一系列的动作。
- 再过了一会,王五来了,他还是想和张三和李四说话,但是张三依旧不理他,他钓鱼的方式是: 在鱼漂上面挂一个铃铛,将鱼竿放入水中后,去玩玩手机,和李四聊聊天,看看书,直至铃铛响代表鱼漂动了他才会将鱼钓上来,然后继续重复这一系列的动作。
- 再过了一会,赵六开车来了,他是方圆五里的富豪,本人很喜欢钓鱼,他从车上面拿出100条鱼竿和其他钓鱼工具,他钓鱼的方式是:将这100条鱼竿全部放入水中后,自己不断的检测哪个鱼漂动了,发现后将鱼钓起来并且重新在鱼钩上放上诱饵,然后继续重复这一系列的动作。
- 再过了一会,方圆百里的富豪田七路过,在车上看到这四人都在钓鱼,他本人很喜欢吃鱼,看到这个场景自己也想要鱼,但是他要赶自己公司的会议,于是他想到了一个方法: 让自己的司机小王去钓鱼,他从车上面拿下来了桶,其他的钓鱼的工具和一部手机交给小王,告诉小王如果桶里装满了鱼就给他打电话他开车来取并接走小王,交代完这一系列的事后,他开车去公司开会,小王则去钓鱼。
最终这五人都成功地钓上了鱼。
详解五人的钓鱼过程:
1.1 阻塞IO
- 阻塞IO: 在内核将数据准备好之前, 系统调用会一直等待。所有的套接字, 默认都是阻塞方式。
阻塞IO是最常见的IO模型。
1.2 非阻塞IO
- 非阻塞IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询。这对CPU来说是较大的浪费,
般只有特定场景下才使用。
1.3 信号驱动IO
- 信号驱动IO: 内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作。
1.4 IO多路转接
- IO多路转接: 虽然从流程图上看起来和阻塞IO类似。实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态。
1.5 异步IO
- 异步IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)。
小结
任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间。让IO更高效, 最核心的办法就是让等待的时间尽量少。
2. 高级IO重要概念
在这里, 我们要强调几个概念
2.1 同步通信 vs 异步通信
同步和异步关注的是消息通信机制。
- 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果;
- 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
另外, 我们回忆在讲多进程多线程的时候, 也提到同步和互斥。这里的同步通信和进程之间的同步是完全不相关的概念。
-
进程/线程同步也是进程/线程之间直接的制约关系
-
是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系。尤其是在访问临界资源的时候。
大家以后在看到 “同步” 这个词, 一定要先搞清楚大背景是什么. 这个同步, 是同步通信异步通信的同步, 还是同步与互斥的同步。
2.2 阻塞 vs 非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
- 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回。
- 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
3. 其他高级IO
非阻塞IO,纪录锁,系统V流机制,I/O多路转接(也叫I/O多路复用),readv和writev函数以及存储映射IO(mmap),这些统称为高级IO。
我们本篇博客重点讨论的是I/O多路转接。
4. 非阻塞IO
4.1 fcntl
一个文件描述符, 默认都是阻塞IO。
函数原型如下。
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的cmd的值不同, 后面追加的参数也不相同。
fcntl函数有5种功能:
- 复制一个现有的描述符(cmd=F_DUPFD)
- 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)
- 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)
- 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
- 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
我们此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞。
4.2 实现函数SetNoBlock
4.2.0 阻塞方式读取标准输入
#include<iostream>
#include<unistd.h>
using namespace std;
// 阻塞式等待
int main()
{
char buffer[64];
while (true)
{
printf(">>> ");
fflush(stdout);
// read做了两件事: (检测条件是否就绪)等 + 拷贝
ssize_t n = read(0, buffer, sizeof(buffer) - 1); // 从标准输入(键盘)中读取
if (n > 0)
{
buffer[n - 1] = 0; // 还要过滤掉'\n'
cout << "echo# " << buffer << endl;
}
}
}
运行结果: 观察在输入数据前此进程处于S+休眠状态。
我们这是按住 ctrl+d 就表示读到文件结束,此程序就会终止
4.2.1 SetNoBlock函数
基于fcntl, 我们实现一个SetNoBlock函数, 将文件描述符设置为非阻塞。
void SetNonBlock(int fd)
{
int fl=fcntl(fd,F_GETFL); // 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图)
if(fl<0)
{
cerr<<"error string: "<<strerror(errno)<<"error code: "<<errno<<endl;
return;
}
// 然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
fcntl(fd,F_SETFL,fl| O_NONBLOCK);
}
4.2.2 轮询方式读取标准输入
(1) 非阻塞等待
#include<iostream>
#include<unistd.h>
#include<fcntl.h>
#include<cstring>
using namespace std;
// 非阻塞式等待
void SetNonBlock(int fd)
{
int fl=fcntl(fd,F_GETFL); // 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图)
if(fl<0)
{
cerr<<"error string: "<<strerror(errno)<<"error code: "<<errno<<endl;
return;
}
fcntl(fd,F_SETFL,fl| O_NONBLOCK); // 然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
}
int main()
{
char buffer[64];
SetNonBlock(0);
while (true)
{
printf(">>> ");
fflush(stdout);
// 1. 成功 2. 结束 3. 出错(一旦底层没有数据就绪, 就以出错的形式返回,但是不算真正的出错) --- 错误码进行具体判断
// read做了两件事: (检测条件是否就绪)等 + 拷贝
ssize_t n = read(0, buffer, sizeof(buffer) - 1); // 从标准输入(键盘)中读取
if (n > 0)
{
buffer[n - 1] = 0; // 还要过滤掉'\n'
cout << "echo# " << buffer << endl;
}
else if(n==0)
{
cout<<"end file"<<endl;
break;
}
else
{
if(errno==EAGAIN || errno==EWOULDBLOCK)
{
// 底层数据没有准备好, 希望你下次继续来检测
sleep(1);
cout<<"data not ready" <<endl;
continue;
}
else if(errno == EINTR)
{
// 这次IO被信号中断, 也需要重新读取
continue;
}
else
{
// 真正的错误
cout << "read error??"<< "error string: " << strerror(errno) << "error code: " << errno << endl;
break;
}
}
}
}
运行结果:
(2) 非阻塞轮询
#include<iostream>
#include<unistd.h>
#include<fcntl.h>
#include<cstring>
#include<vector>
#include<functional>
using namespace std;
// 非阻塞式轮询
using func_t =function<void(void)>;
vector<func_t> funcs;
void PrintLog()
{
cout<<"这是一个日志例程"<<endl;
}
void OperMysql()
{
cout<<"这是一个数据库的例程"<<endl;
}
void CheckNet()
{
cout<<"这是一个检测网络状态的例程"<<endl;
}
// 如果一个线程想向另一个线程派发任务
void LoadTask()
{
funcs.push_back(PrintLog);
funcs.push_back(OperMysql);
funcs.push_back(CheckNet);
}
void HandlerALLTask()
{
for(const auto&func:funcs)
func();
}
void SetNonBlock(int fd)
{
int f1=fcntl(fd,F_GETFL); // 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图)
if(f1<0)
{
cerr<<"error string: "<<strerror(errno)<<"error code: "<<errno<<endl;
return;
}
fcntl(fd,F_SETFL,f1| O_NONBLOCK); // 然后再使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
}
int main()
{
char buffer[64];
SetNonBlock(0);
LoadTask();
while (true)
{
printf(">>> ");
fflush(stdout);
// 1. 成功 2. 结束 3. 出错(一旦底层没有数据就绪, 就以出错的形式返回,但是不算真正的出错) --- 错误码进行具体判断
// read做了两件事: (检测条件是否就绪)等 + 拷贝
ssize_t n = read(0, buffer, sizeof(buffer) - 1); // 从标准输入(键盘)中读取
if (n > 0)
{
buffer[n - 1] = 0; // 还要过滤掉'\n'
cout << "echo# " << buffer << endl;
}
else if(n==0)
{
cout<<"end file"<<endl;
break;
}
else
{
if(errno==EAGAIN || errno==EWOULDBLOCK)
{
// 底层数据没有准备好, 希望你下次继续来检测
HandlerALLTask();
sleep(1);
cout<<"data not ready" <<endl;
continue;
}
else if(errno == EINTR)
{
// 这次IO被信号中断, 也需要重新读取
continue;
}
else
{
// 真正的错误
cout << "read error??"<< "error string: " << strerror(errno) << "error code: " << errno << endl;
break;
}
}
}
}
运行结果:
5. I/O多路转接之select
5.1 初识select
系统提供select函数来实现多路复用输入/输出模型。
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变;
5.2 select函数原型
select的函数原型如下:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
struct timeval *timeout)
参数说明:
-
参数nfds: 输入型参数,输入等待的多个fd数字层面最大的fd+1
-
剩下的4个参数,每一个参数都是输入输出型参数,用户要把自己的数据交给OS, OS也要通过输出型参数交给用户,让用户和内核之间进行信息传递。
-
中间3个参数是同质的,以readfds为例,参数类型是fd_set,是一个 “位图”。用位图结构表示多个fd, 来进行用户与内核之间的信息互相传递
OS提供了一组操作fd_set的接口, 来比较方便的操作位图。
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
使用此参数(以readfds为例):
- 用户首先要告诉内核,哪些fd读事件需要内核关心(输入)
- 内核要告诉用户,哪些fd读事件已经就绪了(输出)
具体的设置:
-
参数timeout: 它的类型是struct timeval,用来设置select()的等待时间(等待的形式)
设置的形式:
- NULL: 阻塞等待
- {0,0}: 非阻塞等待
- {n,m}: 比如{5,0}, 5s以内阻塞等待,否则timeout一次(进行一次非阻塞, 相当于条件不满足返回一次)
此参数作输出时,返回的时候,表示剩余时间;比如设置为{5,0}, 第2秒已经就绪了返回3秒(表示剩余时间)
struct timeval
{
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};
-
此函数的返回值:
- n>0, 代表有几个fd是就绪的
- n==0,timeout(超时了,在当前等待时间内没有任何一个fd就绪)
- n<0,代表有等待失败的情况(比如要等待某文件描述符,但其根本不存在)
-
错误值可能为:
EBADF 文件描述词为无效的或该文件已关闭
EINTR 此调用被信号所中断
EINVAL 参数n 为负值。
ENOMEM 核心内存不足
5.3 socket就绪条件
- 读就绪
- socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件
描述符, 并且返回值大于0; - socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
- 监听的socket上有新的连接请求;
- socket上有未处理的错误;
- socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件
- 写就绪
- socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
- socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE
信号; - socket使用非阻塞connect连接成功或失败之后;
- socket上有未读取的错误;
- 异常就绪
- socket上收到带外数据. 关于带外数据, 和TCP紧急模式相关(回忆TCP协议头中, 有一个紧急指针的字段),
5.4 select的特点
- 可监控的文件描述符个数取决与sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)=128,每bit表示一个文件
描述符,则我服务器上支持的最大文件描述符是128*8=1024。 - 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd
- 一是用于再select 返回后,array作为源数据和fd_set进行FD_ISSET判断。
- 二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得
fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数
select服务器,使用的时候,需要程序员自己维护一个第三方数组,来进行对已经获得的sock进行管理。
5.5 SelectServer
此代码用到了之前自己封装的Sock.hpp, log.hpp, err.hpp
Sock.hpp
#pragma once
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include"log.hpp"
#include"err.hpp"
#include<cstring>
static const int gbacklog=32;
static const int defaultfd=-1;
class Sock
{
public:
Sock()
:_sock(defaultfd)
{
}
void Socket()
{
_sock=socket(AF_INET,SOCK_STREAM,0);
if(_sock<0)
{
logMessage(Fatal,"create socket error, code: %d, error string: %s",errno, strerror(errno));
exit(SOCKET_ERR);
}
}
void Bind(const uint16_t& port)
{
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_port=htons(port);
local.sin_family=AF_INET;
local.sin_addr.s_addr=INADDR_ANY;
if(bind(_sock,(struct sockaddr*)&local,sizeof(local))<0)
{
logMessage(Fatal,"bind socket error, code: %d, error string: %s", errno, strerror(errno));
exit(BIND_ERR);
}
}
void Listen()
{
if(listen(_sock,gbacklog)<0)
{
logMessage(Fatal,"listen socket error, code: %d, error string:%s", errno, strerror(errno));
exit(LISTEN_ERR);
}
}
int Accept(string*clientip, uint16_t*clientport) // 获取新连接,并想知道客户端的信息
{
struct sockaddr_in temp; // 临时结构体
socklen_t len=sizeof(temp);
int sock=accept(_sock,(struct sockaddr*)&temp,&len);
if(sock<0)
{
logMessage(Warning,"accept error, code: %d, error string: %s",errno, strerror(errno));
}
else
{
*clientport=ntohs(temp.sin_port);
*clientip=inet_ntoa(temp.sin_addr); // 把4字节对应的IP转化成字符串风格的
}
return sock;
}
int Connect(const string& serverip, const uint16_t& serverport) // 让其他客户端可以连接
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(serverport);
server.sin_addr.s_addr=inet_addr(serverip.c_str());
return connect(_sock,(struct sockaddr*)&server,sizeof(server));
}
int Fd()
{
return _sock;
}
void Close()
{
if(_sock!=defaultfd)
close(_sock);
}
~Sock()
{
}
private:
int _sock;
};
log.hpp
#pragma once
#include<iostream>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<stdio.h>
#include<cstdarg>
#include<time.h>
using namespace std;
// 日志系统是有等级的
enum
{
Debug=0,
Info,
Warning,
Error,
Fatal,
Uknown
};
static string toLevelString(int level)
{
switch (level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Uknown";
}
}
string getTime()
{
time_t curr=time(nullptr);
struct tm*tmp=localtime(&curr);
char buffer[128];
snprintf(buffer,sizeof(buffer),"%d-%d-%d %d:%d:%d",tmp->tm_year+1900,tmp->tm_mon+1,tmp->tm_mday,
tmp->tm_hour,tmp->tm_min,tmp->tm_sec);
return buffer;
}
// 日志格式: 日志等级 时间 pid 消息体
// logLeft: 日志等级 时间 pid logRight: 消息体
// logMessage(Debug, "hello: %d, %s",12,s.c_str()) Debug, hello: 12, world
void logMessage(int level, const char*format,...)
{
char logLeft[1024];
string level_string=toLevelString(level);
string curr_time=getTime();
snprintf(logLeft,sizeof(logLeft),"[%s] [%s] [%d] ",level_string.c_str(),curr_time.c_str(),getpid());
char logRight[1024];
va_list p;
va_start(p,format);
vsnprintf(logRight,sizeof(logRight),format,p);
va_end(p);
printf("%s%s\n",logLeft,logRight);
}
err.hpp
#pragma once
enum
{
USAGE_ERR=1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR,
CONNECT_ERR,
SET_ERR,
OPEN_ERR
};
main.cc
#include"SelectServer.hpp"
#include<memory>
#include<iostream>
int main()
{
std::unique_ptr<SelectServer> svr(new SelectServer());
svr->InitServer();
svr->Start();
return 0;
}
(1) SelectServer_V1
V1版本是一个简单的只关心写的版本
#include"Sock.hpp"
#include <sys/select.h>
typedef int type_t;
const static int gport=8889;
// 只关心读事件
class SelectServer
{
static const int N=(sizeof(fd_set)*8);
public:
SelectServer(uint16_t port=gport)
:port_(port)
{
}
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
for(int i=0;i<N;++i)
fdarray_[i]=defaultfd;
}
// 处理连接的动作
void Accepter()
{
cout << "有一个新连接到来了" << endl;
// 这里在进行Accept会不被阻塞呢? 不会的!
string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0)
return;
// 思路转变1:
// 得到了对应的sock,我们可不可以进行read/recv读取sock呢?不能
// 你怎么知道sock上有数据就绪了呢?你不知道, 所以我们需要将sock交给select, 让select进行管理
logMessage(Debug, "[%s:%d], sock: %d", clientip.c_str(), clientport, sock);
// 要让select帮我们进行关心, 只要把sock添加到fdarray_[]里面即可
// 就是在数组中找一个合法位置添加
int pos = 1;
for (; pos < N; pos++)
{
if (fdarray_[pos] == defaultfd) // 说明这个位置没有被占用
break;
}
if (pos >= N)
{
close(sock);
logMessage(Warning, "sockfd array[] full");
}
else
{
fdarray_[pos] = sock;
}
}
// echo server
void HandlerEvent(fd_set&rfds) // 判断文件描述符在集合里
{
for (int i = 0; i < N; ++i) // 循环检测
{
if(fdarray_[i]==defaultfd)
continue;
if (fdarray_[i]==listensock_.Fd() && FD_ISSET(listensock_.Fd(), &rfds))
{
Accepter();
}
else if(fdarray_[i]!=listensock_.Fd() && FD_ISSET(fdarray_[i], &rfds)) // 普通文件描述符就绪
{
// ServiceIO(); BUG
int fd=fdarray_[i];
char buffer[1024];
ssize_t s=recv(fd,buffer,sizeof(buffer)-1,0); // 读取一次不会被阻塞
if(s>0)
{
buffer[s-1]=0;
cout<<"client# "<<buffer<<endl;
// 把数据发送回去也要被select管理的
string echo=buffer;
echo+="[select server echo]";
send(fd,echo.c_str(),echo.size(),0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ..., fdarray_[i] -> defaultfd: %d- >%d", fd, defaultfd);
else
logMessage(Warning, "recv error, client quit ..., fdarray_[i] -> defaultfd: %d->%d", fd, defaultfd);
close(fdarray_[i]);
fdarray_[i]=defaultfd;
}
}
}
}
void Start()
{
// 1. 这里我们能够直接获取新的链接吗?
// 2. 最开始的时候, 我们的服务器是没有太多的sock的, 甚至只有一个sock! listensock_
// 3. 在网络中, 新连接到来被当做 读事件就绪!
// listensock_.Accept(); 不能
// 把套接字添加到select中
// demo1
fdarray_[0]=listensock_.Fd();
while(true)
{
// 因为rfds是一个输入输出型参数, 注定了每次必须对rfds进行重置,
// 重置则必定要知道我历史上有哪些fd?fdarray_[]
// 因为服务器在运行中, sockfd的值一直在动态变化, 所以maxfd也一定在变化, maxfd是不是也要进行动态更新
// struct timeval timeout={0,0};
fd_set rfds;
FD_ZERO(&rfds);
int maxfd=fdarray_[0];
for(int i=0;i<N;++i)
{
if(fdarray_[i]==defaultfd)
continue;
// 合法fd
FD_SET(fdarray_[i], &rfds);
if(maxfd<fdarray_[i])
maxfd=fdarray_[i];
}
int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
switch(n)
{
case 0:
logMessage(Debug, "timeout, %d: %s", errno, strerror(errno));
break;
case -1:
logMessage(Warning, "%d: %s", errno, strerror(errno));
break;
default:
// 成功了
logMessage(Debug, "有一个就绪事件发生了: %d", n);
HandlerEvent(rfds);
DebugPrint();
break;
}
// sleep(1);
}
}
// 打印就绪的fd
void DebugPrint()
{
cout<<"fdarray[]: ";
for(int i=0;i<N;++i)
{
if(fdarray_[i]==defaultfd)
continue;
cout<<fdarray_[i]<<" ";
}
cout<<"\n";
}
~SelectServer()
{
listensock_.Close();
}
private:
uint16_t port_;
Sock listensock_;
type_t fdarray_[N]; // 管理所有的fd
};
运行结果: 用telnet作为客户端来连接这个服务器
如果将时间设置成这样,服务器就会每隔两秒timeout一次
(2) SelectServer_V2
V2版本在V1版本的基础上,解决了想要关心写和异常的问题,至于具体如何处理写和异常的问题,请看epoll
#include"Sock.hpp"
#include <sys/select.h>
static const int gport=8888;
static const int defaultevent=0;
typedef struct FdEvent
{
int fd;
uint8_t event;
string clientip;
uint16_t clientport;
}type_t;
#define READ_EVENT (0x1)
#define WRITE_EVENT (0x1<<1)
#define EXCEPT_EVENT (0x1<<2)
class SelectServer
{
static const int N=(sizeof(fd_set)*8);
public:
SelectServer(uint16_t port=gport)
:port_(port)
{
}
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
for(int i=0;i<N;++i)
{
fdarray_[i].fd=defaultfd;
fdarray_[i].event=defaultevent;
fdarray_[i].clientport=0;
}
}
// 处理连接的动作
void Accepter()
{
cout << "有一个新连接到来了" << endl;
// 这里在进行Accept会不被阻塞呢? 不会的!
string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0)
return;
// 思路转变1:
// 得到了对应的sock,我们可不可以进行read/recv读取sock呢?不能
// 你怎么知道sock上有数据就绪了呢?你不知道, 所以我们需要将sock交给select, 让select进行管理
logMessage(Debug, "[%s:%d], sock: %d", clientip.c_str(), clientport, sock);
// 要让select帮我们进行关心, 只要把sock添加到fdarray_[]里面即可
// 就是在数组中找一个合法位置添加
int pos = 1;
for (; pos < N; pos++)
{
if (fdarray_[pos].fd == defaultfd) // 说明这个位置没有被占用
break;
}
if (pos >= N)
{
close(sock);
logMessage(Warning, "sockfd array[] full");
}
else
{
fdarray_[pos].fd = sock;
// fdarray_[pos].event = (READ_EVENT | WRITE_EVENT); // 这是同时关心读写
fdarray_[pos].event = READ_EVENT;
fdarray_[pos].clientip = clientip;
fdarray_[pos].clientport = clientport;
}
}
void Recver(int index)
{
// ServiceIO(); BUG
int fd = fdarray_[index].fd;
char buffer[1024];
ssize_t s = recv(fd, buffer, sizeof(buffer) - 1, 0); // 读取一次不会被阻塞
if (s > 0)
{
buffer[s - 1] = 0;
cout << fdarray_[index].clientip <<":"<< fdarray_[index].clientport << "# " << buffer << endl;
// 把数据发送回去也要被select管理的
string echo = buffer;
echo += "[select server echo]";
send(fd, echo.c_str(), echo.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ..., fdarray_[i] -> defaultfd: %d->%d",
fd, defaultfd);
else
logMessage(Warning, "recv error, client quit ..., fdarray_[i] -> defaultfd: %d->%d", fd, defaultfd);
close(fdarray_[index].fd);
fdarray_[index].fd= defaultfd;
fdarray_[index].event= defaultevent;
fdarray_[index].clientip.resize(0);
fdarray_[index].clientport=0;
}
}
// echo server
void HandlerEvent(fd_set&rfds,fd_set&wfds) // 判断文件描述符在集合里
{
for (int i = 0; i < N; ++i) // 循环检测
{
if(fdarray_[i].fd==defaultfd)
continue;
if ((fdarray_[i].event&READ_EVENT) && (FD_ISSET(fdarray_[i].fd, &rfds)))
{
// 处理读取, 1. accept 2. recv
if (fdarray_[i].fd == listensock_.Fd())
{
Accepter();
}
else if (fdarray_[i].fd != listensock_.Fd())
{
Recver(i);
}
else
{
}
}
else if((fdarray_[i].event&WRITE_EVENT) && (FD_ISSET(fdarray_[i].fd, &wfds)))
{
// TODO
}
else
{
}
}
}
void Start()
{
// 1. 这里我们能够直接获取新的链接吗?
// 2. 最开始的时候, 我们的服务器是没有太多的sock的, 甚至只有一个sock! listensock_
// 3. 在网络中, 新连接到来被当做 读事件就绪!
// listensock_.Accept(); 不能
fdarray_[0].fd=listensock_.Fd();
fdarray_[0].event=READ_EVENT;
while(true)
{
// 因为rfds是一个输入输出型参数, 注定了每次必须对rfds进行重置,
// 重置则必定要知道我历史上有哪些fd?fdarray_[]
// 因为服务器在运行中, sockfd的值一直在动态变化, 所以maxfd也一定在变化, maxfd是不是也要进行动态更新
// struct timeval timeout={0,0};
fd_set rfds;
fd_set wfds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
int maxfd=fdarray_[0].fd;
for(int i=0;i<N;++i)
{
if(fdarray_[i].fd==defaultfd)
continue;
// 合法fd
if(fdarray_[i].event&READ_EVENT)
FD_SET(fdarray_[i].fd, &rfds);
if(fdarray_[i].event&WRITE_EVENT)
FD_SET(fdarray_[i].fd, &wfds);
if(maxfd<fdarray_[i].fd)
maxfd=fdarray_[i].fd;
}
int n=select(maxfd+1,&rfds,&wfds,nullptr,nullptr);
switch(n)
{
case 0:
logMessage(Debug, "timeout, %d: %s", errno, strerror(errno));
break;
case -1:
logMessage(Warning, "%d: %s", errno, strerror(errno));
break;
default:
// 成功了
logMessage(Debug, "有一个就绪事件发生了: %d", n);
HandlerEvent(rfds,wfds);
DebugPrint();
break;
}
// sleep(1);
}
}
// 打印就绪的fd
void DebugPrint()
{
cout<<"fdarray[]: ";
for(int i=0;i<N;++i)
{
if(fdarray_[i].fd==defaultfd)
continue;
cout<<fdarray_[i].fd<<" ";
}
cout<<"\n";
}
~SelectServer()
{
listensock_.Close();
}
private:
uint16_t port_;
Sock listensock_;
type_t fdarray_[N]; // 管理所有的fd
};
运行结果:
5.6 select缺点
- 每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
- 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
- select支持的文件描述符数量太小
6. I/O多路转接之poll
6.1 poll函数接口
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// pollfd结构
struct pollfd
{
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
参数说明
- fds是一个poll函数监听的结构列表。每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返
回的事件集合- 每个结构体的 events 域是由用户来设置,告诉内核我们关注的是什么,而 revents 域是返回时内核设置的,以说明对该描述符发生了什么事件
- nfds表示fds数组的长度
- timeout表示poll函数的超时时间, 单位是毫秒(ms)
poll 将输入参数和输出参数进行了分离,不用在调用poll进行重新设置了。
events和revents的取值:
事件 | 描述 | 是否可作为输入(events) | 是否可作为输出(revents) |
---|---|---|---|
POLLIN | 数据可读(包括普通数据&优先数据) | 是 | 是 |
POLLOUT | 数据可写(普通数据&优先数据) | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接被对端关闭,或者关闭了写操 | 是 | 是 |
POPPHUP | 挂起 | 否 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
返回结果(与select相同)
- 返回值小于0, 表示出错;
- 返回值等于0, 表示poll函数等待超时;
- 返回值大于0, 表示poll由于监听的文件描述符就绪而返回.
6.2 PollServer
#include"Sock.hpp"
#include <sys/select.h>
#include<poll.h>
const static int gport=8889;
const static int N=4096;
typedef struct pollfd type_t;
const static short defaultevent=0;
class PoolServer
{
public:
PoolServer(uint16_t port=gport)
:port_(port)
,fdarray_(nullptr)
{
}
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
fdarray_=new type_t[N];
for(int i=0;i<N;++i)
{
fdarray_[i].fd=defaultfd;
fdarray_[i].events=defaultevent;
fdarray_[i].revents=defaultevent;
}
}
// 处理连接的动作
void Accepter()
{
cout << "有一个新连接到来了" << endl;
// 这里在进行Accept会不被阻塞呢? 不会的!
string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0)
return;
// 思路转变1:
// 得到了对应的sock,我们可不可以进行read/recv读取sock呢?不能
// 你怎么知道sock上有数据就绪了呢?你不知道, 所以我们需要将sock交给select, 让select进行管理
logMessage(Debug, "[%s:%d], sock: %d", clientip.c_str(), clientport, sock);
// 要让select帮我们进行关心, 只要把sock添加到fdarray_[]里面即可
// 就是在数组中找一个合法位置添加
int pos = 1;
for (; pos < N; pos++)
{
if (fdarray_[pos].fd == defaultfd) // 说明这个位置没有被占用
break;
}
if (pos >= N)
{
close(sock); // poll这里可以选择动态扩容, 扩容失败再关闭
logMessage(Warning, "sockfd array[] full");
}
else
{
fdarray_[pos].fd= sock;
fdarray_[pos].events= POLLIN; // (POLLIN | POLLOUT) 读写同时关心
fdarray_[pos].revents= defaultevent;
}
}
// echo server
void HandlerEvent()
{
for (int i = 0; i < N; ++i)
{
int fd=fdarray_[i].fd;
short revents=fdarray_[i].revents;
if(fd==defaultfd)
continue;
if (fd==listensock_.Fd() && (revents & POLLIN))
{
Accepter();
}
else if(fd!=listensock_.Fd() && (revents & POLLIN)) // 普通文件描述符就绪
{
// ServiceIO(); BUG
char buffer[1024];
ssize_t s=recv(fd,buffer,sizeof(buffer)-1,0); // 读取一次不会被阻塞
if(s>0)
{
buffer[s-1]=0;
cout<<"client# "<<buffer<<endl;
// fdarray_[i].events | POLLUT; // 关心写事件
// 把数据发送回去也要被select管理的
string echo=buffer;
echo+="[poll server echo]";
send(fd,echo.c_str(),echo.size(),0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ..., fdarray_[i] -> defaultfd: %d- >%d", fd, defaultfd);
else
logMessage(Warning, "recv error, client quit ..., fdarray_[i] -> defaultfd: %d->%d", fd, defaultfd);
close(fd);
fdarray_[i].fd=defaultfd;
fdarray_[i].events=defaultevent;
fdarray_[i].revents=defaultevent;
}
}
}
}
void Start()
{
// 1. 这里我们能够直接获取新的链接吗?
// 2. 最开始的时候, 我们的服务器是没有太多的sock的, 甚至只有一个sock! listensock_
// 3. 在网络中, 新连接到来被当做 读事件就绪!
// listensock_.Accept(); 不能
fdarray_[0].fd=listensock_.Fd();
fdarray_[0].events=POLLIN;
while(true)
{
int timeout=-1;
int n=poll(fdarray_,N,timeout); // fdarray_内容管理, 合法fd, event全部入到fdarray_最左侧
switch(n)
{
case 0:
logMessage(Debug, "timeout, %d: %s", errno, strerror(errno));
break;
case -1:
logMessage(Warning, "%d: %s", errno, strerror(errno));
break;
default:
// 成功了
logMessage(Debug, "有一个就绪事件发生了: %d", n);
HandlerEvent();
DebugPrint();
break;
}
// sleep(1);
}
}
// 打印就绪的fd
void DebugPrint()
{
cout<<"fdarray[]: ";
for(int i=0;i<N;++i)
{
if(fdarray_[i].fd==defaultfd)
continue;
cout<<fdarray_[i].fd<<" ";
}
cout<<"\n";
}
~PoolServer()
{
listensock_.Close();
if(fdarray_)
delete [] fdarray_;
}
private:
uint16_t port_;
Sock listensock_;
type_t* fdarray_; // 管理所有的fd
};
运行结果:
6.3 poll的优缺点
- poll的优点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。- pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。接口使用比select更方便。
- poll并没有最大数量限制 (但是数量过大后性能也是会下降)。
- poll的缺点
- poll中监听的文件描述符数目增多时和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降
7. I/O多路转接之epoll
7.1 epoll初识
按照man手册的说法: 是为处理大批量句柄而作了改进的poll。
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
7.2 epoll的相关系统调用
epoll 有3个相关的系统调用。
(1) epoll_create
int epoll_create(int size);
创建一个epoll的句柄。
- 自从linux2.6.8之后,size参数是被忽略的。
- 用完之后, 必须调用close()关闭。
(2) epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数。
- 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型
- 第一个参数是epoll_create()的返回值(epoll的句柄)
- 第二个参数表示动作,用三个宏来表示
- 第三个参数是需要监听的fd
- 第四个参数是告诉内核需要监听什么事(用户告诉内核: 你要帮我关心哪个文件描述符上的哪个事件[一个,多个])
第二个参数的取值:
-
EPOLL_CTL_ADD:注册新的fd到epfd中;
-
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
-
EPOLL_CTL_DEL:从epfd中删除一个fd;
struct epoll_event结构如下:
events可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的;
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里;
(3) epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发送的事件。
- 参数events是分配好的epoll_event结构体数组
- epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存) (内核告诉用户: 你关心的哪些fd上的哪些event已经触发了)
- maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
- 参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞)
- 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败
7.3 epoll工作原理
数据从应用层自顶向下交付时,其实是OS在做拷贝的工作;
但是一旦当网卡收到了数据,OS是怎么知道网卡上有数据呢?
OS不会自己去检测硬件,当硬件有数据时,会向CPU发送硬件中断,CPU会通过中断号调用中断向量表中的方法把数据从外设拷贝到内存。(实际是拷贝到OS的接收缓冲区中,再经协议栈进行解包处理)
- 当我们创建epoll时底层会为我们创建一颗红黑树和一个就绪队列
- 红黑树的节点中保存了一些相关属性,用户告诉内核要关心那些事件,调用epoll_ctl时: 是通过epoll模型对红黑树做操作,告诉OS需要新增,删除或修改哪一个节点,该节点保存了文件描述符和对应的事件。红黑树操作时效率高(红黑树的插入时间效率是lgn,其中n为树的高度)。这里的fd就是红黑树中的key值。
- 一旦特定文件描述符对应的事件发生了,此时我们就可以将红黑树这个节点链接到就绪队列中,就绪队列中只保存已经准备好的文件描述符;此时不会为就绪队列单独维节点,而是在红黑树节点中添加一些就绪队列的属性,此时就绪队列中的节点其实红黑树中的节点,一旦底层没有数据就绪时该节点只属于红黑树,一旦数据就绪时就同样把这个节点连入到就绪队列中并且设置就绪事件。
- 则数据就绪,“形成节点放入就绪队列中" 其实是: 将红黑树中节点关系也添加到就绪队列即可
- 调用epoll_wait时内核会告诉用户你所关心哪些文件描述符的哪些事件已经就绪了,epoll_wait会从就绪队列中获得已经就绪的节点。epoll_wait会以时间复杂度为O(1)的方式,检测有没有事件就绪,即检测就绪队列是否为空。
- 在底层会给每一个rb_node(背后是文件),注册回调机制,当有数据就绪会调用这个回调方法。
- 红黑树,就绪队列,回调机制共同构成了epoll模型; 当我们调用epoll_creat就是在帮助我们创建epoll模型
- 这里的红黑树相当与select和poll中的数组
- 进程与多路转接之间的关系: 当你创建一个epoll模型时,其实是在OS中创建struct file结构,struct file结构中的指针会指向整个epoll模型, 在使用epoll_ctl时是用户通过当前这个进程调用这个接口,通过第一个参数epfd找到这个epoll模型,结合op就可以在红黑树中进行操作
总结一下, epoll的使用过程就是三部曲:
- 调用epoll_create创建一个epoll句柄;
- 调用epoll_ctl, 将要监控的文件描述符进行注册;
- 调用epoll_wait, 等待文件描述符就绪;
7.4 epoll工作方式
select, poll, epoll最基本的情况下: 一旦有事件就绪,若果上层不取底层会一直通知我事件就绪了
于是我们介绍epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
7.4.1 水平触发Level Triggered 工作模式
epoll默认状态下就是LT工作模式。
- 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分。
- 比如由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪。
- 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回。
- 支持阻塞读写和非阻塞读写。
7.4.2 边缘触发Edge Triggered工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。
-
当epoll检测到socket上事件就绪时, 必须立刻处理
-
比如, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了
也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会。 -
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll。只支持非阻塞的读写
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET
7.4.3 感性理解
你叫小李,有一天你在淘宝购买了5件商品,淘宝把商品放到了菜鸟驿站中,你也不想去菜鸟驿站取,于是等待快递员派发。
今天轮到张三上班了,张三带了很多快递其中还有你的3个快递,他到了你们楼下,给你打电话让你下楼取快递你满口答应,可是你电话一挂继续打游戏,张三则给其他人打电话,过了2min张三发现小李又没下楼,继续打电话催小李取快递,小李还是一挂电话打游戏,这样重复了几次,后来张三给小李说不取快递自己就要下班了,小李无奈只能下楼取,小李下来后告诉张三自己身体弱只能拿2个快递,剩下的那个一会来取,小李刚取回快递就继续打游戏,张三发现小李没下来还是周而复始给他打电话,但是小李还是不下楼;后来快递员李四来了,他带了小李2个包裹,他快下班了,发现张三还没下班,问清情况后发现张三有小李1个包裹,于是把自己手里小李的2个包裹交给张三让其帮忙,张三答应了,张三还是给小李打电话告诉他你又新增了2个包裹加上那1个一共3个,小李不耐烦了,叫上朋友帮忙取走了3个包裹,张三终于下班了。
到了第2天,小李还是买了5件商品,今天是快递员李四派送,他此时开始派发了,李四是职场老手他打电话给小李说他这里有小李几个包裹只给他打一次电话,时间一到自己就下班了,电话挂了后,小李立马去拿自己快递,下来后他因为身体弱只拿走2个包裹还剩1个,李四已经给他打过电话了不会再给他打电话让他取那一个包裹了,他就给其他人打电话,此时李四遇到了张三,张三请求李四帮助派送自己手里小李的2个包裹,李四答应了,李四有新增了小李2个包裹,包裹从有到多了,李四手里小李的包裹变化了就给小李打电话,告诉小李自己手里新增了他2个包裹,只等他1min就下班了,于是小李叫上朋友取走了自己3个包裹。
在上面的故事中
张三: 水平触发(LT)模式,一旦有事件就绪,若果上层不取底层会一直通知我事件就绪了
李四:边缘触发(ET)模式,有效通知只有一次,数据从无到有,从有到多,变化的时候,才有第二次通知
张三 VS 李四
-
李四效率高,通知效率高
-
李四IO效率也高,李四派发完了,就走了,只通知一次,倒逼上层,尽快取走数据
7.4.4 LT VS ET
- LT是 epoll 的默认行为。
- 边缘触发(ET)模式,有效通知只有一次,数据从无到有,从有到多,变化的时候,才有第二次通知 ;就会倒逼程序员,必须一次将本轮数据全部读取完毕 => 怎么保证呢?循环读取,用recv/read => recv/read默认是阻塞的 => 即循环读取,可能会被阻塞 => 想要非阻塞读取怎么做呢 => 结论:在ET的工作模式下,所有的读取和写入,都必须是非阻塞的接口!
- LT模式,可以在阻塞模式下工作吗?可以;可以在非阻塞模式下工作吗?可以
关于高效的问题:
- 一次通知就是一次系统调用,一次返回必定对应一次调用 — ET有效减少系统调用次数
- ET倒逼程序员尽快取走所有数据,本质是:让TCP底层更新出更大的接收窗口,从而在较大概率上,提供对方的滑动窗口的大小,提高发送效率(滑动窗口 = min(拥塞窗口,接收缓冲区剩余空间的大小))
效率: ET>=LT
7.5 epoll的使用场景
epoll的高性能, 是有一定的特定场景的。如果场景选择的不适宜, epoll的性能可能适得其反。
对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll。
例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网APP的入口服务器, 这样的服务器就很适合epoll
如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用epoll就并不合适。具体要根据需求和场景特点来决定使用哪种IO模型
7.6 EpollServer
此代码用到了之前自己封装的Sock.hpp, log.hpp, err.hpp看上面selectserver部分, 还自己封装了epoll相关的接口
Epoll.hpp
#pragma once
#include<iostream>
#include<sys/epoll.h>
#include<unistd.h>
#include"log.hpp"
#include"err.hpp"
static const int defaultepfd=-1;
static const int gsize=128;
class Epoller
{
public:
Epoller()
:epfd_(defaultepfd)
{
}
void Create()
{
epfd_=epoll_create(gsize);
if(epfd_<0)
{
logMessage(Fatal, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
exit(EPOLL_CREAT_ERR);
}
}
// 用户 -> 内核
bool AddEvent(int fd, uint32_t events)
{
struct epoll_event ev;
ev.events=events;
ev.data.fd=fd; // 用户数据, epoll底层不对该数据做任何修改, 就是为了给未来就绪返回的!
int n=epoll_ctl(epfd_,EPOLL_CTL_ADD,fd,&ev);
if(n<0)
{
logMessage(Fatal, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
return false;
}
return true;
}
bool DelEvent(int fd)
{
return epoll_ctl(epfd_,EPOLL_CTL_DEL,fd,nullptr)==0;
}
int Wait(struct epoll_event *revs,int num,int timeout)
{
return epoll_wait(epfd_,revs,num,timeout);
}
int Fd()
{
return epfd_;
}
void Close()
{
if(epfd_!= defaultepfd)
close(epfd_);
}
~Epoller()
{
}
private:
int epfd_;
};
main.cc
#include"EpollServer.hpp"
#include<memory>
#include<iostream>
string echoServer(string r)
{
string resp=r;
resp+="[echo]\r\n";
return resp;
}
int main()
{
// fd_set fd;
// std::cout<<sizeof(fd)*8<<std::endl;
std::unique_ptr<EpollServer> svr(new EpollServer(echoServer));
svr->InitServer();
svr->Start();
return 0;
}
(1) EpollServer_V1
V1版本是一个epoll工作在LT模式下简单的服务器
#pragma once
#include<iostream>
#include"Sock.hpp"
#include"log.hpp"
#include"Epoll.hpp"
#include<functional>
#include<assert.h>
const static int gport=8888;
using func_t =function<string(string)>;
class EpollServer
{
const static int gnum = 64;
public:
EpollServer( func_t func,uint16_t port = gport)
:func_(func)
,port_(port)
{
}
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
logMessage(Debug, "init server success");
}
void Start()
{
// 1. 这里我们能够直接获取新的链接吗?
// 2. 最开始的时候, 我们的服务器是没有太多的sock的, 甚至只有一个sock! listensock_
// 3. 在网络中, 新连接到来被当做 读事件就绪!
// listensock_.Accept(); 不能
// (1) 将listensock添加到epoll中!
bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);
assert(r);
(void)r;
int timeout = -1;
while (true)
{
// (2) 从底层获取事件
int n = epoller_.Wait(revs_, gnum, timeout);
switch (n)
{
case 0:
logMessage(Debug, "timeout...");
break;
case -1:
logMessage(Warning, "epoll_wait failed");
break;
default:
// 成功了
logMessage(Debug, "有%d个事件就绪了", n);
HandlerEvent(n);
break;
}
}
}
void HandlerEvent(int num)
{
for (int i = 0; i < num; ++i)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "当前正在处理%d上的%s", fd, (events & EPOLLIN) ? "EPOLLIN" : "OTHER");
if (events & EPOLLIN)
{
if (fd == listensock_.Fd())
{
// 1. 新连接到来了
string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0)
continue;
logMessage(Debug, "%s:%d 已经连上了服务器了", clientip.c_str(), clientport);
// 1.1 此时在这里, 我们能不能进行read/recv?不能,
// 只有epoll知道sock上面的事件情况, 将sock添加到epoll中
bool r = epoller_.AddEvent(sock, EPOLLIN);
assert(r);
(void)r;
}
else
{
// 我们目前无法保证我们读到一个完整的报文
// 为什么? 完整报文由应用层协议规定, 本质就是你没有应用层协议!
// 怎么办? 自定义应用层协议
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // \r\n
request[s - 2] = 0; // \r\n
string response=func_(request);
send(fd, response.c_str(),response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error, client quit ...");
// 再处理异常的时候,先从epoll中移除, 然后再关闭
epoller_.DelEvent(fd);
close(fd);
}
}
}
}
}
~EpollServer()
{
listensock_.Close();
epoller_.Close();
}
private:
uint16_t port_;
Sock listensock_;
Epoller epoller_;
struct epoll_event revs_[gnum];
func_t func_;
};
运行结果:
Protocol.hpp
#include<string>
#include<iostream>
#include<vector>
#include<cstring>
#include<sys/types.h>
#include<sys/socket.h>
#include"Util.hpp"
#include<jsoncpp/json/json.h>
using namespace std;
// #define MYSELF 1
// 给网络版本计算器定制协议
namespace Protocol_ns
{
#define SEP " "
#define SEP_LEN strlen(SEP) // 绝对不能写成sizeof
#define HEADER_SEP "\r\n"
#define HEADER_SEP_LEN strlen("\r\n")
// "长度"\r\n" "_x op _y"\r\n
// "10 + 20" => "7"r\n""10 + 20"\r\n => 报头 + 有效载荷
// 请求/响应 = 报头\r\n有效载荷\r\n
// 请求 = 报头\r\n有效载荷\r\n报头\r\n有效载荷\r\n报头\r\n有效载荷\r\n
// 未来: "长度"\r\n"协议号\r\n""_x op _y"\r\n
// "10 + 20" => "7"r\n""10 + 20"\r\n
string AddHeader(string&str)
{
cout<<"AddHeader 之前:\n"
<<str<<endl;
string s=to_string(str.size());
s+=HEADER_SEP;
s+=str;
s+=HEADER_SEP;
cout<<"AddHeader 之后:\n"
<<s<<endl;
return s;
}
// "7"r\n""10 + 20"\r\n => "10 + 20"
string RemoveHeader(const string&str,int len)
{
cout<<"RemoveHeader 之前:\n"
<<str<<endl;
// 从后面开始截取
string res=str.substr(str.size()-HEADER_SEP_LEN-len,len);
cout<<"RemoveHeader 之后:\n"
<<res<<endl;
return res;
}
int Parsepackage(string &inbuffer, string *package)
{
cout << "ReadPackage inbuffer 之前:\n"
<< inbuffer << endl;
// 边分析, "7"r\n""10 + 20"\r\n
auto pos = inbuffer.find(HEADER_SEP);
if (pos == string::npos)
return 0;
string lenStr = inbuffer.substr(0, pos); // 获取头部字符串, 没有动inbuffer
int len = Util::toInt(lenStr); // 得到有效载荷的长度 => "123" -> 123
int targetPackageLen = len + 2 * HEADER_SEP_LEN + lenStr.size(); // 得到整个报文长度
if (inbuffer.size() < targetPackageLen) // 不是一个完整的报文
return 0;
*package = inbuffer.substr(0, targetPackageLen); // 提取到了报文有效载荷, 没有动inbuffer
inbuffer.erase(0, targetPackageLen); // 从inbuffer中直接移除整个报文
cout << "ReadPackage inbuffer 之后:\n"
<< inbuffer << endl;
return len;
}
// Request && Response都要提供序列化和反序列化功能
// 1. 自己手写
// 2. 用别人的
class Request
{
public:
Request()
{
}
Request(int x,int y,char op)
:_x(x)
,_y(y)
,_op(op)
{
}
// 序列化: struct->string
bool Serialize(string* outStr)
{
*outStr="";
#ifdef MYSELF
string x_string=to_string(_x);
string y_string=to_string(_y);
// 手动序列化
*outStr=x_string + SEP + _op + SEP + y_string;
std::cout << "Request Serialize:\n"
<< *outStr << std::endl;
#else
Json::Value root; // Value: 一种万能对象, 接受任意的kv类型
root["x"]=_x;
root["y"]=_y;
root["op"]=_op;
Json::FastWriter writer; // writer: 是用来进行序列化的 struct -> string
// Json::StyledWriter writer;
*outStr=writer.write(root);
#endif
return true;
}
// 反序列化: string->struct
bool Deserialize(const string&inStr)
{
#ifdef MYSELF
// inStr: 10 + 20 => [0]=>10, [1]=>+, [2]=>20
vector<string> result;
Util::StringSplit(inStr,SEP,&result);
if(result.size()!=3)
return false;
if(result[1].size()!=1)
return false;
_x=Util::toInt(result[0]);
_y=Util::toInt(result[2]);
_op=result[1][0];
#else
Json::Value root;
Json::Reader reader; // Reader: 是用来反序列化的
reader.parse(inStr,root);
_x=root["x"].asUInt();
_y=root["y"].asUInt();
_op=root["op"].asUInt();
#endif
Print();
return true;
}
void Print()
{
std::cout << "_x: " << _x << std::endl;
std::cout << "_y: " << _y << std::endl;
std::cout << "_z: " << _op << std::endl;
}
~Request()
{
}
public:
// _x op _y ==> 10 * 9 ? ==> 10 / 0 ?
int _x;
int _y;
char _op;
};
class Response
{
public:
Response()
{
}
Response(int result,int code)
:_result(result)
,_code(code)
{
}
// 序列化: struct->string
bool Serialize(string* outStr)
{
// _result _code
*outStr="";
#ifdef MYSELF
string res_string = to_string(_result);
string code_string = to_string(_code);
// 手动序列化
*outStr=res_string + SEP + code_string;
#else
Json::Value root;
root["result"]=_result;
root["code"]=_code;
Json::FastWriter writer;
// Json::StyledWriter writer;
*outStr=writer.write(root);
#endif
return true;
}
// 反序列化: string->struct
bool Deserialize(const string&inStr)
{
#ifdef MYSELF
// 10 0
vector<string> result;
Util::StringSplit(inStr,SEP,&result);
if(result.size()!=2)
return false;
_result=Util::toInt(result[0]);
_code=Util::toInt(result[1]);
#else
Json::Value root;
Json::Reader reader;
reader.parse(inStr, root);
_result = root["result"].asUInt();
_code = root["code"].asUInt();
#endif
Print();
return true;
}
void Print()
{
std::cout << "_result: " << _result << std::endl;
std::cout << "_code: " << _code << std::endl;
}
~Response()
{
}
public:
int _result;
int _code; // 0 success; 1,2,3,4代表不同错误码
};
}
Util.hpp
#pragma once
#include<iostream>
#include <unistd.h>
#include <fcntl.h>
#include<string>
#include<vector>
using namespace std;
// 工具集
class Util
{
public:
static bool SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0)
return false;
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
return true;
}
// 输入: const &
// 输出: *
// 输入输出: *
static bool StringSplit(const string &str, const string &sep, vector<string> *result)
{
// 10 + 20
size_t start = 0;
while (start < str.size())
{
auto pos = str.find(sep, start);
if (pos == string::npos)
break;
result->push_back(str.substr(start, pos - start));
// 更新位置
start = pos + sep.size();
}
// 处理最后的字符串
if (start < str.size())
result->push_back(str.substr(start));
return true;
}
static int toInt(const string &s) // 字符串转整数
{
return atoi(s.c_str());
}
};
main.cc
#include"EpollServer.hpp"
#include<memory>
#include<iostream>
Response calculateHelper(const Request &req)
{
Response resq(0, 0);
switch (req._op)
{
case '+':
resq._result = req._x + req._y;
break;
case '-':
resq._result = req._x - req._y;
break;
case '*':
resq._result = req._x * req._y;
break;
case '/':
if (req._y == 0)
resq._code = 1;
else
resq._result = req._x / req._y;
break;
case '%':
if (req._y == 0)
resq._code = 2;
else
resq._result = req._x % req._y;
break;
default:
resq._code = 3;
break;
}
return resq;
}
void calculate(Connection *conn, const Request &req)
{
Response resp= calculateHelper(req);
string sendStr;
resp.Serialize(&sendStr);
sendStr=Protocol_ns::AddHeader(sendStr);
// 该怎么发送呢??
// 在epoll中, 关于fd的读取一般要常设置(一直要让epoll关心)
// 关于fd的写入, 一般是按需设置(不能常设置), 只有需要发送的时候, 才设置!!
// version1版本
conn->outbuffer_+=sendStr;
// 开启对写事件关心
conn->R->EnableReadWrite(conn,true,true); // 初次设置对写事件的关心, 对应的fd会立马触发一次就绪(发送buffer一定是空的)
}
int main()
{
// fd_set fd;
// std::cout<<sizeof(fd)*8<<std::endl;
std::unique_ptr<EpollServer> svr(new EpollServer(calculate));
svr->InitServer();
svr->Dispatcher();
return 0;
}
(2) EpollServer_V2
V2版本在V1的基础上,改成了ET模式并且处理了无法读到一个完整的报文的问题
#pragma once
#include<iostream>
#include"Sock.hpp"
#include"log.hpp"
#include"Epoll.hpp"
#include"Util.hpp"
#include"Protocol.hpp"
#include<functional>
#include<unordered_map>
#include<assert.h>
using namespace Protocol_ns;
const static int gport=8889;
const static int bsize=1024;
class EpollServer;
class Connection;
using func_t =function<void(Connection *conn,const Request&)>;
using callback_t =function<void(Connection*)>;
// 大号的结构体
class Connection
{
public:
Connection(int fd, const string&clientip, const uint16_t&clientport)
:fd_(fd)
,clientip_(clientip)
,clientport_(clientport)
{
}
void Register(callback_t recver, callback_t sender,callback_t excepter)
{
recver_=recver;
sender_=sender;
excepter_=excepter;
}
~Connection()
{
}
public:
// IO信息
int fd_;
string inbuffer_;
string outbuffer_;
// IO处理函数
callback_t recver_;
callback_t sender_;
callback_t excepter_;
// 用户信息 client info, only debug
string clientip_;
uint16_t clientport_;
// 给conn带上自己要关心的事件
uint32_t events;
// 回指指针
EpollServer*R;
};
class EpollServer
{
const static int gnum = 64;
public:
EpollServer(func_t func,uint16_t port = gport)
:func_(func)
,port_(port)
{
}
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
AddConnection(listensock_.Fd(), EPOLLIN | EPOLLET);
logMessage(Debug, "init server success");
}
// 事件派发器
int Dispatcher()
{
// 1. 这里我们能够直接获取新的链接吗?
// 2. 最开始的时候, 我们的服务器是没有太多的sock的, 甚至只有一个sock! listensock_
// 3. 在网络中, 新连接到来被当做 读事件就绪!
// listensock_.Accept(); 不能
int timeout = -1;
while (true)
{
Looponce(timeout);
}
}
void Looponce(int timeout)
{
int n = epoller_.Wait(revs_, gnum, timeout);
for (int i = 0; i < n; ++i)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "当前正在处理%d上的%s", fd, (events & EPOLLIN) ? "EPOLLIN" : "OTHER");
// 我们将所有的异常情况,最后全部转化成recv,send的异常
if ((events & EPOLLERR)||(events & EPOLLHUP))
events|=(EPOLLIN|EPOLLOUT);
if ((events & EPOLLIN) && (ConnIsExists(fd)))
connections_[fd]->recver_(connections_[fd]);
if ((events & EPOLLOUT)&& (ConnIsExists(fd)))
connections_[fd]->sender_(connections_[fd]);
}
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
// 1. 设置fd是非阻塞的
if(events & EPOLLET)
Util::SetNonBlock(fd);
// 2. 构建connection对象, 交给connections_来进行管理
Connection *conn = new Connection(fd, ip, port);
if (fd == listensock_.Fd())
{
conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
else
{
conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
std::bind(&EpollServer::Sender, this, std::placeholders::_1),
std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
}
conn->events=events;
conn->R=this;
connections_.insert(make_pair(fd, conn));
// 3. 将fd & events 写透到内核中
bool r = epoller_.AddModEvent(fd,events, EPOLL_CTL_ADD);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
bool EnableReadWrite(Connection *conn ,bool readable,bool writeable)
{
conn->events=((readable?EPOLLIN:0)|(writeable?EPOLLOUT:0)|EPOLLET);
return epoller_.AddModEvent(conn->fd_,conn->events,EPOLL_CTL_MOD);
}
// 连接管理器
void Accepter(Connection*conn)
{
do
{
int err=0;
// 1. 新连接到来了
string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport,&err);
if (sock > 0)
{
logMessage(Debug, "%s:%d 已经连上了服务器了", clientip.c_str(), clientport);
// 将sock添加到epoller中
AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
}
else
{
if(err==EAGAIN || err==EWOULDBLOCK)
break;
else if(err=EINTR)
continue;
else // 获取连接失败
{
logMessage(Warning,"errstring : %s,errcode: %d",strerror(err),err);
continue;
}
}
} while (conn->events & EPOLLET);
logMessage(Debug, "Accepter done ....");
}
void Recver(Connection*conn)
{
// 我们目前无法保证我们读到一个完整的报文
// 为什么? 完整报文由应用层协议规定, 本质就是你没有应用层协议!
// 怎么办? 自定义应用层协议
// 简单方法
// char request[1024];
// ssize_t s=recv(conn->fd_,request,sizeof(request)-1,0);
// conn->inbuffer_+=request;
// conn->inbuffer_ check
// 读取完本轮数据了
do
{
char buffer[bsize];
ssize_t n=recv(conn->fd_,buffer,sizeof(buffer)-1,0);
if(n>0)
{
buffer[n]=0;
conn->inbuffer_+=buffer;
string requestStr;
// 根据基本协议, 进行数据分析 --- 自己定过一个
int n=Protocol_ns::Parsepackage(conn->inbuffer_, &requestStr);
if (n > 0)
{
requestStr=Protocol_ns::RemoveHeader(requestStr,n);
Request req;
req.Deserialize(requestStr);
func_(conn,req); // request 保证一定是一个完整的请求报文
}
logMessage(Debug, "inbuffer: %s, [%d]", conn->inbuffer_.c_str(), conn->fd_);
}
else if(n==0)
{
conn->excepter_(conn);
break;
}
else
{
if(errno==EAGAIN || errno==EWOULDBLOCK)
break;
else if(errno==EINTR)
continue;
else
{
conn->excepter_(conn);
break;
}
}
} while (conn->events & EPOLLET);
// logMessage(Debug,"Recver..., fd: %d, clientinfo:[%s:%d]",conn->fd_,conn->clientip_.c_str(),conn->clientport_);
}
void Sender(Connection *conn)
{
do
{
ssize_t n = send(conn->fd_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
if (n > 0)
{
conn->outbuffer_.erase(0,n);
if(conn->outbuffer_.empty()) // 数据发完了
{
EnableReadWrite(conn,true,false);
break;
}
else
{
EnableReadWrite(conn,true,true); // 数据没发完
}
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK) // 发送缓冲区写满了, 对方来不及收了,即写条件不就绪
break;
else if (errno == EINTR) // 写被信号中断了, 导致直接返回了。要继续再写
continue;
else
{
conn->excepter_(conn);
break;
}
}
} while (conn->events & EPOLLET);
}
void Excepter(Connection*conn)
{
logMessage(Debug,"Excepter..., fd: %d, clientinfo:[%s:%d]",conn->fd_,conn->clientip_.c_str(),conn->clientport_);
}
bool ConnIsExists(int fd)
{
return connections_.find(fd)!=connections_.end();
}
~EpollServer()
{
listensock_.Close();
epoller_.Close();
}
private:
uint16_t port_;
Sock listensock_;
Epoller epoller_;
struct epoll_event revs_[gnum];
func_t func_;
unordered_map<int,Connection*> connections_;
};
运行结果: 用之前的网络版本计算器客户端充当这个的客户端
main.cc
#include"EpollServer.hpp"
#include<memory>
#include<iostream>
Response calculate(const Request &req)
{
Response resq(0, 0);
switch (req._op)
{
case '+':
resq._result = req._x + req._y;
break;
case '-':
resq._result = req._x - req._y;
break;
case '*':
resq._result = req._x * req._y;
break;
case '/':
if (req._y == 0)
resq._code = 1;
else
resq._result = req._x / req._y;
break;
case '%':
if (req._y == 0)
resq._code = 2;
else
resq._result = req._x % req._y;
break;
default:
resq._code = 3;
break;
}
return resq;
}
int main()
{
// fd_set fd;
// std::cout<<sizeof(fd)*8<<std::endl;
std::unique_ptr<EpollServer> svr(new EpollServer(calculate));
svr->InitServer();
svr->Dispatcher();
return 0;
}
(3) EpollServer_V3
V3在V2的基础上修改成了Reactor模式,这只是一个简单的demo版本
Reactor介绍
基于多路转接的包含事件派发器,连接管理等半同步半异步的IO服务器
一旦有事件就绪就会调用connection底层对应的回调方法,就像打地鼠一样用锤子去打老鼠,底层的Epoll服务器帮助我们监测有哪些事件就绪了,就像好像地上的洞一样帮我们监测哪一个节点有地鼠
#pragma once
#include<iostream>
#include"Sock.hpp"
#include"log.hpp"
#include"Epoll.hpp"
#include"Util.hpp"
#include"Protocol.hpp"
#include<functional>
#include<unordered_map>
#include<assert.h>
using namespace Protocol_ns;
const static int gport=8889;
const static int bsize=1024;
class EpollServer;
class Connection;
using func_t =function<Response(const Request&)>;
using callback_t =function<void(Connection*)>;
// 大号的结构体
class Connection
{
public:
Connection(int fd, const string&clientip, const uint16_t&clientport)
:fd_(fd)
,clientip_(clientip)
,clientport_(clientport)
{
}
void Register(callback_t recver, callback_t sender,callback_t excepter)
{
recver_=recver;
sender_=sender;
excepter_=excepter;
}
~Connection()
{
}
public:
// IO信息
int fd_;
string inbuffer_;
string outbuffer_;
// IO处理函数
callback_t recver_;
callback_t sender_;
callback_t excepter_;
// 用户信息 client info, only debug
string clientip_;
uint16_t clientport_;
// 给conn带上自己要关心的事件
uint32_t events;
// 回指指针
EpollServer*R;
};
class EpollServer
{
const static int gnum = 64;
public:
EpollServer(func_t func,uint16_t port = gport)
:func_(func)
,port_(port)
{
}
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
AddConnection(listensock_.Fd(), EPOLLIN | EPOLLET);
logMessage(Debug, "init server success");
}
// 事件派发器
int Dispatcher()
{
// 1. 这里我们能够直接获取新的链接吗?
// 2. 最开始的时候, 我们的服务器是没有太多的sock的, 甚至只有一个sock! listensock_
// 3. 在网络中, 新连接到来被当做 读事件就绪!
// listensock_.Accept(); 不能
int timeout = -1;
while (true)
{
Looponce(timeout);
}
}
void Looponce(int timeout)
{
int n = epoller_.Wait(revs_, gnum, timeout);
for (int i = 0; i < n; ++i)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "当前正在处理%d上的%s", fd, (events & EPOLLIN) ? "EPOLLIN" : "OTHER");
// 我们将所有的异常情况,最后全部转化成recv,send的异常
if ((events & EPOLLERR)||(events & EPOLLHUP))
events|=(EPOLLIN|EPOLLOUT);
if ((events & EPOLLIN) && (ConnIsExists(fd)))
connections_[fd]->recver_(connections_[fd]);
if ((events & EPOLLOUT)&& (ConnIsExists(fd)))
connections_[fd]->sender_(connections_[fd]);
}
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
// 1. 设置fd是非阻塞的
if(events & EPOLLET)
Util::SetNonBlock(fd);
// 2. 构建connection对象, 交给connections_来进行管理
Connection *conn = new Connection(fd, ip, port);
if (fd == listensock_.Fd())
{
conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
else
{
conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
std::bind(&EpollServer::Sender, this, std::placeholders::_1),
std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
}
conn->events=events;
conn->R=this;
connections_.insert(make_pair(fd, conn));
// 3. 将fd & events 写透到内核中
bool r = epoller_.AddModEvent(fd,events, EPOLL_CTL_ADD);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
bool EnableReadWrite(Connection *conn ,bool readable,bool writeable)
{
conn->events=((readable?EPOLLIN:0)|(writeable?EPOLLOUT:0)|EPOLLET);
return epoller_.AddModEvent(conn->fd_,conn->events,EPOLL_CTL_MOD);
}
// 连接管理器
void Accepter(Connection*conn)
{
do
{
int err=0;
// 1. 新连接到来了
string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport,&err);
if (sock > 0)
{
logMessage(Debug, "%s:%d 已经连上了服务器了", clientip.c_str(), clientport);
// 将sock添加到epoller中
AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
}
else
{
if(err==EAGAIN || err==EWOULDBLOCK)
break;
else if(err=EINTR)
continue;
else // 获取连接失败
{
logMessage(Warning,"errstring : %s,errcode: %d",strerror(err),err);
continue;
}
}
} while (conn->events & EPOLLET);
logMessage(Debug, "Accepter done ....");
}
void HandlerRequest(Connection *conn)
{
int quit = false;
while (!quit)
{
string requestStr;
// 1. 提取完整报文
int n = Protocol_ns::Parsepackage(conn->inbuffer_, &requestStr);
if (n > 0)
{
// 2. 提取有效载荷
requestStr = Protocol_ns::RemoveHeader(requestStr, n);
// 3. 进行反序列化
Request req;
req.Deserialize(requestStr);
// 4. 进行业务处理
Response resp=func_(req); // request 保证一定是一个完整的请求报文
// 5. 序列化
string RespStr;
resp.Serialize(&RespStr);
// 6. 添加报头
RespStr=AddHeader(RespStr);
// 7. 进行返回
conn->outbuffer_+=RespStr;
}
else
quit=true;
}
}
bool RecverHelper(Connection *conn)
{
int ret=true;
// 读取完毕本轮数据!
do
{
char buffer[bsize];
ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
conn->inbuffer_ += buffer;
logMessage(Debug, "inbuffer: %s, [%d]", conn->inbuffer_.c_str(), conn->fd_);
}
else if (n == 0)
{
conn->excepter_(conn);
ret=false;
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
conn->excepter_(conn);
ret=false;
break;
}
}
} while (conn->events & EPOLLET);
return ret;
}
void Recver(Connection*conn)
{
if(!RecverHelper(conn))
return;
HandlerRequest(conn);
// 一般在面对写入的时候, 直接写入, 没写完, 才交给epoll!
if(!conn->outbuffer_.empty())
conn->sender_(conn); // 只是第一次触发发送
}
void Sender(Connection *conn)
{
bool safe=true;
do
{
ssize_t n = send(conn->fd_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
if (n > 0)
{
conn->outbuffer_.erase(0,n);
if (conn->outbuffer_.empty())
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK) // 发送缓冲区写满了, 对方来不及收了,即写条件不就绪
break;
else if (errno == EINTR) // 写被信号中断了, 导致直接返回了。要继续再写
continue;
else
{
safe=false;
conn->excepter_(conn);
break;
}
}
} while (conn->events & EPOLLET);
if(!safe)
return;
if (!conn->outbuffer_.empty()) // 数据发完了
EnableReadWrite(conn, true, true);
else
EnableReadWrite(conn, true, false); // 数据没发完
}
void Excepter(Connection*conn)
{
// 1. 先从epoll中移除fd
epoller_.DelEvent(conn->fd_);
// 2. 移除unordered_map中的KV关系
connections_.erase(conn->fd_);
// 3. 关闭fd
close(conn->fd_);
// 4. conn对象释放掉
delete conn;
logMessage(Debug,"Excepter..., fd: %d, clientinfo:[%s:%d]",conn->fd_,conn->clientip_.c_str(),conn->clientport_);
}
bool ConnIsExists(int fd)
{
return connections_.find(fd)!=connections_.end();
}
~EpollServer()
{
listensock_.Close();
epoller_.Close();
}
private:
uint16_t port_;
Sock listensock_;
Epoller epoller_;
struct epoll_event revs_[gnum];
func_t func_;
unordered_map<int,Connection*> connections_;
};
运行效果与V2相同