高级I/O操作与非阻塞I/O
在操作系统中,I/O(输入/输出)操作是所有实现的基础。本文将探讨阻塞I/O与非阻塞I/O的区别,以及如何使用有限状态机来实现非阻塞I/O,并介绍数据中继的概念。
阻塞I/O与非阻塞I/O
阻塞I/O
阻塞I/O是操作系统中默认的I/O操作方式。在阻塞I/O中,如果系统调用(如read()或write())无法立即执行,进程将被阻塞,直到可以进行I/O操作为止。这意味着,如果一个进程正在等待I/O操作,它将无法进行任何其他操作。
非阻塞I/O
非阻塞I/O允许进程在I/O操作无法立即执行时继续进行其他操作。在非阻塞模式下,如果读操作时设备数据不充足,或写数据时缓冲区空间不足,系统会返回一个EAGAIN错误,告诉进程当前无法进行I/O操作,进程可以稍后再试。
有限状态机编程
有限状态机(Finite State Machine, FSM)是一种用来处理复杂流程的编程模型。它适用于流程结构化的场景,也可以用于处理复杂且非结构化的流程。
有限状态机解决的问题是复杂流程。
简单流程:自然流程是结构化的,按照人类顺序思维解决的问题。
复杂流程:自然流程不是结构化的,比如先前的MultiButton。
实现非阻塞I/O
要在Linux操作系统下实现非阻塞I/O,可以使用O_NONBLOCK标志来设置文件描述符。以下是一个简单的示例,展示如何打开一个文件并以非阻塞方式读取数据。
int fd = open("file.txt", O_RDONLY | O_NONBLOCK);
if (fd < 0) {
perror("open()");
exit(1);
}
数据中继原理解析
数据中继是指在两个设备之间进行数据交换的过程。在数据中继中,一个设备作为源设备,另一个设备作为目标设备。数据中继的实现通常涉及两个状态机,一个用于读取数据(源设备)另一个用于写入数据(目标设备)。
假设打开两个设备,要在两个设备之间进行数据交换(数据中继)
两个设备也有其它数据来源
要实现的功能:
读左然后写右和读右然后写左
要是用阻塞的话左边一直没数据来会卡在读左等待
分成两个任务一个读左然后写右,一个读右然后写左
具体实例
在linux操作系统下实现终端设备界面相互切换。实现读取fd1的数据写入的fd2中,读取fd2的数据写入到fd1当中。
状态机简单示意图如下所示:
非阻塞IO
简单流程:自然流程是结构化的
复杂流程:自然流程不是结构化的
完成数据中继,就像copy文件的程序
非阻塞操作:
在Linux中,一切皆文件,文件读写操作默认是阻塞的。但是可以通过设置O_NONBLOCK标志将读写操作设置为非阻塞方式。如果读操作时设备数据不足或者写操作时缓冲区空间不足,系统会返回-EAGAIN错误,但不会阻塞线程。
例子
int fd = open("file.txt", O_RDONLY | O_NONBLOCK);
if (fd < 0) {
perror("open()");
exit(1);
}
fcntl
fcntl函数的作用是获取和设置文件的访问模式和状态标志。
int fcntl(int fd, int cmd, ... /* arg */);
参数说明:
- fd:文件描述符。
- cmd:控制命令,如F_GETFL和F_SETFL。
- F_GETFL (void):返回文件访问模式和文件状态标志。
- F_SETFL (int):设置文件状态标志为指定值,忽略文件访问模式和文件创建标志。
在Linux中,F_SETFL命令可以改变O_APPEND、O_ASYNC、O_DIRECT、O_NOATIME和O_NONBLOCK标志,但不能改变O_DSYNC和O_SYNC标志。
relay函数编写
- 获取文件原有状态。
- 在原有状态基础上添加非阻塞状态。
relay(int fd1, int fd2) {
int fd1save = fcntl(fd1, F_GETFL); // 获取文件状态
fcntl(fd1, F_SETFL, fd1save | O_NONBLOCK); // 设置文件为非阻塞状态
int fd2save = fcntl(fd2, F_GETFL); // 获取文件状态
fcntl(fd2, F_SETFL, fd2save | O_NONBLOCK); // 设置文件为非阻塞状态
}
定义两个状态机,一个负责从源文件读取数据到目标文件,另一个负责从目标文件读取数据到源文件。
// 状态机状态枚举
enum {
STATE_R = 1, // 读态
STATE_W, // 写态
STATE_EX, // 异常终止态
STATE_T // 退出态
};
// 状态机结构体
struct fsm_st {
int state; // 当前状态
int sfd; // 源文件描述符
int dfd; // 目标文件描述符
int len; // 读取长度
int pos; // 位置
char *errstr; // 报错信息
char buf[BUFSIZE]; // 缓冲区
};
// 初始化状态机
static void relay(int fd1, int fd2) {
struct fsm_st fsm12, fsm21;
int fd1save = fcntl(fd1, F_GETFL);
fcntl(fd1, F_SETFL, fd1save | O_NONBLOCK);
int fd2save = fcntl(fd2, F_GETFL);
fcntl(fd2, F_SETFL, fd2save | O_NONBLOCK);
fsm12.state = STATE_R;
fsm12.sfd = fd1;
fsm12.dfd = fd2;
fsm21.state = STATE_R;
fsm21.sfd = fd2;
fsm21.dfd = fd1;
// ...
}
// 当不是退出态时,推动状态机
while (fsm12.state != STATE_T && fsm21.state != STATE_T) {
fsm_driver(&fsm12);
fsm_driver(&fsm21);
}
// 恢复起始默认状态
fcntl(fd1, F_SETFL, fd1save);
fcntl(fd2, F_SETFL, fd2save);
}
fsm_driver推动状态机
-----------------
```c
static void fsm_driver(struct fsm_st *fsm) {
int ret;
switch (fsm->state) {
case STATE_R:
fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);
if (fsm->len == 0) {
fsm->state = STATE_T;
} else if (fsm->len < 0) {
fsm->errstr = "read()";
fsm->state = STATE_EX;
} else {
fsm->pos = 0;
fsm->state = STATE_W;
}
break;
case STATE_W:
ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len);
if (ret < 0) {
fsm->errstr = "write()";
fsm->state = STATE_EX;
} else {
fsm->pos += ret;
fsm->len -= ret;
if (fsm->len == 0) {
fsm->state = STATE_R;
} else {
fsm->state = STATE_W;
}
}
break;
case STATE_EX:
perror(fsm->errstr);
fsm->state = STATE_T;
break;
case STATE_T:
// 执行一些清理工作
break;
default:
abort();
}
}
完整代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define TTY1 "/dev/tty11"
#define TTY2 "/dev/tty12"
#define BUFSIZE 1024
/*状态机状态枚举类型*/
enum
{
STATE_R=1, //读态
STATE_W, //写态
STATE_Ex, //异常终止态
STATE_T //退出态
};
/*状态机结构体*/
struct fsm_st
{
int state; //当前状态机的状态
int sfd;//源文件描述符
int dfd;//目标文件描述符
int len;//读取长度
int pos;//位置
char * errstr; //报错信息
char buf[BUFSIZE]; //buf缓冲区
};
/**************推动状态机****************/
static void fsm_driver(struct fsm_st*fsm)
{
int ret;
switch (fsm->state)
{
/*状态机读取*/
case STATE_R:
/*读取到的源fd存储到buf中*/
fsm->len = read(fsm->sfd,fsm->buf,BUFSIZE);
/*如果读取0字节,退出状态机*/
if (fsm->len == 0)
fsm->state = STATE_T;
/*如果读取<0字节,进行状态判断*/
else if (fsm->len < 0)
{
/*如果读取<0字节,二次判断*/
if (errno == EAGAIN)
fsm->state =STATE_R;
else
{
/*宕机退出*/
fsm->errstr = "read()";
fsm->state =STATE_Ex;
}
}
else
/*都没有报错,说明读取正常,则开始状态机写入*/
{
/*******初始化写入的位置为0***************/
fsm->pos = 0;
fsm->state =STATE_W;
}
break;
/*状态机写入*/
case STATE_W:
/*写入读取到的数据len*/
ret = write(fsm->dfd,fsm->buf+fsm->pos,fsm->len);
/*写入读取到的数据<0*/
if(ret < 0)
{
/*假的错误*/
if (errno == EAGAIN)
/*再次进入写入*/
fsm->state = STATE_W;
else
/*真正读取错误*/
{
/*读取错误*/
fsm->errstr = "read()";
/*宕机退出*/
fsm->state =STATE_Ex;
}
}
else
/***************坚持写够len个字节数***************/
{
/*******从pos的位置继续向下写入字节***************/
fsm->pos += ret;
fsm->len -= ret;
/*如果写入完成*/
if(fsm->len == 0)
/*返回读态*/
fsm->state = STATE_R;
/*否则返回写态,继续写够len个字节*/
else
fsm->state = STATE_W;
}
break;
/*宕机退出*/
case STATE_Ex:
perror(fsm->errstr);
fsm->state = STATE_T;
break;
/*完整退出*/
case STATE_T:
/*do sth*/
break;
default:
/*如果都不是以上任意一个状态,发送异常*/
abort();
break;
}
}
static void relay(int fd1,int fd2)
{
struct fsm_st fsm12,fsm21; //定义结构体读左写右,读右写左
/*首先保证文件是以非阻塞实现的*/
int fd1_save = fcntl(fd1,F_GETFL); //获取文件状态
fcntl(fd1,F_SETFL,fd1_save|O_NONBLOCK);//追加文件描述符的状态为非阻塞
int fd2_save = fcntl(fd2,F_GETFL); //获取文件状态
fcntl(fd2,F_SETFL,fd1_save|O_NONBLOCK); //追加文件描述符的状态为非阻塞
/******************************/
/*初始化状态机*/
fsm12.state = STATE_R;
fsm12.sfd = fd1;
fsm12.dfd = fd2;
fsm21.state = STATE_R;
fsm21.sfd = fd2;
fsm21.dfd = fd1;
/**************/
/*当不是退出态,推动状态机*/
while (fsm12.state != STATE_T ||fsm21.state != STATE_T )
{
fsm_driver(&fsm12);
fsm_driver(&fsm21);
}
/************************/
/*恢复起始默认状态*/
fcntl(fd1,F_SETFL,fd1_save);
fcntl(fd2,F_SETFL,fd2_save);
/******************/
}
int main()
{
int fd1,fd2;
/*模拟用户打开设备*/
fd1 = open(TTY1,O_RDWR);
if(fd1 < 0)
{
perror("open()");
exit(1);
}
write(fd1,"TTY1\n",5);
/*模拟用户打开设备,以非阻塞方式打开设备*/
fd2 = open(TTY2,O_RDWR|O_NONBLOCK);
if(fd2 < 0)
{
perror("open()");
exit(1);
}
write(fd2,"TTY2\n",5);
/*中继引擎函数*/
relay(fd1,fd2);
close(fd2);
close(fd1);
exit(0);
}
测试:
要用root用户执行
ctl+atl+F11和ctl+atl+F12来回切换观察
ctl+atl+F1回到图像界面
IO多路转接
解决IO密集型任务中忙等的问题,监视多个文件描述符的行为,当当前文件描述符发生了我们感兴趣的行为时,才去做后续操作。常见的IO多路转接函数有select()
、poll()
、epoll()
等。
select()
可以实现安全的休眠(替代sleep)前面都给NULL,只设置最后的timeout
-
select() 兼容性好 但设计有缺陷 以事件为单位组织文件描述符
-
nfds的类型问题
-
参数没有const修饰 也就是函数会修改 fdset 任务和结果放在一起
-
监视的事件太过单一 读 写 异常(异常的种类非常多)
-
原函数:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
- 参数:
- fd_set 文件描述符集合
- void FD_CLR(int fd, fd_set *set); 在指定集合中删除指定文件描述符
- int FD_ISSET(int fd, fd_set *set); 判断一个文件描述符是否在集合
- void FD_SET(int fd, fd_set *set); 添加文件描述符到集合
- void FD_ZERO(fd_set *set); 集合置0
- nfds 要监视文件描述符里最大的再加1
- readfds 所关心的可以发生读的状态的集合(当里面有文件描述符发生读就返回)
- writerfds 所关心的可以发生写的状态的集合(当里面有文件描述符发生写就返回)
- exceptfds 所关心异常的情况
- timerout 超时设置(不设置会发生忙等)
- struct timeval {
time_t tv_sec; /* seconds /秒
suseconds_t tv_usec; / microseconds */微秒
};
- struct timeval {
- 返回值 返回发生行为的文件描述符的个数,发生行为的文件描述符会覆盖回原来的集合
- 会有假错,因为是阻塞的会被信号打断
忙等与非阻塞IO
- 忙等:会消耗CPU资源,当没有数据可读时会一直消耗CPU。
- 非阻塞IO:不会消耗CPU,当没有数据可读时会立即返回。
IO多路转接示例
#include <sys/select.h>
static int max(int a, int b) {
if (a > b) return a;
return b;
}
static void relay(int fd1, int fd2) {
int old_fd1, old_fd2;
fd_set rset, wset;
old_fd1 = fcntl(fd1, F_GETFL);
fcntl(fd1, F_SETFL, old_fd1 | O_NONBLOCK);
old_fd2 = fcntl(fd2, F_GETFL);
fcntl(fd2, F_SETFL, old_fd2 | O_NONBLOCK);
struct fsm_st fsm12, fsm21; // 读左写右 读右写左
fsm12.state = STATE_R;
fsm12.sfd = fd1;
fsm12.dfd = fd2;
fsm21.state = STATE_R;
fsm21.sfd = fd2;
fsm21.dfd = fd1;
while (fsm12.state != STATE_T || fsm21.state != STATE_T) {
// 布置监视任务
FD_ZERO(&rset);
FD_ZERO(&wset);
if (fsm12.state == STATE_R)
FD_SET(fsm12.sfd, &rset);
if (fsm12.state == STATE_W)
FD_SET(fsm12.sfd, &wset);
if (fsm21.state == STATE_R)
FD_SET(fsm21.sfd, &rset);
if (fsm21.state == STATE_W)
FD_SET(fsm21.sfd, &wset);
// 监视
struct timeval ts;
ts.tv_sec = 0;
ts.tv_usec = 2;
int maxfd = max(fd1, fd2);
if (fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) {
if (select(maxfd + 1, &rset, &wset, NULL, &ts) < 0) {
if (errno == EINTR)
continue;
perror("select()");
exit(1);
}
}
// 查看监视结果
if (FD_ISSET(fd1, &rset) || FD_ISSET(fd2, &wset) || fsm12.state > STATE_AUTO) {
fsm_driver(&fsm12);
}
if (FD_ISSET(fd2, &rset) || FD_ISSET(fd1, &wset) || fsm21.state > STATE_AUTO) {
fsm_driver(&fsm21);
}
}
// 恢复原来的文件描述符状态
fcntl(fd1, F_SETFL, old_fd1);
fcntl(fd2, F_SETFL, old_fd2);
}
enum {
STATE_R = 1,
STATE_W,
STATE_AUTO,
STATE_Ex,
STATE_T
};
在这个例子中,我们使用了select()
函数来监视两个文件描述符fd1
和fd2
。当其中一个文件描述符准备好读或写时,相应的状态机fsm12
或fsm21
就会被推进。这里增加了一个STATE_AUTO
状态,用于在EX
或T
状态之外的其他状态时,触发读写操作。这样可以避免在异常或退出状态时进行不必要的读写操作。
poll()
poll()
函数用于等待文件描述符上的事件。它以文件描述符
为单位组织事件,相比select()
更加可移植。
原函数
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
作用
fds
: 指向struct pollfd
数组的指针,用于指定要监视的文件描述符及其对应的事件。nfds
: 要监视的文件描述符数量。timeout
: 超时时间,单位为毫秒。-1表示阻塞直到有事件发生,0表示非阻塞立即返回,大于0表示等待指定时间。
参数
struct pollfd
: 用于指定文件描述符和事件。fd
: 文件描述符。events
: 所关心的事件,如POLLIN
(可读)、POLLOUT
(可写)等。revents
: 发生的事件。
返回值
- 返回就绪文件描述符的个数。
例子
#include <poll.h>
static void relay(int fd1, int fd2) {
int old_fd1, old_fd2;
struct fsm_st fsm12, fsm21; // 读左写右 读右写左
struct pollfd pfd[2];
old_fd1 = fcntl(fd1, F_GETFL);
fcntl(fd1, F_SETFL, old_fd1 | O_NONBLOCK);
old_fd2 = fcntl(fd2, F_GETFL);
fcntl(fd2, F_SETFL, old_fd2 | O_NONBLOCK);
fsm12.state = STATE_R;
fsm12.sfd = fd1;
fsm12.dfd = fd2;
fsm21.state = STATE_R;
fsm21.sfd = fd2;
fsm21.dfd = fd1;
pfd[0].fd = fd1;
pfd[1].fd = fd2;
while (fsm12.state != STATE_T || fsm21.state != STATE_T) {
// 布置监视任务
pfd[0].events = 0; // 事件清零
if (fsm12.state == STATE_R) // 1可读
pfd[0].events |= POLLIN;
if (fsm21.state == STATE_W) // 1可写
pfd[0].events |= POLLOUT;
pfd[1].events = 0; // 事件清零
if (fsm21.state == STATE_R) // 2可读
pfd[1].events |= POLLIN;
if (fsm12.state == STATE_W) // 2可写
pfd[1].events |= POLLOUT;
// 监视
if (fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) {
while (poll(pfd, 2, -1) < 0) {
if (errno == EINTR)
continue;
perror("poll()");
exit(1);
}
}
// 查看监视结果
if (pfd[0].revents & POLLIN || pfd[1].revents & POLLOUT || fsm12.state > STATE_AUTO) {
fsm_driver(&fsm12);
}
if (pfd[1].revents & POLLIN || pfd[0].revents & POLLOUT || fsm21.state > STATE_AUTO) {
fsm_driver(&fsm21);
}
}
// 恢复原来的文件描述符状态
fcntl(fd1, F_SETFL, old_fd1);
fcntl(fd2, F_SETFL, old_fd2);
}
在这个例子中,我们使用了poll()
函数来监视两个文件描述符fd1
和fd2
。poll()
通过struct pollfd
结构体来指定要监视的文件描述符和对应的事件。当文件描述符上发生的事件匹配我们设置的事件时,poll()
会返回就绪文件描述符的个数。
epoll
epoll 是 Linux 特有的 I/O 多路复用机制,它是对 poll 机制的增强和优化,因此不具有跨平台性。
epoll_create()
原函数:
#include <sys/epoll.h>
int epoll_create(int size);
作用:创建一个 epoll 实例。
参数:size
参数可以随意给一个大于 0 的数,用于指定 epoll 实例的最大监听数。
返回值:返回新创建的 epoll 实例的文件描述符。
epoll_ctl()
原函数:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
作用:控制 epoll 实例,对指定 epoll 实例 epfd
中的文件描述符 fd
执行操作 op
(添加、修改、删除)。
参数:
epfd
:epoll 实例的文件描述符。op
:操作类型,如EPOLL_CTL_ADD
、EPOLL_CTL_MOD
、EPOLL_CTL_DEL
。fd
:要操作的文件描述符。event
:指定的事件,包括events
和data
两个字段。
epoll_event 结构
events
:所需监听的事件类型,如EPOLLIN
(可读)、EPOLLOUT
(可写)等。data
:用户数据,可以是文件描述符,也可以是与文件描述符相关联的其他数据。
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll_wait()
原函数:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
作用:等待文件描述符上的事件。
参数:
epfd
:epoll 实例的文件描述符。events
:用于存放等待的事件。maxevents
:最多可以返回的事件数。timeout
:超时时间,-1 表示阻塞直到有事件发生,0 表示非阻塞立即返回,正数表示等待指定时间。
返回值:返回就绪的事件数。
示例
#include <sys/epoll.h>
static void relay(int fd1, int fd2) {
int old_fd1, old_fd2;
struct fsm_st fsm12, fsm21; // 读左写右 读右写左
struct epoll_event ev;
old_fd1 = fcntl(fd1, F_GETFL);
fcntl(fd1, F_SETFL, old_fd1 | O_NONBLOCK);
old_fd2 = fcntl(fd2, F_GETFL);
fcntl(fd2, F_SETFL, old_fd2 | O_NONBLOCK);
fsm12.state = STATE_R;
fsm12.sfd = fd1;
fsm12.dfd = fd2;
fsm21.state = STATE_R;
fsm21.sfd = fd2;
fsm21.dfd = fd1;
int epfd = epoll_create(2);
if (epfd < 0) {
perror("epoll_create()");
exit(1);
}
ev.data.fd = fd1;
ev.events = 0;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd1, &ev);
ev.data.fd = fd2;
ev.events = 0;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &ev);
while (fsm12.state != STATE_T || fsm21.state != STATE_T) {
ev.data.fd = fd1;
ev.events = 0;
// 布置监视任务
if (fsm12.state == STATE_R) // 1可读
ev.events |= EPOLLIN;
if (fsm21.state == STATE_W) // 1可写
ev.events |= EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd1, &ev);
ev.data.fd = fd2;
ev.events = 0;
if (fsm21.state == STATE_R) // 2可读
ev.events |= EPOLLIN;
if (fsm12.state == STATE_W) // 2可写
ev.events |= EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd2, &ev);
// 监视
if (fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) {
while (epoll_wait(epfd, &ev, 1, -1) < 0) {
if (errno == EINTR)
continue;
perror("poll()");
exit(1);
}
}
// 查看监视结果
if (ev.data.fd == fd1 && ev.events & EPOLLIN ||
ev.data.fd == fd2 && ev.events & EPOLLOUT ||
fsm12.state > STATE_AUTO) {
fsm_driver(&fsm12);
}
if (ev.data.fd == fd2 && ev.events & EPOLLIN ||
ev.data.fd == fd1 && ev.events & EPOLLOUT ||
fsm21.state > STATE_AUTO) {
fsm_driver(&fsm21);
}
}
// 恢复原来的文件描述符状态
fcntl(fd1, F_SETFL, old_fd1);
fcntl(fd2, F_SETFL, old_fd2);
close(epfd);
}
在这个示例中,我们使用 epoll 来监视两个文件描述符 fd1 和 fd2。我们首先使用 epoll_create 创建一个 epoll 实例,然后使用 epoll_ctl 添加这两个文件描述符。
在主循环中,我们根据状态机的当前状态来更新 epoll 实例中文件描述符的事件监听。然后,我们使用 epoll_wait 来等待文件描述符上的事件。当有事件发生时,我们根据事件更新状态机,并处理相应的读写操作。
最后,当状态机达到退出状态时,我们关闭 epoll 实例,并恢复文件描述符到非阻塞状态。
这个例子展示了如何使用 epoll 来实现非阻塞的 I/O 操作,适用于处理多个文件描述符的场景。