之前的系统部分的基础IO:就是冯诺依曼结果中的访问磁盘,用内存作为输入输出缓冲区提高效率
现在我们要说的高级IO(input/output):访问的外设(网络中就是网卡):我们的发送和接收接收其实大部分时间都在等,发送在等发送缓冲区中输入了数据,接收在等接收缓冲区有数据,然后在发生拷贝
IO: input:用户从内核缓冲区(内核缓冲区从外设拿)拿数据,output:外设(显示器)从(用户把数据拿给内核缓冲区)内核拿数据
所以IO=等+拷贝,如何提高IO效率—>减少等的比重
初识五种IO模型:
阻塞IO:
在内核将数据准备好之前(网卡把数据拷贝给系统套接字缓冲区之前), 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.
非阻塞轮询IO
如果内核还未将数据准备好,不会一直阻塞的等待准备好,而是会返回一个EWOULDBLOCK错误码.告诉进程还为准备好,进程去干一下其他事情,然后又轮询回来问一次,直到数据准备好。把数据拷贝给用户,然后就返回成功,
实际写代码中如何设置成非阻塞呢:
代用系统调用fcntl+F_GETFL,把进程中的某个文件描述符(文件)的属性取出来
然后再用fcntl+F_SETFL把这个文件描述符的IO属性设置为非阻塞
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstdlib>
#include <cerrno>
// 对指定的fd设置非阻塞
void SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if(fl < 0)
{
std::cerr << "fcntl error" << std::endl;
exit(0);
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int main()
{
SetNonBlock(0);
while(true)
{
char buffer[1024];
ssize_t s = read(0, buffer, sizeof(buffer)-1); // sizeof(buffer)-1
if(s > 0)
{
buffer[s] = 0;
std::cout << "echo# " << buffer << std::endl;
}
else if(s == 0)
{
std::cout << "end stdin" << std::endl;
break;
}
else
{
// 非阻塞等待, 如果数据没有准备好,返回值会按照出错返回, s == -1
// 数据没有准备好 vs 真的出错了 : 处理方式一定不是一样的。 s无法区分!
// 数据没有准备好,算读取错误吗?不算。read,recv以出错的形式告知上层,数据还没有准备好
if(errno == EWOULDBLOCK)
{
std::cout << "OS的底层数据还没有就绪, errno: " << errno << std::endl;
// 做其他事情了
}
else if(errno == EINTR)
{
std::cout << "IO interrupted by signal, try again" << std::endl;
}
else
{
std::cout << "read error!" << std::endl;
break;
}
}
sleep(1);
}
}
信号驱动IO(简单了解一下)
内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作.
IO多路转接(复用):
虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
实现多路转接的几种系统调用:
select技术:
用一下个select写一个服务器来具体讲述其中的细节:
因为把要检测的文件描述符设置进去和有就绪返回回来的参数是同一个(输入输出型参数),所以每检测一次,位图都会变化,所以需要借助第三方数组来记录之前已经设置进去的文件描述符,然后在要知道哪些文件描述符是就绪状态时,要遍历数组,首先数组中有数据,其次和输出的位图参数比较是否就绪才可以确定。把新的文件描述符添加进数组后,如果再次去调用的话,位图输入输出型参数又要遍历数组从新设置
#pragma once
#include <iostream>
#include <string>
#include <sys/select.h>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"
using namespace Net_Work;
const static int gdefaultport = 8888;
const static int gbacklog = 8;
const static int num = sizeof(fd_set) * 8;
class SelectServer
{
private:
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
continue;
// 合法的sockfd
// 读事件分两类,一类是新连接到来。 一类是新数据到来
int fd = _rfds_array[i]->GetSockFd();
if (FD_ISSET(fd, &rfds))
{
// 读事件就绪
if (fd == _listensock->GetSockFd())
{
lg.LogMessage(Info, "get a new link\n");
// 获取连接
std::string clientip;
uint16_t clientport;
// 不会阻塞!!,因为select已经检测到了listensock已经就绪了
Socket *sock = _listensock->AcceptConnection(&clientip, &clientport);
if (!sock)
{
lg.LogMessage(Error, "accept error\n");
continue;
}
lg.LogMessage(Info, "get a client, client info is# %s:%d, fd: %d\n", clientip.c_str(), clientport, sock->GetSockFd());
// 这里已经获取连接成功了,接下来怎么办???
// read?write?绝对不能!!!read 底层数据是否就绪时不确定的!谁清楚fd上面是否有读事件呢?select!
// 新链接fd到来的时候,要把新的fd, 想办法交给select托管 -- 只需要添加到数组_rfds_array中即可
int pos = 0;
for (; pos < num; pos++)
{
if (_rfds_array[pos] == nullptr)
{
_rfds_array[pos] = sock;
lg.LogMessage(Info, "get a new link, fd is : %d\n", sock->GetSockFd());
break;
}
}
if (pos == num)
{
sock->CloseSocket();
delete sock;
lg.LogMessage(Warning, "server is full...!\n");
}
}
else
{
// 普通的读事件就绪
// 读数据是有问题的
// 这一次读取不会被卡住吗?
std::string buffer;
bool res = _rfds_array[i]->Recv(&buffer, 1024);
if (res)
{
lg.LogMessage(Info, "client say# %s\n", buffer.c_str());
buffer += ": 你好呀,少年";
_rfds_array[i]->Send(buffer);
buffer.clear();
}
else
{
lg.LogMessage(Warning, "client quit, maybe close or error, close fd : %d\n", _rfds_array[i]->GetSockFd());
_rfds_array[i]->CloseSocket();
delete _rfds_array[i];
_rfds_array[i] = nullptr;
}
}
}
}
}
public:
SelectServer(int port = gdefaultport) : _port(port), _listensock(new TcpSocket()), _isrunning(false)
{
}
void InitServer()
{
_listensock->BuildListenSocketMethod(_port, gbacklog);
for (int i = 0; i < num; i++)
{
_rfds_array[i] = nullptr;
}
_rfds_array[0] = _listensock.get();
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
// 我们能不能直接accept新连接呢?不能!所有的fd,都要交给select. listensock上面新连接,相当于读事件,有新连接,就等价于有新数据到来
// 首先不能直接accept,而是将listensock交给select。因为只有select有资格知道有没有IO事件就绪
// 故意放在循环内部
// 遍历数组,1. 找最大的fd 2. 合法的fd添加到rfds集合中
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = _listensock->GetSockFd();
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
{
continue;
}
else
{
int fd = _rfds_array[i]->GetSockFd();
FD_SET(fd, &rfds); // 添加所有合法fd到rfds集合中
if (max_fd < fd) // 更新最大fd
{
max_fd = fd;
}
}
}
// 定义时间
struct timeval timeout = {0, 0};
// rfds本质是一个输入输出型参数,rfds是在select调用返回的时候,不断被修改,所以,每次都要重置
PrintDebug();
int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
switch (n)
{
case 0:
lg.LogMessage(Info, "select timeout..., last time: %u.%u\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
lg.LogMessage(Error, "select error!!!\n");
break;
default:
// 正常的就绪的fd
lg.LogMessage(Info, "select success, begin event handler, last time: %u.%u\n", timeout.tv_sec, timeout.tv_usec);
HandlerEvent(rfds); // _rfds_array: 3,4,5,6,7,8,9,10 -> rfds: 4,5,6
break;
}
}
_isrunning = false;
}
void Stop()
{
_isrunning = false;
}
void PrintDebug()
{
std::cout << "current select rfds list is : ";
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
continue;
else
std::cout << _rfds_array[i]->GetSockFd() << " ";
}
std::cout << std::endl;
}
~SelectServer()
{
}
private:
std::unique_ptr<Socket> _listensock;
int _port;
int _isrunning;
// select 服务器要被正确设计,需要程序员定义数据结构,来把所有的fd管理起来,往往是数组!
Socket *_rfds_array[num];
};
select的优缺点:
优点:select只负责等待,可以等待多个fd,提高了IO的效率
缺点:输入输出型参数:每次都要对参数从新设置,内核每次都要对参数位图进行遍历,拷贝给用户
需要第三方数组来不断的遍历来设置位图和读取位图
位图是有类型fd_set的,所以最多能检测的文件是有上限的
从内核到用户从用户到内核调用的收拾同一个参数
poll技术:
只是改进:通过数组首元素地址管理fd,解决了能检测的fd的上限问题,因为参数是数组指针,所以不用每次都重新设置一遍,而是把新的文件描述符加到数组中就行,对要检测什么状态是设置在数组的元素值中的结构中的,而且输入输出参数是分离的,所以也不用每次的设置
但是还是有的和select一样的一些问题没有被解决:
需要对返回的数组遍历才能找到就绪的文件描述符,随着检测fd的数量增加效率会降低,每次虽然不用遍历重置fds,但是fds是改变了的,传参时还是要把从用户拷贝给内核
#pragma once
#include <iostream>
#include <string>
#include <poll.h>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"
using namespace Net_Work;
const static int gdefaultport = 8888;
const static int gbacklog = 8;
const int gnum = 1024;
class PollServer
{
private:
void HandlerEvent()
{
for (int i = 0; i < _num; i++)
{
if (_rfds[i].fd == -1)
continue;
// 合法的sockfd
// 读事件分两类,一类是新连接到来。 一类是新数据到来
int fd = _rfds[i].fd;
short revents = _rfds[i].revents;
if (revents & POLLIN)
{
// 新连接到来了
if (fd == _listensock->GetSockFd())
{
lg.LogMessage(Info, "get a new link\n");
// 获取连接
std::string clientip;
uint16_t clientport;
// 不会阻塞!!,因为select已经检测到了listensock已经就绪了
int sock = _listensock->AcceptConnection(&clientip, &clientport);
if (sock == -1)
{
lg.LogMessage(Error, "accept error\n");
continue;
}
lg.LogMessage(Info, "get a client, client info is# %s:%d, fd: %d\n", clientip.c_str(), clientport, sock);
// 这里已经获取连接成功了,接下来怎么办???
// read?write?绝对不能!!!read 底层数据是否就绪时不确定的!谁清楚fd上面是否有读事件呢?poll!
// 新链接fd到来的时候,要把新的fd, 想办法交给poll托管 -- 只需要添加到数组_rfds中即可
int pos = 0;
for (; pos < _num; pos++)
{
if (_rfds[pos].fd == -1)
{
_rfds[pos].fd = sock;
_rfds[pos].events = POLLIN;
lg.LogMessage(Info, "get a new link, fd is : %d\n", sock);
break;
}
}
if (pos == _num)
{
// 1. 扩容
// 2. 关闭
close(sock);
lg.LogMessage(Warning, "server is full...!\n");
}
}
else
{
// 普通的读事件就绪
// 读数据是有问题的
// 这一次读取不会被卡住吗?
char buffer[1024];
ssize_t n = recv(fd, buffer, sizeof(buffer-1), 0); // 这里读取会阻塞吗?不会!
if (n > 0)
{
buffer[n] = 0;
lg.LogMessage(Info, "client say# %s\n", buffer);
std::string message = "你好呀,少年, ";
message += buffer;
send(fd, message.c_str(), message.size(), 0);
}
else
{
lg.LogMessage(Warning, "client quit, maybe close or error, close fd : %d\n", fd);
close(fd);
// 取消poll的关心
_rfds[i].fd = -1;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
}
}
}
}
public:
PollServer(int port = gdefaultport) : _port(port), _listensock(new TcpSocket()), _isrunning(false), _num(gnum)
{
}
void InitServer()
{
_listensock->BuildListenSocketMethod(_port, gbacklog);
_rfds = new struct pollfd[_num];
for (int i = 0; i < _num; i++)
{
_rfds[i].fd = -1;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
// 最开始的时候,只有一个文件描述符, Listensock
_rfds[0].fd = _listensock->GetSockFd();
_rfds[0].events |= POLLIN;
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
// 定义时间
int timeout = -1;
PrintDebug();
int n = poll(_rfds, _num, timeout);
switch (n)
{
case 0:
lg.LogMessage(Info, "poll timeout...\n");
break;
case -1:
lg.LogMessage(Error, "poll error!!!\n");
break;
default:
// 正常的就绪的fd
lg.LogMessage(Info, "select success, begin event handler\n");
HandlerEvent(); // _rfds_array: 3,4,5,6,7,8,9,10 -> rfds: 4,5,6
break;
}
}
_isrunning = false;
}
void Stop()
{
_isrunning = false;
}
void PrintDebug()
{
std::cout << "current poll fd list is : ";
for (int i = 0; i < _num; i++)
{
if (_rfds[i].fd == -1)
continue;
else
std::cout << _rfds[i].fd << " ";
}
std::cout << std::endl;
}
~PollServer()
{
delete[] _rfds;
}
private:
std::unique_ptr<Socket> _listensock;
int _port;
int _isrunning;
struct pollfd *_rfds;
int _num;
};
对poll的问题:需要对返回的数组遍历才能找到就绪的文件描述符,随着检测fd的数量增加效率会降低,
进行改良的技术:epoll
epoll:效率最高的多路转接技术
在我们调用epoll_wait从就绪队列中拿到关心的fd和就绪事件的时候,就要对该fd如果是读事件的话,就要去读取,但是一次读取并不能保证能够读取完(可能你设置的接收的字符串只有1024),所以epoll的通知模式有两种
LT模式(epoll默认模式):数据没有被处理完或者没处理就会一直通知,(你这一次没有取完,这个继续队列中就还会保留着这个fd和事件的节点,只到下次下下次…把数据取完了才会释放,也就不会再通知了)。
ET模式:只会通知一次,不再通知,直到这个fd又有这个事件发生才会通知。这样为了不发生数据丢失,所以应用层的程序员就要循环的去把这次通知的fd中的数据都读完。既然是循环,不可避免的最后一次循环一定是没有数据就阻塞的,但是我们epoll是多路转接技术,是不允许IO的时候阻塞的,所以epoll技术的ET模式的IO要用非阻塞IO
LT和ET那个更高效
从两者的特性来看,ET更高效因为ET的每次通知都是有效的
从数据发送速度来看:ET更高效,因为ET的通知特性,所以上层就必须在收到通知的时候把这个fd中收到的数据都读完,这样他的接收缓冲区不就更大了吗,那么他在TCP的报文发送给发送端的窗口大小就更大了,发送端直到他的滑动窗口更大了,发送数据的效率就更高了
reactor:
就是把这个进程关心的事件+fd,和epoll模块封装,然后当epoll中有事件就绪的时候,就进行事件的派发——把事件+fd派发出去
写事件的关心是按需关心的,刚开始是一直写,当写不进去(发送缓冲区满了),才把他的写事件关心起来,如果下次写,可以写完,那么他就又要删除写事件的关心
异步IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).