文章目录
- 高级IO
- 阻塞IO模型
- 非阻塞IO模型
- 多路转接IO
- select简介
- socket 就绪条件
- select服务器
- select的优缺点
- 多路转接的使用场景
高级IO
非阻塞IO,记录锁,系统V流机制,I/O多路转接(I/O)多路复用,readv
和 writev
函数以及存储映射IO(mmap
),这些统称为高级IO
阻塞IO模型
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
int main() {
char buffer[1024];
while (true) {
ssize_t size = read(0, buffer, sizeof(buffer) - 1);
if (size < 0) {
std::cerr << "read error" << std::endl;
break;
}
buffer[size - 1] = 0;
std::cout << "Echo # " << buffer << std::endl;
}
return 0;
}
非阻塞IO模型
打开文件时默认都是以阻塞的方式打开的,如果想要非阻塞的方式打开某个文件,需要在使用open()
函数打开文件时携带O_NONBLOCK
或O_NDELAY
选项,此时就能够以非阻塞方式打开文件,这是在打开文件时设置非阻塞的方式,如果要修改一个已经打开文件的属性,此时就要用到fcntl
函数
int open(const char *pathname, int flags);
int open(const char *pathname, int flags, mode_t mode);
fcntl
函数
int fcntl(int fd, int cmd, ... /* arg */ );
fd
: 已经打开的文件描述符cmd
: 命令,需要进行的操作- … : 可变参数,传入的
cmd
不同,后面的追加参数也就不同 - 调用成功,其返回值取决于具体操作。失败返回-1,同时设置错误码
fcntl
函数常用的五种功能和对应的cmd
取值如下
-
F_DUPFD
复制一个现有描述符 -
F_GETFD 或 F_SETFD
获得/设置文件描述符标记 -
F_GETFL 或 F_SETFL
获得/设置文件状态标记 -
F_GETOWN 或 F_SETOWN
获得/设置异步I/O所有权 -
F_GETLK , F_SETLK, F_SETLKW
获得/设置记录锁
实现
SetNonBlock
函数
定义一个函数,将该函数指定的文件描述符设置成非阻塞状态
- 首先使用
fcntl
函数获取该文件描述符的状态标记(int 类型的位图),cmd = F_GETFL
- 获取到的文件状态标记上添加非阻塞标记
O_NONBLOCK
,再次调用fcntl
函数对文件状态标记进行设置,cmd = F_SETFL
#pragma once
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
bool SetNonBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
if (fl < 0) {
std::cerr << "fcntl error" << std::endl;
return false;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
return true;
}
以非阻塞轮询方式读取标准输入
#include "tool.hpp"
#include <memory.h>
int main() {
SetNonBlock(0); // 将标准输入设置成非阻塞读取
#define BUFFER_SIZE 128
char buffer[BUFFER_SIZE];
memset(buffer, 0, sizeof(buffer));
while (true) {
ssize_t size = read(0, buffer, sizeof(buffer) - 1);
if (size < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { // 底层没有数据就绪
std::cout << strerror(errno) << std::endl;
sleep(3);
continue;
} else if (errno == EINTR) { // 在读取数据前被信号中断
std::cout << strerror(errno) << std::endl;
sleep(3);
continue;
} else {
std::cerr << "read error" << std::endl;
break;
}
}
buffer[size - 1] = 0;
std::cout << "echo # " << buffer << std::endl;
}
return 0;
}
需要注意的是,当read
函数以非阻塞方式读取标准输入时,如果底层数据不就序,那么read
函数就会立即返回,当底层数据不就序时,read
函数时就会立即返回,此时错误码被设置成EAGAIN
或 EWOULDBLOCK
此外,调用read()
函数读取数据之前可能会收到其它信号中断,此时read
函数也会以出错方式返回,错误码被设置成EINTR
,此时应该重新执行read
函数对数据进行读取
因此在使用非阻塞方式读取数据时,需要对read
函数进行进一步分类,如果返回值为-1
,可能存在没有数据可读,信号中断,真的出错了三种情况,应该对错误码进行进一步判断
#include "tool.hpp"
#include <memory.h>
int main() {
SetNonBlock(0); // 将标准输入设置成非阻塞读取
#define BUFFER_SIZE 128
char buffer[BUFFER_SIZE];
memset(buffer, 0, sizeof(buffer));
while (true) {
ssize_t size = read(0, buffer, sizeof(buffer) - 1);
if (size < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { // 底层没有数据就绪
std::cout << strerror(errno) << std::endl;
sleep(3);
continue;
} else if (errno == EINTR) { // 在读取数据前被信号中断
std::cout << strerror(errno) << std::endl;
sleep(3);
continue;
} else { // 真的出错了
std::cerr << "read error" << std::endl;
break;
}
}
buffer[size - 1] = 0;
std::cout << "echo # " << buffer << std::endl;
}
return 0;
}
Resource temporarily unavailable // 当没有数据时,会打印strerror(error) 错误信息
Resource temporarily unavailable
123 // 当读取到数据时,输出数据
echo # 123
Resource temporarily unavailable
124
echo # 124
Resource temporarily unavailable
多路转接IO
select简介
select 是系统提供的一个多路转接接口
- select 系统调用可以让我们呢的程序同时监视多个文件描述符上的事件是否就绪
- select 的核心工作就是等,当监视的多个文件描述符上有至少一个事件就绪时,select 就会返回并将对应的文件描述符上的就绪事件告诉调用者
select 函数
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
-
nfds
: 需要监视的文件描述符中,最大的文件描述符的值 + 1 -
readfds
: 输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的读事件是否就绪,返回时内核告知用户哪些文件描述符的读事件已经就绪 -
writefds
: 输入输出型参数, 调用时用户告知内核需要监视哪些文件描述符的写事件是否就绪,返回时内核告知用户哪些文件描述符上写事件已经就绪 -
exceptfds
:输入输出型参数, 调用时用户告知内核需要监视哪些文件描述符上的异常事件是否就绪,返回时内核告知用户哪些文件描述符的异常事件已经就绪 -
timeout
: 输入输出型参数,调用时由用户设置select
的等待事件,返回时表示timeout
的剩余事件。 -
如果函数调用成功,则返回有事件就绪的文件描述符个数,如果timeout事件耗尽则返回0.调用失败返回-1,同时设置错误码
timeout 有三种取值
NULL/nullptr
: select调用后进行阻塞等待,直到某个被监视的文件描述符上有事件就绪- 0 : select 调用后进行非阻塞等待,无论监视的文件描述符事件是否就绪,select 检测完后立即返回
- 特定的事件值:select 调用后再指定时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则会超时返回
select 调用失败后,可能出现的错误码
EBADF
: 文件描述符无效或者文件已经关闭EINTR
: 此调用被信号中断EINVAL
: 参数nfds
为负值ENOMEM
: 核心内存不足
fd_set
结构
fd_set
结构与sigset_t
等结构类似,fd_set
本质就是一个位图,用位图中的比特位来表示需要监视的文件描述符。对这个位图的操作系统提供了一组专门的结构,用于对fd_set
类型位图进行各种操作
void FD_CLR(int fd, fd_set *set); // 清除 fd
int FD_ISSET(int fd, fd_set *set); // 判断 fd
void FD_SET(int fd, fd_set *set); // 设置 fd
void FD_ZERO(fd_set *set); // 清空 位图
timeout
结构
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
tv_sec
表示的是秒,tv_usec
表示的是微秒
socket 就绪条件
读就绪
- socket 再内核中的接收缓冲区的字节数,大于等于低水平标记位
SO_RCVLOWAT
,此时可以无阻塞的读取该文件描述符,并且返回值大于0 - socket 在使用TCP通信时,对端关闭连接,socket 读则返回0
- listen socket 上有连接请求
- socket 上有未处理的错误
写就绪
- socket 内核中,发送缓冲区的可用字节数,大于等于低水平位标记
SO_SNDLOWAT
,此时可以无阻塞写,并且返回值大于0 - socket 的写操作被关闭(close 或 shutdown), 对一个写操作关闭的socket 进行写操作会触发SIGPIPE信号
- socket 使用非阻塞
connect
连接成功或者失败后 - socket 上有未读取的错误
异常就绪
- socket 上收到外带数据
外带数据和TCP的进击模式相关,TCP报头中的URG标志位和16位紧急指针配合使用,能够发送/接收我带外数据
select服务器
如果我们想要实现一个简单的select服务器,首先我们必须能够拿到客户端发来的数据并进行打印,那么select服务其的工作流程应该
- 初始化服务器,完成套接字创建,绑定,监听等
- 定义
fd_array
数组保存需要关注的套接字,开始只需要将监听套接字放入即可,并设置fd_set,将fd_array中的文件描述符设置进位图中 - 循环调用select函数,检测读事件是否就绪,就绪进行对应操作
Socket类
首先可以先编写一个Socket类,对网络编程套接字的接口进行一定程度封装,便于使用
#pragma once
#include <iostream>
#include <unistd.h>
#include <memory.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
class Socket{
public:
static int SocketCreate(); // 创建套接字
static void SocketBind(int sock, int port); // 绑定套接字
static void SocketListen(int sock, int backlog); // 监听套接字
};
int Socket::SocketCreate(){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
std::cerr << "socket error" << std::endl;
exit(2);
}
// 设置端口复用
int opt = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
std::cout << "SocketCreate success" << std::endl;
return sockfd;
}
void Socket::SocketBind(int sock, int port) {
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
socklen_t len = sizeof(local);
if (bind(sock, (struct sockaddr*)&local, len) < 0) {
std::cerr << "bind error" << std::endl;
exit(3);
}
std::cout << "SocketBind success" << std::endl;
}
void Socket::SocketListen(int sock, int backlog) {
if (listen(sock, backlog) < 0) {
std::cerr << "listen error" << std::endl;
exit(4);
}
std::cout << "SocketListen success" << std::endl;
}
SelectServer类
服务器的IP地址直接使用INADDR_ANY
即可,所以我们的服务器只需要将监听套接字和开放端口号设置进成员变量即可
#pragma once
#include "socket.hpp"
#include <sys/select.h>
#include <unistd.h>
#define BALCK_LOG 5
#define SERVER_PORT 8888
class SelectServer{
public:
static SelectServer* GetInstance(int port = SERVER_PORT);
~SelectServer(){ if (listen_sock > 0) close(listen_sock); }
void InitSelectServer();
void Run();
private:
SelectServer(int _port) : port(_port){};
private:
void ClearFdArray(int* fd_array, int num, int default_val);
private:
void HandlerEvent(const fd_set& readfds, int fd_array[], int num);
bool SetFdArray(int sock, int fd_array[], int num);
private:
int listen_sock;
int port;
static SelectServer* instance;
};
SelectServer* SelectServer::instance = nullptr;
SelectServer* SelectServer::GetInstance(int port) {
if (instance == nullptr) {
instance = new SelectServer(port);
}
return instance;
}
void SelectServer::InitSelectServer() {
listen_sock = Socket::SocketCreate();
Socket::SocketBind(listen_sock, port);
Socket::SocketListen(listen_sock, BALCK_LOG);
}
void SelectServer::Run() {
#define FD_ARRAY_NUM 1024
#define DFL_FD -1
fd_set readfds;
int fd_array[FD_ARRAY_NUM];
ClearFdArray(fd_array, FD_ARRAY_NUM, DFL_FD);
fd_array[0] = listen_sock;
for (;;){
FD_ZERO(&readfds);
// 扫描获取最大文件描述符
int maxfd = DFL_FD;
for (int i = 0; i < FD_ARRAY_NUM; i++) {
if (fd_array[i] == DFL_FD) continue;
// 将需要监视的套接字设置进readfds
FD_SET(fd_array[i], &readfds);
maxfd = maxfd > fd_array[i] ? maxfd : fd_array[i];
}
struct timeval timeout = {5, 0};
// struct timeval timeout = {0, 0};
switch (select(maxfd + 1, &readfds, nullptr, nullptr, &timeout)) {
case 0: std::cout << "timeout ..." << std::endl; break;
case -1: std::cerr << "select error" << std::endl; break;
default : std::cout << "有事件发生..." << std::endl;
HandlerEvent(readfds, fd_array, FD_ARRAY_NUM);
break;
}
}
}
void SelectServer::ClearFdArray(int* fd_array, int num, int default_val) {
for (int i = 0; i < num; i++)
*(fd_array + i) = default_val;
}
void SelectServer::HandlerEvent(const fd_set& readfds, int fd_array[], int num){
for (int i = 0; i < num; i++) {
if (fd_array[i] == DFL_FD) continue;
if (fd_array[i] == listen_sock && FD_ISSET(fd_array[i], &readfds)){ // 连接读
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int sock = accept(listen_sock, (struct sockaddr*)&peer, &len);
if (sock < 0) {
std::cerr << "accept error" << std::endl;
continue;
}
std::string peer_ip = inet_ntoa(peer.sin_addr);
int peer_port = ntohs(peer.sin_port);
std::cout << "get a new link[" << peer_ip << ":" << peer_port << "]" << std::endl;
if (!SetFdArray(sock, fd_array, FD_ARRAY_NUM)) { // 将获取到的套接字添加到fd_array中
close(sock);
std::cout << "select server is full, close fd : " << sock << std::endl;
}
}
else if (FD_ISSET(fd_array[i], &readfds)) { // 数据读
#define BUF_NUM 1024
char buffer[BUF_NUM];
ssize_t size = read(fd_array[i], buffer, sizeof(buffer) - 1);
if (size > 0) {
buffer[size - 1] = 0;
std::cout << "echo #" << buffer << std::endl;
} else if (size == 0) {
std::cout << "client quit ... close fd : " << fd_array[i] << std::endl;
close(fd_array[i]);
fd_array[i] = DFL_FD;
} else {
std::cerr << "read error" << std::endl;
close(fd_array[i]);
fd_array[i] = DFL_FD; // 清除网络文件
}
}
}
}
select 服务器测试
#include "select_server.hpp"
void Usage(char* proc) {
std::cout << "Usage: " << proc << " port " << std::endl;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
Usage(argv[0]);
exit(1);
}
int port = atoi(argv[1]);
SelectServer* ss = SelectServer::GetInstance(port);
ss->InitSelectServer();
ss->Run();
return 0;
}
使用telnet工具连接服务器,此时就可以通过telnet向服务器发送的数据就可以被服务器读到并且打印了。可以看到虽然select服务器是一个单进程服务器,但是它却可以同时为多个客户端提供服务,根本原因是因为select
函数调用后会告知select
服务器是哪个客户端对应的连接事件就绪了,,此时select服务器就可以读取客户端数据,读取完后调用特定函数处理
当前select 服务器存在的问题
- 服务器并没有对客户端进行响应,如果使用select服务器的话,不能直接调用write函数发送数据,因为
write
函数本质上也需要分为等和拷贝两部分。我们也必须将等这个过程交给select
函数,因此每次调用select
函数之前需要重新设置readfds 和 writefds
- 没有定制协议
- 没有对应的输入和输出缓冲区,代码中的数据直接存储到了字符串数组buffer中,者是不眼睛的。因为一次数据读取可能并没有读取到一个完整的报文,半个报文是不能直接交给上层处理的。应当将数据存储到一个输入缓冲区中,当读取到一个完整的报文后再让服务器进行处理。当然如果服务器需要对客户端的请求进行响应,也不能直接调用write发送给客户端,必须存储到一个输出缓冲区,因为响应的数据可能很庞大,无法一次性发送完成需要分批发送
select的优缺点
优点
- 可以同时等待多个文件描述符,并且实际只负责等待,将等的事件重叠,踢狗IO效率,实际IO操作由accept, read, write 等接口来完成,并且保证这些接口进行IO操作时不会被阻塞
当然这是所有多路转接接口的优点
缺点
- 每次调用select 都必须要手动设置fd集合,每次都要遍历设置非常麻烦
- 每次调用select, 都需要将fd集合从用户态拷贝到内核态,这个开销在fd_set大的时候开销会很大
- 每次调用select都需要在内核遍历所有传进来的fd,开销在fd很多的时候也很大
- 由于使用位图存储fd,位图的大小一定是优先的,所以select 可监控的文件描述符也是有限的
select 可监控文件描述符的个数
select 的可监控文件描述符个数可以通过sizeof(fd_set) * 8)
来计算,实际上fd_set
的大小是128字节也就是能够监视1024个文件描述符。但不同环境下fd_set的大小可能是不同的,并且其大小可以重新调整(重新编译内核)
一个进程可以打开的文件描述符个数
进程控制块PCB(task_struct) 当中会有一个files指针,该指针指向一个struct files_struct结构,进程的文件描述符fd_array就存储在该结构当中,其中文件描述符表中的fd_array定义大小为NR_OPEN_DEFALUT
,其值就是32
但实际上一个进程可以打开的文件描述符个数是可以扩展的,使用ulimit -a
就可以查看进程打开文件描述符的上限
[clx@VM-20-6-centos select_io]$ ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 14686
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 100001 # 文件描述符上限
因此select只能监听1024个文件描述符太少了,说明其最多只能为1023个客户端进行服务
多路转接的使用场景
- 多路转接接口一般适用于多链接的情况,并且多链接中只有少部分的连接比较活跃。因为大部分连接都不活跃,在一段时间只有少量连接在进行IO,大部分都在等待事件就绪,使用多路转接接口就可以将这些等待事件进行重叠,提高IO效率
- 可以试想一下使用线程池,我们用五个线程提供服务,但是连接很多并且大部分时间都在等待用户输入信息,这就会导致阻塞队列中的连接越来越多,而服务器实际一直在等待客户输入,提供服务的时间其实很少
- 所以对于多链接中大部分连接都很活跃的场景,多路转接其实不太适合。因为每个连接都很活跃,那么每个连接上的事件基本都是就绪的。因此根本不需要使用多路转接接口来帮我们进行等待,毕竟使用多路转接接口也是需要花费系统的事件和空间资源的
多链接但少量连接活跃的场景(QQ/微信等聊天工具),大量连接活跃的场景(数据库备份)
参考文章:2021dragon 的Linux高级IO
原文链接:https://blog.csdn.net/chenlong_cxy/article/details/126050039