阻塞IO模型
阻塞IO是最通用的IO类型,使用这种模型进行数据接收的时候,在数据没有到之前程序会一直等待。例如,对于函数recvfrom(),内核会一直阻塞该请求直到有数据到来才返回。
非阻塞IO模型
当把套接字设置成非阻塞的IO,则对每次请求,内核都不会阻塞,会立即返回;当没有数据的时候,会返回一个错误。例如,对recvfrom()函数,前几次都没有数据返回,直到最后内核才向用户层的空间复制数据。
非阻塞方式的操作与阻塞方式的操作最大的不同点是函数的调用立刻返回,不管数据是否成功读取或者成功写入。使用fcntl()套接字文件描述符按照如下的代码进行设置后,可以进行非阻塞的编程:
fcntl(s, F_SETFL, O_NONBLOCK);
其中的s是套接字文件描述符,使用F_SETFL命令将套接字s设置为非阻塞方式后,再进行读写操作就可以马上返回了 。
fcntl函数原型:
#include <unistd.h>
#include <fcntl.h>int fcntl(int fd, int cmd, ... /* arg */ );
fcntl函数有5种功能:
- 复制一个现有的描述符(cmd=F_DUPFD).
- 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
- 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
- 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
- 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW).
我们此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞.
#include <iostream>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>
bool SetNonBlock(int fd)
{
int fl = fcntl(fd,F_GETFD);//在底层获取当前fd对应的文件读写标志位
if(fl < 0)
{
return false;
}
fcntl(fd,F_SETFL,fl | O_NONBLOCK);//设置非阻塞
return true;
}
int main()
{
SetNonBlock(0);
char buffer[1024];
while(true)
{
sleep(1);
errno = 0;
ssize_t s = read(0,buffer,sizeof(buffer)-1);
if(s > 0)
{
buffer[s] = 0;
std::cout << "echo# " << buffer << " errno "<< errno << "errstring: " << std::endl;
}
else
{
//errno的值是11,代表底层数据没就绪
//std::cout << " read \"error\" " << " errno "<< errno << "errstring: " << std::endl;
if(errno == EWOULDBLOCK ||errno == EAGAIN)
{
std::cout << " 当前0号fd数据没有就绪" << std::endl;
continue;
}
else if(errno == EINTR)
{
std::cout << " 当前0号fd数据没有就绪" << std::endl;
continue;
}
}
}
return 0;
}
IO复用
使用IO复用模型可以在等待的时候加入超时的时间,当超时时间没有到达的时候,阻塞的情况一致,而当超时间到达仍然没有数据接收到,系统会返回,不再等待,select()函数按照一定的超时时间轮询,直到需要等待的套接字有数据到来,利用recvfrom()函数将数据复制到应用层。
IO的本质是 等 + 数据拷贝,想要让IO更高效,就是缩短等的时间,select()函数可以缩短等的时间。 select()函数可以帮用户一次等待多个文件sock,当哪些文件sock就绪了,select()函数就会通知用户,对应的sock有哪些,然后用户再调用recv/recvfrom/read等函数进行读取。
select()函数
函数select()与之前的函数recv()和send()直接操作文件描述符不同。使用select()函数可以先对需要操作的文件描述符进行查询,查看目标文件描述符是否可以进行读、写或者错误操作,然后当文件描述符满足操作的条件的时候才进行真正的IO操作。
/* According to POSIX.1-2001 */
#include <sys/select.h>/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
除了nfds都是输入输出型参数。fd_set是一个位图结构。
函数select()允许程序监视多个文件描述符,当一个或者多个监视的文件描述准备就绪,可以进行IO操作的时候返回。函数监视一个文件描述符的对应操作是不可以进行,例如对监视读文件集的对文件描述符可操作。
函数可以同时监视3类文件描述符。将监视在readfds文件描述符集合中的文件是否可读,即判断对此文件描述符进行读操作是否被阻塞;函数监视writeids文件描述符集合中的文件是否可写,即判断是否对此文件描述符进行写操作是否阻塞;另外,函数还监视文件描述符集合exceptfds中的文件描述符是否发生意外。当函数退出的时候,上述的集合发生了改变。当不需要监视某种文件集时,可以将对应的文件集设置为NULL,如果所有的文件集和均为NULL,则表示等待一段时间。
参数timeout的类型是如下的结构:
struct timeval
{
time_t tv_ sec; /*秒*/long tv usec; /*微秒*/
};
- 成员tv_sec表示超时的秒数。
- 成员tv_usec 表示超时的微妙数,即1/1000000s.
有4个宏可以操作文件描述符的集合。
- FD_ZERO(): 清理文件描述符集合。
- FD_SET(): 向某个文件描述符集合中加入文件描述符。
- FD_CLR():从某个文件描述符的集合中取出某个文件描述符。
- FD_ISSET(): 测试某个文件描述符是否某个集合中的一员。
也注意:文件描述符的集合存在最大的限制,其最大值为FD_SETSIZE, 当超出最大值时,将发生不能确定的事情。
readfds:
a.输入时:用户->内核,我的比特位中,比特位的位置,表示文件描述符值,比特位的内容表示用户是否关心。
b.输出时:内核->用户,我是OS,用户让我关心的多个fd有结果了。比特位的位置,表示文件描述符值,比特位的内容,表示是否就绪。
后续用户可以直接读取位图中为1的比特位所代表的号,而不会被阻塞。
因为用户和内核都会修改同一个位图结构所以这个参数用一次之后,一定需要进行重新设定。
函数select()的参数含义如下所述。
- nfds:一个整型的变量,它比所有文件描述符集合中的文件描述符的最大值大1。使用select()的时候必须计算最大值的文件描述的值,将值通过nfds传入。
- readfds:这个文件描述符集合监视文件集中的任何文件是否有数据可读,当select()函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符,即可以被函数recv()、read()等进行读数据的操作。
- writefds: 这个文件描述符集合监视文件集中的任何文件是否有数据可写,当select()函数返回的时候,readfds将清除其中的不可写的文件描述符,只留下可写的文件描述符,即可以被send()、write()函数等进行写数据的操作。
- exceptfds:这个文件集将监视文件集中的任何文件是否发生错误,其实,它可于其他的用途。例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select()函数返回的时候,readfds将清除其中的其他文件描述符,只留下可读OOB数据。
- timeout:设置在select()所监视的文件集合中的事件没有发生时,最长的等待时间,当超过此时间时,函数会返回。当超时间为NULL时,表示阻塞操作,会一直等待,直到某个监视的文件集为0时,selet会立即返回。(select等待多个fd,等待策略可以选择:1、阻塞式nullptr ⒉、非阻塞式{0,0} 3、可以设置timeout时间,时间内阻塞,时间到,立马返回{(5,0)}
等待时间内,如果有fd就绪, timeout,表现输出性,返回距离下一次timeout,剩余多长时间。)
函数select() 返回值为0、-1或者一个大于1的整数值:当监视的文件集中有文件描述符符合要求,即读文件描述符集中的文件可读、写文件描述符中的文件可写或者错误文件描述符中的文件发生错误时,返回值为大于0的正值;当超时的时候返回0;当返回值为-1的时候发生了错误,其错误值由errno指定。
错误值可能为:
EBADF :文件描述符为无效的或该文件已关闭
EINTR :此调用被信号所中断
EINVAL: 传递了不合法的参数
ENOMEM :没有足够的内存
select优缺点:
优点:任何一个多路转接方案,都具备:
- 效率高
- 应用场景:有大量的连接,但是只有少量是活跃的,节省了资源
缺点:
- 为了维护第三方数组,select服务器会充满大量的遍历,OS底层帮我们关心fd的时候,也要遍历b.每一次都要对select输出参数进行重新设定
- 能够同时管理的fd的个数是有上限的
- 因为几乎每一个参数都是输入输出型的,select一定会频繁的进行用户到内核,内核到用户的参数数据拷贝
- 编码比较复杂
poll()函数
poll()函数等待某个文件描述符上的某个事件的发生
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
poll()函数监视在fds数组指明的一组文件描述符上发生的动作,当满足条件或者包超时的时候会退出。
- 参数fds是一个指向结构poll数组的指针,监视的文件描述符和条件放在里面。
- 参数nfds是比监视的最大描述符的值大1的值。
- 参数timeout是超时时间,单位为毫秒,当为负值时,表示永远等待。
poll()函数返回值的含义如下所述。
大于0:表示成功,等待的某个条件满足,返回值为满足条件的监视文件描描述符的数量。0:表示超时。-1:表示发生错误,errno的错误码与select一样。
结构struct poll的原型如下:
struct pollfd {
int fd; /*文件描述符*/
short events; /*请求的事件*/
short revents; /*返回的事件*/
};
- 成员fd表示监视的文件描述符。
- 成员events表示输入的监视事件,其值及含义revents的值及含义如下。
- 成员revents表示返回的监视事件,即返回时发生的事件。
poll的优点:
- 效率高
- 有大量的连接,但是只有少量的是活跃的。节省了资源
- 输入输出参数分离的,不需要进行大量的重置。
- poll参数级别,没有可以管理的fd的上限
poll缺点:
- poll依旧需要不少的遍历,在用户层检测时间就绪,与内核检测fd就绪。都是一样,用户还是要维护数组
- poll需要内核到用户的拷贝。
- poll的代码也比较复杂--比select容易。
epoll()函数
epoll 有3个相关的系统调用.
epoll_create()
创建一个epoll的句柄
#include <sys/epoll.h>
int epoll_create(int size);
- 自从linux2.6.8之后,size参数是被忽略的.
- 用完之后, 必须调用close()关闭.
epoll_ctl()
epoll的事件注册函数
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值(epoll的句柄); 第二个参数表示动作,用三个宏来表示.;第三个参数是需要监听的fd; 第四个参数是告诉内核需要监听什么事。
第二个参数的取值:
- EPOLL_CTL_ADD:注册新的fd到epfd中;
- EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
- EPOLL_CTL_DEL:从epfd中删除一个fd;
struct epoll_event结构
typedef union epoll_data
{
void *ptr;int fd;
uint32_t u32;uint64_t u64;
}epoll_data_t;
struct epoll_event{
uint32_t events;epoll_data_t data;
}_EPOLL_PACKED;
events可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来); EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.
epoll_wait()
收集在epoll监控的事件中已经发送的事件.
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
参数events是分配好的epoll_event结构体数组. epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存). maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size. 参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞). 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函 数失败.
epoll()函数工作原理
epoll.hpp
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
class Epoll
{
public:
static const int gsize = 256;
public:
static int CreateEpoll()
{
int epfd = epoll_create(gsize);
if(epfd > 0) return epfd;
exit(5);
}
static bool CtlEpoll(int epfd, int oper, int sock, uint32_t events)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = sock;
int n = epoll_ctl(epfd, oper, sock, &ev);
return n == 0;
}
static int WaitEpoll(int epfd, struct epoll_event revs[], int num, int timeout)
{
//如果底层就绪的sock非常多,revs承装不下,一次拿不完,就下一次再拿
//关于epoll_wait的返回值问题:有多少个fd上的事件就绪,就返回多少,epoll返回的时候,
//会将所有就绪的event按照顺序放入到revs数组中,一共有返回值个
return epoll_wait(epfd, revs, num, timeout);
}
};
epollServer.hpp
#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include "Log.hpp"
#include "Sock.hpp"
#include "Epoll.hpp"
namespace ns_epoll
{
const static int default_port = 8080;
const static int gnum = 64;
//只处理读取
class EpollServer
{
using func_t = std::function<void(std::string)>;
public:
EpollServer(func_t HandlerRequest, const int &port = default_port)
: _port(port), _revs_num(gnum), _HandlerRequest(HandlerRequest)
{
// 0. 申请对应的空间
_revs = new struct epoll_event[_revs_num];
// 1. 创建listensock
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
// 2. 创建epoll模型
_epfd = Epoll::CreateEpoll();
logMessage(DEBUG, "init success, listensock: %d, epfd: %d", _listensock, _epfd); // 3, 4
// 3. 将listensock,先添加到epoll中,让epoll帮我们管理起来
if (!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listensock, EPOLLIN))
exit(6);
logMessage(DEBUG, "add listensock to epoll success."); // 3, 4
}
void Accepter(int listensock)
{
std::string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport);
if(sock < 0)
{
logMessage(WARNING, "accept error!");
return;
}
// 不能直接读取,因为并不清楚,底层是否有数据
// 将新的sock,添加给epoll
if (!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN)) return;
logMessage(DEBUG, "add new sock : %d to epoll success", sock);
}
void Recver(int sock)
{
// 1. 读取数据
char buffer[10240];
ssize_t n = recv(sock, buffer, sizeof(buffer)-1, 0);
if(n > 0)
{
//假设这里就是读到了一个完整的报文
buffer[n] = 0;
_HandlerRequest(buffer); // 2. 处理数据
}
else if(n == 0)
{
// 1. 先在epoll中去掉对sock的关心
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
assert(res);
(void)res;
// 2. 在close文件
close(sock);
logMessage(NORMAL, "client %d quit, me too...", sock);
}
else
{
// 1. 先在epoll中去掉对sock的关心
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
assert(res);
(void)res;
// 2. 在close文件
close(sock);
logMessage(NORMAL, "client recv %d error, close error sock", sock);
}
}
void HandlerEvents(int n)
{
assert(n > 0);
for(int i = 0; i < n; i++)
{
uint32_t revents = _revs[i].events;
int sock = _revs[i].data.fd;
// 读事件就绪
if(revents & EPOLLIN)
{
if(sock == _listensock) Accepter(_listensock);
else Recver(sock);
}
if(revents & EPOLLOUT)
{
//TODO?
}
}
}
void LoopOnce(int timeout)
{
int n = Epoll::WaitEpoll(_epfd, _revs, _revs_num, timeout);
//if(n == _revs_num) //扩容
switch (n)
{
case 0:
logMessage(DEBUG, "timeout...");
break;
case -1:
logMessage(WARNING, "epoll wait error: %s", strerror(errno));
break;
default:
// 等待成功
logMessage(DEBUG, "get a event");
HandlerEvents(n);
break;
}
}
void Start()
{
int timeout = -1;
while(true)
{
LoopOnce(timeout);
}
}
~EpollServer()
{
if (_listensock >= 0)
close(_listensock);
if (_epfd >= 0)
close(_epfd);
if (_revs)
delete[] _revs;
}
private:
int _listensock;
int _epfd;
uint16_t _port;
struct epoll_event *_revs;
int _revs_num;
func_t _HandlerRequest;
};
}
#endif
信号驱动IO模型
信号驱动的IO在进程开始的时候注册一个信号处理的回调函数,进程继续执行,当信号发生时,即有了IO的时间,这里就有数据到来,利用注册的回调函数将到来的数用recvfrom()接收到。
异步IO模型
异步IO与前面的信号驱动IO相似,其区别在于信号驱动IO当数据到来的时候,使信号通知注册的信号处理函数,而异步IO则在数据复制完成的时候才发送信号通知注册的信号处理函数。