目录
- 同步和异步
- 同步阻塞IO模型
- 基本概念
- 应用场景
- 优缺点
- 同步非阻塞IO模型
- 基本概念
- 应用场景
- 优缺点
- IO多路复用模型
- 信号驱动IO模型
- 回顾复习
- 1.信号
- 2.产生信号的条件
- 3.可重入函数
- 4.为什么中断处理函数不能直接调用不可重入函数
- 5.如何写出可重入的函数
- 基本概念
- 应用场景
- 优缺点
- 异步IO模型
- 基本概念
- 关于异步IO的API
- 应用场景
- 优缺点
- 总结
同步和异步
-
同步:同步是指一个进程在执行某个请求的时候,如果该请求需要一段时间才能返回信息,那么这个进程会一直等待下去,直到收到返回信息才继续执行下去。
-
异步:异步是指进程不需要一直等待下去,而是继续执行下面的操作,不管其他进程的状态,当有信息返回的时候会通知进程进行处理,这样就可以提高执行的效率了,即异步是我们发出的一个请求,该请求会在后台
同步阻塞IO模型
基本概念
最传统的一种IO模型,即在读写数据过程中会发生阻塞现象。
当用户线程发出IO请求之后,内核会去查看数据是否就绪(数据是否在内核缓冲区),如果没有就绪就会等待数据就绪(等待数据从磁盘\网卡加载到内核缓冲区),而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据从内核缓冲区拷贝到用户缓冲区,并返回结果给用户线程(比如拷贝的字节数),用户线程才解除block状态。
应用场景
下面是用C语言写的的服务器,这里面就有同步阻塞IO的使用场景
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUFFER_SIZE 1024
int main(int argc, char *argv[])
{
int str_length; // 服务器读取到的字符串长度
socklen_t addr_size; // IPv4套接字地址结构的大小
char buffer[BUFFER_SIZE]; // 缓冲区大小
short port = 8080; // 服务器的监听端口
char listen_addr_str[] = "127.0.0.1"; // 服务器的IP地址
int server_socket, client_socket; // 定义服务端的socket和客户端的socket
struct sockaddr_in server_addr, client_addr; // 定义服务端和客户端的IPv4的套接字地址结构(定长,16字节)
size_t listen_addr = inet_addr(listen_addr_str); // 将点分十进制的IPv4地址转换成网络字节序列的长整型
server_socket = socket(PF_INET, SOCK_STREAM, 0); // 创建套接字,SOCK_STREAM表示使用TCP协议
// 对IPv4的套接字地址结构做初始化
bzero(&server_addr, sizeof(server_addr)); // 将server_addr结构体的前sizeof(serveraddr)个字节清零。与memset()差不多。
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(port); // 指定端口号,htons()将短整型数据转换成网络字节顺序
server_addr.sin_addr.s_addr = listen_addr; // 指定服务器的IP地址
// 将服务器的套接字和套接字地址结构进行绑定
if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
{
printf("绑定失败\n");
exit(1);
}
// 开始监听
if (listen(server_socket, 5) == -1) // 5表示队列的容量,这个队列用于记录正在连接但是还没有连接完成的客户端
{
printf("监听失败\n");
exit(1);
}
// 开始连接
addr_size = sizeof(client_addr);
// accept将客户端的信息绑定到一个socket上,也就是服务器要给客户端创建一个socket,连接成功就返回客户端的socket
client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &addr_size);
while (1)
{
str_length = read(client_socket, buffer, BUFFER_SIZE);
if (str_length == 0) // 读取数据完毕,关闭套接字
{
close(client_socket);
printf("%d号客户端的连接已关闭\n", client_socket);
break;
}
else
{
printf("%d号客户端发送的数据:\n%s", client_socket, buffer);
write(client_socket, buffer, str_length); // 发送数据
}
}
return 0;
}
运行结果:
TCP连接一旦建立,客户端就会发送给服务端一个HTTP请求报文。在发送请求报文之前,服务端的read()函数(第53行)一直处于阻塞状态,并且read这个系统调用一直处于操作系统的内核态。客户端发送请求报文之后,请求报文的字节序列被服务器的网卡接收,此时报文数据还没有加载到操作系统的内核缓冲区,等到网卡接收到的数据被加载到内核缓冲区之后(这个操作由DMA完成,几乎不需要占用CPU),read()系统调用将内核缓冲区的数据拷贝到用户进程缓冲区(需要占用CPU)。最后再从内核态切换到用户态,应用程序读取用户进程缓冲区的数据并打印到终端。这就是一个典型的同步阻塞IO模型。
优缺点
优点:
- 简单易懂,代码实现简单。
- 在阻塞等待数据期间,用户进程挂起,用户进程基本不会占用CPU资源。
缺点:
- 会阻塞程序的执行,如果 I/O 操作时间较长,程序就会停顿很长时间,程序的性能差。
- 在高并发的应用场景下,需要大量的进程来维护大量的网络连接,内存、上下文切换开销会非常巨大。
同步非阻塞IO模型
基本概念
socket连接默认是阻塞模式,在Linux系统下,可以通过设置将socket变成为非阻塞的模式。使用非阻塞模式的IO读写,叫作同步非阻塞IO,简称为NIO模式。在NIO模型中,应用程序一旦开始IO系统调用,会出现以下两种情况:
- 在内核缓冲区中没有数据的情况下,系统调用会立即返回,返回一个调用失败的信息。
- 在内核缓冲区中有数据的情况下,是阻塞的,直到数据从内核缓冲复制到用户进程缓冲。复制完成后,系统调用返回成功,应用进程开始处理用户空间的缓存数据。
在非阻塞IO模型中,用户线程需要不断地询问内核数据是否就绪,也就说非阻塞IO不会交出CPU,而会一直占用CPU。
应用场景
由于使用Edge、Chrome等浏览器会在TCP连接建立后立即向服务器发送HTTP报文,这会让报文立即到达内核缓冲区,此时read系统调用也不会阻塞。所以为了更清晰的理解同步非阻塞IO模型,就需要自己写一个客户端,这个客户端需要在TCP连接建立完成3秒钟之后再发送HTTP报文。下面是客户端和服务器的代码。
客户端代码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define PORT 8080
int main(int argc, char const *argv[])
{
int client_fd, valread;
struct sockaddr_in server_addr;
char buffer[1024] = {0};
const char *http_request = "GET / HTTP/1.1\r\nHost: 127.0.0.1:8080\r\nConnection: close\r\n\r\n";
client_fd = socket(AF_INET, SOCK_STREAM, 0); // 创建socket
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(PORT); // 指定端口号,htons()将短整型数据转换成网络字节顺序
inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr); // 将IPv4地址从点分十进制转换为二进制格式
if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) // 连接服务器
{
perror("connect failed");
exit(EXIT_FAILURE);
}
sleep(3); // 现在TCP连接已经建立完成,3秒后发送HTTP请求
send(client_fd, http_request, strlen(http_request), 0); // 发送HTTP请求
return 0;
}
服务器代码:
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUFFER_SIZE 1024
int errno;
int set_non_block(int socket)
{
int flags = fcntl(socket, F_GETFL, 0);
flags |= O_NONBLOCK;
return fcntl(socket, F_SETFL, flags);
}
int main(int argc, char *argv[])
{
int str_length; // 服务器读取到的字符串长度
socklen_t addr_size; // IPv4套接字地址结构的大小
char buffer[BUFFER_SIZE]; // 缓冲区大小
short port = 8080; // 服务器的监听端口
char listen_addr_str[] = "127.0.0.1"; // 服务器的IP地址
int server_socket, client_socket; // 定义服务端的socket和客户端的socket
struct sockaddr_in server_addr, client_addr; // 定义服务端和客户端的IPv4的套接字地址结构(定长,16字节)
size_t listen_addr = inet_addr(listen_addr_str); // 将点分十进制的IPv4地址转换成网络字节序列的长整型
server_socket = socket(PF_INET, SOCK_STREAM, 0); // 创建套接字,SOCK_STREAM表示使用TCP协议
// 对IPv4的套接字地址结构做初始化
bzero(&server_addr, sizeof(server_addr)); // 将server_addr结构体的前sizeof(serveraddr)个字节清零。与memset()差不多。
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(port); // 指定端口号,htons()将短整型数据转换成网络字节顺序
server_addr.sin_addr.s_addr = listen_addr; // 指定服务器的IP地址
// 将服务器的套接字和套接字地址结构进行绑定
if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
{
printf("绑定失败\n");
exit(1);
}
// 开始监听
if (listen(server_socket, 5) == -1) // 5表示队列的容量,这个队列用于记录正在连接但是还没有连接完成的客户端
{
printf("监听失败\n");
exit(1);
}
// 开始连接
addr_size = sizeof(client_addr);
// accept将客户端的信息绑定到一个socket上,也就是服务器要给客户端创建一个socket,连接成功就返回客户端的socket
client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &addr_size);
if (client_socket > 0) // 非阻塞下,无法读取返回-1
{
if (set_non_block(client_socket) == -1) // 将客户端套接字设置非阻塞
{
printf("设置客户端非阻塞失败\n");
exit(1);
}
}
while (1)
{
// read()返回值:读到的字节数,若已到文件尾,返回0;若出错,返回−1
str_length = read(client_socket, buffer, BUFFER_SIZE);
if (str_length == -1) // 非阻塞下,无法读取返回-1,并开始轮询
{
printf("当前数据不在内核缓冲区,read系统调用返回\n");
printf("read系统调用返回-1,错误信息:%s\n",strerror(errno));
sleep(1);
continue;
}
else if (str_length > 0)
{
write(client_socket, buffer, str_length);
printf("%d号客户端发送的数据:\n%s", client_socket, buffer);
}
else // 读取数据完毕关闭套接字
{
close(client_socket);
printf("%d号客户端的连接已关闭\n", client_socket);
break;
}
}
exit(0);
}
运行结果:
注:
结果中的“当前数据”指的是客户端发给服务器的HTTP请求报文。
分析如下:
刚开始,HTTP请求报文还没有加载到操作系统的内核缓冲区(或者网卡可能只接受到一部分报文),所以将客户端套接字设置为非阻塞时,read系统调用并会不阻塞,而是立即返回-1(表示内核还没有准备好相关数据)。并且通过轮询
的方式一直向内核请求read系统调用(代码第68,74,78行)。直到当DMA把HTTP请求报文从网卡加载到内核缓冲区之后,read才能从内核缓冲区中读取到数据,并把数据拷贝到用户缓冲区(buffer),最后通过IO函数把用户缓冲区(buffer)的内容打印在终端屏幕上。运行结果的前六行表明用户进程使用了三次read系统调用,这是因为服务器轮询的周期是1秒,而客户端再建立TCP连接成功后3秒才发送HTTP报文,所以在这三秒之内非阻塞的read系统调用都会返回错误。3秒之后HTTP报文加载到内核缓冲区,read成功返回。最后就是客户端连接关闭,read认为读取到了套接字文件的末尾,就返回了0。
优缺点
优点:
- 每次发起的IO系统调用,在内核等待数据过程中可以立即返回。用户线程不会阻塞,实时性较好。
缺点:
- 不断地轮询内核,这将占用大量的CPU时间,效率低下。
IO多路复用模型
IO多路复用应用非常广泛,由于篇幅限制,我把IO多路复用模型的博客文章另起一篇编写。详见:IO多路复用详解-黎明的博客-CSDN
信号驱动IO模型
回顾复习
再理解信号驱动IO模型之前有必要复习一下“信号”和“可重入函数”这两个概念。
1.信号
信号是一种软中断机制,很多比较重要的内核程序都需要有信号处理程序。首先,每个信号都有一个名字,这些名字都以字符SIG开头。在头文件<signal.h>中,每个信号都被定义为正整数常量(信号编号)。并且要记住不存在编号为 0 的信号,因为kill 函数对信号编号 0 有特殊的应用,而POSIX.1将这种信号称为空信号。
2.产生信号的条件
- 当用户按某些终端键时,比如:Ctrl+C会产生中断信号(SIGINT)。
- 硬件异常产生信号:除数为0、无效的内存引用等。这些条件通常由硬件检测到,并通知内核。
- 进程调用kill()函数可将任意信号发送给另一个进程或进程组。关于kill函数的语法详见:Linux 下的KILL函数的用法-博客园
- 用户可用kill命令将信号发送给其他进程。此命令只是kill函数的接口。常用此命令终止后台进程。
- 当检测到某种软件条件发生时,也可能产生信号。如:网络连接上传来带外数据时产生SIGURG信号、进程所设置的定时器已经超时产生SIGALRM信号。
3.可重入函数
可重入函数主要用于多进程环境中,一个可重入的函数简单来说就是可以被中断的函数,也就是说,可以在这个函数执行的任何时刻中断它,当在操作系统的调度下去执行另外一段代码,而返回控制时不会出现错误,这就是可重入函数;而不可重入的函数由于使用了一些系统资源,比如全局变量区,中断向量表等,所以它如果被中断的话,可能会出现问题,这类函数是不能运行在多进程环境下的。
满足下列条件的函数多数是不可重入的:
- 函数体内使用了静态(static)的数据结构;
- 函数体内调用了 malloc() 或者 free() 函数;
- 函数体内调用了标准 I/O 函数(<stdio.h>里面的函数);
4.为什么中断处理函数不能直接调用不可重入函数
在多任务环境下,中断可能在任务执行的任何时间发生;如果一个函数的执行期间被中断后,到重新恢复到断点进行执行的过程中,函数所依赖的环境没有发生改变,那么这个函数就是可重入的,否则就不可重入。
在中断前后不是都要进行状态保存和恢复上下文吗?怎么会出现函数所依赖的环境发生改变了呢? 我们知道中断时确实保存一些上下文,但是仅限于返回地址,cpu 寄存器等之类的少量上下文,而函数内部使用的诸如全局变量、静态变量和buffer 等并不在保护之列,所以如果这些值在函数被中断期间发生了改变,那么当函数回到断点继续执行时,其结果就不可预料了。
在中断处理函数中调用有互斥锁保护的全局变量,如果恰好该变量正在被另一个线程调用,会形成死锁,使中断处理函数不能及时返回,最后导致中断丢失等严重问题。并且在多线程环境中使用,在没有加锁的情况下,对同一段内存块进行并发读写,就会造成 segmentfault/coredump 之类的问题。总而言之,中断处理函数做的事情越简单越好。
5.如何写出可重入的函数
- 尽量使用函数参数和局部变量来存储数据,避免使用全局变量和静态变量。
- 如果必须访问全局变量,记住利用互斥信号量来保护全局变量。或者调用该函数前关中断,调用后再开中断;
- 避免使用不可重入的函数和系统调用,尽量使用可重入的函数和系统调用。
- 在使用库函数时,要注意查看函数是否是可重入的,如果不是,需要采取相应的措施来保证函数的可重入性。
- 在和硬件发生交互的时候,切记关闭硬件中断,完成交互记得打开中断;
基本概念
信号驱动的 I/O 模型是一种异步 I/O 模型,它允许应用程序在等待 I/O 操作完成时继续执行其他任务。在这种模型中,当应用程序发起一个 I/O 操作时,它并不会一直等待操作完成。相反,它会注册一个信号处理程序,该处理程序在 I/O 操作完成时被调用。这意味着,当 I/O 操作完成时,内核会发送一个信号告知应用程序,应用程序可以在信号处理程序中获取 I/O 操作的结果。信号驱动的 I/O 模型适用于大量的 I/O 操作,因为它可以同时处理多个 I/O 操作,而不必等待一个操作完成后才开始下一个操作。
在网络IO的场景下,为了使用该I/O模型,需要开启套接字的信号驱动I/O功能,并通过sigaction系统调用安装一个信号处理函数。sigaction函数立即返回,我们的进程继续工作,即进程没有被阻塞。当数据报准备好时,内核会为该进程产生一个SIGIO信号,这样我们可以在信号处理函数中调用recvfrom读取数据报,也可以在主循环中读取数据报。无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间不被阻塞。
应用场景
下面是一个使用信号驱动IO模型的web服务器和一个用于测试的客户端。
服务器代码:
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define BUF_SIZE 1024
volatile int server_socket, connfd;
// 处理IO信号的回调函数
void handle_io_signal(int sig)
{
char buf[BUF_SIZE];
int len = read(connfd, buf, BUF_SIZE);
if (len > 0)
{
write(STDOUT_FILENO, "数据到了!被SIGIO信号唤醒。接收到数据:\n", 53);
write(STDOUT_FILENO, buf, strlen(buf));
}
return ;
}
int main(void)
{
// 注册IO信号处理程序
signal(SIGIO, handle_io_signal);
short port = 8080; // 服务器的监听端口
char listen_addr_str[] = "127.0.0.1"; // 服务器的IP地址
char buf[BUF_SIZE];
socklen_t addr_size; // IPv4套接字地址结构的大小
struct sockaddr_in server_addr, client_addr; // 定义服务端和客户端的IPv4的套接字地址结构(定长,16字节)
size_t listen_addr = inet_addr(listen_addr_str); // 将点分十进制的IPv4地址转换成网络字节序列的长整型
// 对IPv4的套接字地址结构做初始化
bzero(&server_addr, sizeof(server_addr)); // 将server_addr结构体的前sizeof(serveraddr)个字节清零,与memset()差不多
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(port); // 指定端口号,htons()将短整型数据转换成网络字节顺序
server_addr.sin_addr.s_addr = listen_addr; // 指定服务器的IP地址
server_socket = socket(AF_INET, SOCK_STREAM, 0); // 创建套接字
bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
listen(server_socket, SOMAXCONN);
addr_size = sizeof(client_addr);
// accept将客户端的信息绑定到一个socket上,也就是服务器要给客户端创建一个socket,连接成功就返回客户端的socket
connfd = accept(server_socket, (struct sockaddr *)&client_addr, &addr_size);
// 将当前进程设置为套接字的拥有者。这样当套接字有事件发生时,内核会向拥有那个套接字的进程发送一个信号(SIGIO)。
fcntl(connfd, __F_SETOWN, getpid());
int flag = fcntl(connfd, F_GETFL);
// 设置套接字的文件状态标志为异步通知和非阻塞
fcntl(connfd, F_SETFL, flag | O_ASYNC | O_NONBLOCK);
while (1)
{
int len = read(connfd, buf, BUF_SIZE);
if (len > 0)
{
write(STDOUT_FILENO, "接收到数据:\n", 17);
write(STDOUT_FILENO, buf, strlen(buf));
}
else if (len == -1)
{
printf("数据还未到达内核缓冲区\n");
pause(); // 进入等待信号状态
}
else
{
printf("客户端连接关闭\n");
close(connfd); // 关闭socket
exit(0);
}
}
exit(0);
}
客户端代码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define PORT 8080
int main(int argc, char const *argv[])
{
int client_fd, valread;
struct sockaddr_in server_addr;
char buffer[1024] = {0};
const char *http_request = "GET / HTTP/1.1\r\nHost: 127.0.0.1:8080\r\nConnection: close\r\n\r\nHello World!\r\n";
client_fd = socket(AF_INET, SOCK_STREAM, 0); // 创建socket
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(PORT); // 指定端口号,htons()将短整型数据转换成网络字节顺序
inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr); // 将IPv4地址从点分十进制转换为二进制格式
// 连接服务器
if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
perror("connect failed");
exit(EXIT_FAILURE);
}
sleep(3); // 现在TCP连接已经建立完成,3秒后发送HTTP请求
send(client_fd, http_request, strlen(http_request), 0); // 发送HTTP请求
return 0;
}
运行结果如下:
以上客户端和服务器的通信过程如下:
首先,服务器注册IO驱动函数并监听8080端口。然后当客户端与服务器建立TCP连接之后,客户端并没有立即发送HTTP报文,而是休眠3秒之后再向服务器发送HTTP报文,这样可以使服务器在连接建立成功后不能立即读取到数据。由于设置了connfd为非阻塞和异步,所以当服务器读取不到数据后就执行pause()函数进入休眠并等待信号。当然在这里进程也可以进行一些任务处理(执行printf(“数据还未到达内核缓冲区\n”);这条输出语句本身就是在进行某种任务处理),总之在内核发送信号之前进程是不会阻塞的。不过需要注意的是如果进程的任务处理是进行一些大量的IO操作,那么很可能会发生SIGIO信号被丢弃的情况,导致信号处理函数不能得到执行。这是因为在进行大量的IO操作时内核空间与用户空间会进行平凡的信号交互,(由于信号是不排队的,也就是内核发出的信号不及时处理,该信号就会被丢弃)虽然信号队列可以实现让少量的信号进行排队,但是大量的IO操作会造成信号队列溢出的情况,导致后面到达的SIGIO信号被丢弃。
优缺点
优点:
- 信号驱动的 IO 模型相对来说比较简单和易于理解,并且信号到来后就直接强行中断进行处理,更加实时。
- 信号驱动的 IO 模型可以同时支持多个文件描述符的 IO 操作,并且可以实现异步 IO 的能力,因为进程在等待信号的时候可以处理其他任务,从而提高了系统的并发性和性能。
缺点:
- 可能存在信号竞争和信号处理函数的可重入性问题:信号驱动的 IO 模型中,当内核检测到文件描述符上有数据可读或可写时,它会向进程发送一个相应的信号。进程在处理信号时需要考虑到信号的竞争和可能导致的信号处理函数的可重入性问题。
- 当有大量IO操作时,信号较多,SIGIO处理函数不能及时处理可能导致信号队列溢出,而且内核空间与用户空间的频繁信号交互性能也较低。
异步IO模型
基本概念
异步I/O模型的工作机制是,启动某个操作,并让内核在整个操作(包括等待数据和将数据从内核复制到用户空间)完成后通知应用进程。
信号驱动的IO模型和异步IO模型的区别:
信号驱动的 IO 模型中,每个套接字都绑定了一个信号处理函数,当该套接字上有数据可读或可写时,内核会向该进程发送 SIGIO 信号,触发信号处理函数进行读取或写入数据操作。在高并发场景下,由于每个套接字都需要绑定一个信号处理函数,这会增加系统的资源消耗和复杂度,并且由于信号处理函数会在一个独立的线程或进程中执行,因此会导致线程/进程的频繁创建和销毁加重进程调度的负担。
而异步 IO 模型中,应用程序通过异步调用发起 IO 操作,并在操作完成时通过回调函数得到结果。无需额外创建信号处理函数或线程/进程,因此可以大大降低系统的资源消耗和复杂度,并且在高并发场景下可以更加有效地利用系统资源保证程序的性能和可伸缩性。
从IO执行的过程中看,信号驱动的 IO 模型只有前半部分实现了异步,而异步IO模型则在整个IO操作过程中都是异步的。所以,相对于信号驱动的 IO 模型,异步 IO 模型在高并发场景下优势较大,可以更好地适应高并发、高负载的应用程序需求。
关于异步IO的API
API函数 | 解释 |
---|---|
aio_read | 异步读请求 |
aio_error | 检查异步请求的状态 |
aio_return | 获取已完成的异步请求的返回状态 |
aio_write | 异步写请求 |
上面的每个API函数都是通过aiocb结构体来初始化或者查询状态的。这个结构体有里面很多成员,下面只列出我们需要经常用到的成员:
struct aiocb {
int aio_fildes; // 文件描述符
int aio_lio_opcode; // 仅对lio_listio(r/w/nop)有效
volatile void *aio_buf; // 数据缓冲区
size_t aio_nbytes; // 数据缓冲区的字节数
struct sigevent aio_sigevent; // sigevent结构体用于告诉AIO当I/O请求完成后需要怎么做
...
};
-
aio_read
aio_read函数用于对一个有效的文件描述符发送异步读请求。这个文件描述符可以是一个文件、套接字或者管道。函数的定义如下:
int aio_read( struct aiocb *aiocbp );
当读请求被插入到队列之后aio_read函数会立即返回,成功时返回值为0,失败时返回值为-1,并且会设置errno全局变量。要执行读请求应用程序必须初始化aiocb结构体。在使用aio_read函数之前有三件事情要做:第一是将结构体aicbo的前sizeof(aiocb)个字节清零;第二是给结构体aicbo的数据缓冲区分配一块内存空间;第三是初始化一些结构体字段。具体如下:
// ······ int fd, ret_fd; struct aiocb my_aiocb; fd = open("file.txt", O_RDONLY); // 得到一个打开的文件描述符 bzero((char *)&my_aiocb, sizeof(struct aiocb)); // 将my_aiocb的前sizeof(struct aiocb)个字节清零 my_aiocb.aio_buf = malloc(BUFSIZE + 1); // 给结构体aicbo的数据缓冲区分配一块内存空间 // 初始化一些结构体字段 my_aiocb.aio_fildes = fd; my_aiocb.aio_nbytes = BUFSIZE; my_aiocb.aio_offset = 0; ret_fd = aio_read(&my_aiocb); // ······
-
aio_error
aio_error函数用于检查请求的状态。它的定义如下:
int aio_error( struct aiocb *aiocbp );
此函数可以返回一下信息:
- EINPROGRESS,表示此请求还没有完成
- ECANCELLED,表示此请求被应用程序取消
- -1,表示请求出现错误,你可以通过errno的值来检查错误的说明。
-
aio_return
异步I/O和标准阻塞I/O的另外一个不同之处在于你不能直接访问函数的返回状态,因为你没有被阻塞在read系统调用上。标准的read系统调用会将返回状态赋值在函数的返回值上。在异步I/O中你只能使用aio_return函数,此函数的定义如下:
ssize_t aio_return( struct aiocb *aiocbp );
这个函数只能在aio_error返回请求完成(成功或者出错)之后被调用。它的返回值和同步模型中的read和write系统调用的返回值相同(成功就返回传输的字节数,错误返回-1)。
-
aio_write
aio_write用于异步I/O中的写请求,此函数的定义如下:
int aio_write( struct aiocb *aiocbp );
aio_write函数会立即返回,表示这个请求已经被加入到写队列中(成功时返回0,失败返回-1,并设置errno全局变量)。它和异步读函数类似但是有一个区别需要特别注意:异步读函数中设置文件偏移是非常重要的,但是在异步写操作中文件偏移只有在O_APPEND选项
没有设置
时才会起作用,在这种情况下数据会被写入到文件偏移所指定的地方。如果O_APPEND选项被设置,则文件偏移会被忽略,数据总是会写入到文件的末端。
应用场景
前面总结了同步阻塞IO、同步非阻塞IO、IO多路复用、信号驱动IO和异步IO这5中经典的IO模型,现在打算写一个服务器,要求是:基于IO多路复用实现并发,使用异步IO进行数据读取,并且数据读取完成后使用信号通知的方式让用户进程把到达用户缓冲区的数据输出到终端。下面是客户端和服务器的代码。
服务器代码:
#include <aio.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#define MAX_EVENTS 1024 // epoll 监听队列长度
#define MAX_BUFSIZE 4096 // 缓冲区大小
// 创建并设置非阻塞 socket
int create_socket(int port)
{
int sockfd, optval = 1;
struct sockaddr_in server_addr;
char listen_addr_str[] = "127.0.0.1"; // 服务器的IP地址
sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建 socket
fcntl(sockfd, F_SETFL, O_NONBLOCK); // 设置 non-blocking 模式
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
size_t listen_addr = inet_addr(listen_addr_str); // 将点分十进制的IPv4地址转换成网络字节序列的长整型
// 对IPv4的套接字地址结构做初始化
bzero(&server_addr, sizeof(server_addr)); // 将server_addr结构体的前sizeof(serveraddr)个字节清零,与memset()差不多
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(port); // 指定端口号,htons()将短整型数据转换成网络字节顺序
server_addr.sin_addr.s_addr = listen_addr; // 指定服务器的IP地址
bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)); // 绑定端口
listen(sockfd, 1024); // 监听端口
return sockfd;
}
// 处理客户端连接请求
void handle_connection(int epollfd, int listenfd)
{
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &addr_len);
printf("新连接来自 %s:%d,sockfd: %d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), connfd);
// 添加客户端socket到epoll对象中
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev) == -1)
{
perror("epoll_ctl error");
close(connfd);
}
}
// 异步读取请求数据并回复客户端
void do_read(int sockfd, char *buf, struct aiocb *my_aiocb)
{
ssize_t nread;
my_aiocb->aio_fildes = sockfd;
my_aiocb->aio_nbytes = MAX_BUFSIZE;
my_aiocb->aio_buf = buf;
my_aiocb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
my_aiocb->aio_sigevent.sigev_signo = SIGUSR1;
if (aio_read(my_aiocb) < 0)
{
perror("aio_read error");
close(sockfd);
return;
}
// while (aio_error(my_aiocb) == EINPROGRESS) // TRUE表示IO请求还没有完成
// {
// // 什么都不做,等待异步IO完成
// }
// if ((nread = aio_return(my_aiocb)) < 0) // 异步IO操作完成,得到返回值
// {
// perror("aio_return error");
// close(sockfd);
// return;
// }
while (1) // 等待信号
{
pause();
}
}
void handle_aio_completion(int signo, siginfo_t *info, void *context)
{
struct aiocb *my_aiocb = info->si_value.sival_ptr;
int sockfd = my_aiocb->aio_fildes;
volatile char *buf = my_aiocb->aio_buf;
printf("接收到数据:\n%s\n", buf);
}
int main(int argc, char *argv[])
{
char buf[MAX_BUFSIZE]; // my_aiocb的数据缓冲区
int listenfd, epollfd, nfds, n;
listenfd = create_socket(8080); // 创建 listenfd
epollfd = epoll_create(1024); // 创建 epollfd
struct epoll_event ev, events[MAX_EVENTS]; // ev是添加到epoll对象中的事件,events[]用于存储epoll_wait函数返回的就绪事件
ev.events = EPOLLIN | EPOLLET; // 设置为ET模式(边缘触发)
ev.data.fd = listenfd; // 添加 listenfd 到 epoll 监听队列
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev) == -1)
{
perror("epoll_ctl error");
exit(1);
}
// 注册信号处理函数
struct sigaction sig_act;
sigemptyset(&sig_act.sa_mask); // 将所有信号屏蔽位都清零。
sig_act.sa_flags = SA_SIGINFO; // 在处理信号时,使用siginfo_t结构体来传递信号的相关信息
sig_act.sa_sigaction = handle_aio_completion; // 当接收到该信号时,将调用handle_aio_completion函数来处理信号
// 第一个参数SIGUSR1表示要设置的信号类型
// 第二个参数&sig_act将结构体sigaction中的信息应用到SIGUSR1信号上
// 第三个参数NULL表示不需要保存旧的信号处理行为
if (sigaction(SIGUSR1, &sig_act, NULL) < 0)
{
perror("sigaction error");
exit(1);
}
// 初始化 my_aiocb
struct aiocb my_aiocb;
bzero(&my_aiocb, sizeof(struct aiocb));
bzero(buf, MAX_BUFSIZE);
my_aiocb.aio_buf = buf;
my_aiocb.aio_offset = 0;
my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; // 使用信号通知的方式
my_aiocb.aio_sigevent.sigev_signo = SIGUSR1; // 异步操作完成时,内核向进程发送SIGUSR1信号来通知它
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb; // 将my_aiocb的指针存储在si_value.sival_ptr中
// 循环监听事件
while (1)
{
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1)
{
perror("epoll_wait error");
continue;
}
// 处理所有事件
for (n = 0; n < nfds; ++n)
{
// 如果是服务器socket有新连接请求
if (events[n].data.fd == listenfd)
{
handle_connection(epollfd, listenfd);
}
// 如果是客户端的读事件
else if (events[n].events & EPOLLIN)
{
do_read(events[n].data.fd, buf, &my_aiocb); // 异步读取客户端发来的数据
}
else
{
break;
}
}
}
return 0;
}
客户端代码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define PORT 8080
int main(int argc, char const *argv[])
{
int client_fd, valread;
struct sockaddr_in server_addr;
char buffer[1024] = {0};
const char *http_request = "GET / HTTP/1.1\r\nHost: 127.0.0.1:8080\r\nConnection: close\r\n\r\nHello World!";
client_fd = socket(AF_INET, SOCK_STREAM, 0); // 创建socket
server_addr.sin_family = AF_INET; // sin_family用来定义是哪种地址族,AF_INET表示使用IPv4进行通信
server_addr.sin_port = htons(PORT); // 指定端口号,htons()将短整型数据转换成网络字节顺序
inet_pton(AF_INET, "127.0.0.1", &server_addr.sin_addr); // 将IPv4地址从点分十进制转换为二进制格式
// 连接服务器
if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
perror("connect failed");
exit(EXIT_FAILURE);
}
sleep(1); // 现在TCP连接已经建立完成,1秒后发送HTTP请求
send(client_fd, http_request, strlen(http_request), 0); // 发送HTTP请求
return 0;
}
这段服务器代码综合了IO多路复用、异步IO和信号驱动IO的思想,具体过程不在多说。当然这段代码也有很多不太合理的地方,比如仅仅是考虑了如何捕获信号,对于并发场景下的信号屏蔽、数据竞争以及信号处理函数的可重入性等问题还没有考虑,还有很多需要改善的地方。
优缺点
优点:
-
高效性:异步IO可以提高应用程序的吞吐量和响应速度,因为它允许应用程序在等待IO操作完成时继续执行其他任务,而不必阻塞等待IO操作完成。
-
可扩展性:异步IO可以提高应用程序的可扩展性,因为它允许应用程序处理更多的并发IO操作。
-
可靠性:异步IO可以提高应用程序的可靠性,因为它可以减少IO操作的延迟和阻塞,从而减少了IO操作失败的可能性。
-
跨平台性:异步IO可以在多个操作系统和硬件平台上运行,因为它是一种标准的IO模型,具有良好的可移植性。
缺点:
-
复杂性:异步IO需要更复杂的代码实现,因为它需要使用回调函数或事件驱动的方式来处理IO操作完成的通知。
-
学习成本:异步IO需要更高的学习成本,因为它需要理解回调函数、事件驱动和异步IO的工作原理。
-
可读性:异步IO的代码可读性可能不如同步IO的代码,因为它需要使用回调函数或事件驱动的方式来处理IO操作完成的通知。
-
调试难度:异步IO的调试难度可能比同步IO更高,因为它需要跟踪回调函数或事件驱动的执行流程。
总结
对五种经典的IO模型的总结:
- 阻塞IO(Blocking IO):当应用程序调用阻塞IO时,它会一直阻塞等待,直到操作完成。在此期间,应用程序无法做任何其他事情。这种模型简单易懂,但可能会浪费资源。
- 非阻塞IO(Non-blocking IO):当应用程序调用非阻塞IO时,它不会被阻塞等待操作完成。相反,它会立即返回,并告诉应用程序操作是否已完成。这种模型需要使用循环检查操作是否已完成,因此可能会导致CPU消耗过高。
- IO复用(IO Multiplexing):IO复用使用select()或poll()系统调用,这些调用可以同时监视多个文件描述符,以确定哪个文件描述符准备好了读或写。这种模型可以同时处理多个连接,并且不会受到阻塞或循环检查的问题。
- 信号驱动IO(Signal-driven IO):信号驱动IO中,应用程序调用ioctl()或fcntl()函数来启用信号驱动IO操作完成的通知。当IO操作完成时,内核会向应用程序发送一个信号,应用程序在信号处理函数中重新开始执行。这种模型避免了阻塞和循环检查的问题,但需要使用信号驱动IO的特定机制。
- 异步IO(Asynchronous IO):异步IO是一种通过回调函数或事件驱动响应IO操作完成的通知,以允许应用程序在IO操作完成之前继续执行其他任务的方法。这种模型可以提高应用程序的吞吐量和响应速度,但需要更复杂的代码实现和更高的学习成本。
前四种I/O模型的主要区别在于第一个阶段,它们的第二个阶段是一样的:在数据从内核复制到应用进程的缓冲区期间,进程会被阻塞于recvfrom系统调用。
而异步I/O模型则是整个操作完成内核才通知应用进程。