摘要
在网络编程领域,同步(Synchronous)、异步(Asynchronous)、阻塞(Blocking)与非阻塞(Non-blocking)IO模型是核心概念。尽管这些概念在多篇文章中被广泛讨论,它们的抽象性使得彻底理解并非易事。本文旨在通过具体的实验案例,将这些抽象概念具体化,以助于读者构建清晰的理解框架。
概念
IO 复用到底复用了什么
IO 的类型有网络 IO、磁盘 IO。我们可以把标准输入、套接字等都看做 I/O 的一路,多路复用的意思,就是在任何一路 I/O 有“事件”发生的情况下,通知应用程序去处理相应的 I/O 事件,这样我们的程序就变成了“多面手”,在同一时刻仿佛可以处理多个 I/O 事件。
IO 事件类型
- 标准输入文件描述符准备好可以读。
- 监听套接字准备好,新的连接已经建立成功。
- 已连接套接字准备好可以写。
- 如果一个 I/O 事件等待超过了 10 秒,发生了超时事件。
IO 模型
阻塞 IO
阻塞 IO 模型,当我们调用 recvfrom 读取数据时,只用等数据完全准备好,然后应用程序把数据从内核态拷贝到应用空间,程序才会返回,否则从调用方视角来看程序将会一直阻塞在 recvfrom 上。
非阻塞 IO
非阻塞IO场景发起 recvfrom 后,在内核数据没准备好的情况下会返回 EWOULDBLOCK,EAGAIN 错误,所以调用方需要不断的轮训获取数据结果。非阻塞 I/O 可以使用在 read、write、accept、connect 等多种不同的场景,在非阻塞 I/O 下,使用轮询的方式引起 CPU 占用率高,所以一般将非阻塞 I/O 和 I/O 多路复用技术 select、poll 等搭配使用,在非阻塞 I/O 事件发生时,再调用对应事件的处理函数。这种方式,极大地提高了程序的健壮性和稳定性,是 Linux 下高性能网络编程的首选。
非阻塞 IO Write 流程
/* 向文件描述符 fd 写入 n 字节数 */
ssize_t writen(int fd, const void * data, size_t n)
{
size_t nleft;
ssize_t nwritten;
const char *ptr;
ptr = data;
nleft = n;
// 如果还有数据没被拷贝完成,就一直循环
while (nleft > 0) {
if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
/* 这里 EINTR 是非阻塞 non-blocking 情况下,通知我们再次调用 write() */
if (nwritten < 0 && errno == EINTR)
nwritten = 0;
else
return -1; /* 出错退出 */
}
/* 指针增大,剩下字节数变小 */
nleft -= nwritten;
ptr += nwritten;
}
return n;
}
- nleft 标记剩余写入数据
- while 循环一直写入直到 nleft == 0
- write 失败后,如果是非阻塞将会返回 EINTR 错误,说明数据还未准备好,这是我们将 nwritten 置为0
- write 成功则说明内核 socket 缓冲有空间了,不断写入值直到 nleft == 0
IO 复用
IO 复用不同于非阻塞IO的地方在于,IO 复用是在内核态实现了轮训,相比应用层实现少了很多系统调用(系统调用成本很高)
Read 和 Write 非阻塞模式对比
图源-网络编程实战
案例
使用Select与非阻塞IO实现高效网络通信
#define MAX_LINE 1024
#define FD_INIT_SIZE 128
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
// 数据缓冲区
struct Buffer {
int connect_fd; // 连接字
char buffer[MAX_LINE]; // 实际缓冲
size_t writeIndex; // 缓冲写入位置
size_t readIndex; // 缓冲读取位置
int readable; // 是否可以读
};
struct Buffer *alloc_Buffer() {
struct Buffer *buffer = malloc(sizeof(struct Buffer));
if (!buffer)
return NULL;
buffer->connect_fd = 0;
buffer->writeIndex = buffer->readIndex = buffer->readable = 0;
return buffer;
}
void free_Buffer(struct Buffer *buffer) {
free(buffer);
}
int onSocketRead(int fd, struct Buffer *buffer) {
char buf[1024];
int i;
ssize_t result;
// 循环读取数据直到读完
while (1) {
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;
for (i = 0; i < result; ++i) {
if (buffer->writeIndex < sizeof(buffer->buffer))
buffer->buffer[buffer->writeIndex++] = rot13_char(buf[i]);
if (buf[i] == '\n') {
buffer->readable = 1; // 缓冲区可以读
}
}
}
if (result == 0) {
return 1;
} else if (result < 0) {
if (errno == EAGAIN)
return 0;
return -1;
}
return 0;
}
int onSocketWrite(int fd, struct Buffer *buffer) {
while (buffer->readIndex < buffer->writeIndex) {
ssize_t result = send(fd, buffer->buffer + buffer->readIndex, buffer->writeIndex - buffer->readIndex, 0);
if (result < 0) {
if (errno == EAGAIN)
return 0;
return -1;
}
buffer->readIndex += result;
}
if (buffer->readIndex == buffer->writeIndex)
buffer->readIndex = buffer->writeIndex = 0;
buffer->readable = 0;
return 0;
}
int main(int argc, char **argv) {
int listen_fd;
int i, maxfd;
struct Buffer *buffer[FD_INIT_SIZE];
for (i = 0; i < FD_INIT_SIZE; ++i) {
buffer[i] = alloc_Buffer();
}
// 设置 非 阻塞监听
listen_fd = tcp_nonblocking_server_listen(SERV_PORT);
fd_set readset, writeset, exset;
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);
while (1) {
maxfd = listen_fd;
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);
// listener 加入 readset
FD_SET(listen_fd, &readset);
for (i = 0; i < FD_INIT_SIZE; ++i) {
if (buffer[i]->connect_fd > 0) {
if (buffer[i]->connect_fd > maxfd)
maxfd = buffer[i]->connect_fd;
FD_SET(buffer[i]->connect_fd, &readset);
if (buffer[i]->readable) {
FD_SET(buffer[i]->connect_fd, &writeset);
}
}
}
if (select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) {
error(1, errno, "select error");
}
if (FD_ISSET(listen_fd, &readset)) {
printf("listening socket readable\n");
// sleep 模拟处理延时
sleep(5);
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
// 如果是阻塞 IO 由于超时原因客户端断开连接,此时服务端的连接也失效,加入一直没有请求进来
// 将会一直阻塞在 accept 这里。如果是异步IO accept 将会立刻返回,但我们要处理好 accept 的
// 异常情况
int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else if (fd > FD_INIT_SIZE) {
error(1, 0, "too many connections");
close(fd);
} else {
// 把连接套接字设置为非阻塞
make_nonblocking(fd);
if (buffer[fd]->connect_fd == 0) {
buffer[fd]->connect_fd = fd;
} else {
error(1, 0, "too many connections");
}
}
}
for (i = 0; i < maxfd + 1; ++i) {
int r = 0;
if (i == listen_fd)
continue;
if (FD_ISSET(i, &readset)) {
r = onSocketRead(i, buffer[i]);
}
if (r == 0 && FD_ISSET(i, &writeset)) {
r = onSocketWrite(i, buffer[i]);
}
if (r) {
buffer[i]->connect_fd = 0;
close(i);
}
}
}
}
- 调用 fcntl 将监听套接字设置为非阻塞。
- 行调用 select 进行 I/O 事件分发处理
- 把accept的连接套接字设置为非阻塞的
- 处理连接套接字上的 I/O 读写事件,抽象了一个 Buffer 对象,Buffer 对象使用了 readIndex 和 writeIndex 分别表示当前缓冲的读写位置。
结尾
文章总结了同步、异步、阻塞与非阻塞IO模型的关键概念,并通过对Select与非阻塞IO的案例分析,展示了这些概念在实际编程中的应用。希望读者通过本文能够获得对网络编程中IO模型的深入理解,并指导实践中的应用
Reference
- http://www.pandademo.com/2016/11/linux-kernel-select-source-dissect/
- https://www.jianshu.com/p/95b50b026895
- https://www.zhihu.com/question/19732473
- https://tubetrue01.github.io/articles/2021/08/16/c_unix/Socket(%E4%BA%8C)recv%E4%B8%8Esend%E5%87%BD%E6%95%B0/
- https://time.geekbang.org/column/intro/100032701
- https://github.com/froghui/yolanda