文章目录
- 1. 多路复用IO
- 2. 异步IO
- 3. 信号驱动IO
1. 多路复用IO
I/O多路复用这个术语可能对一些人来说比较陌生,但提到select/epoll,就容易理解了。在某些场景下,这种I/O方式也被称为事件驱动I/O(event-driven I/O)。我们都知道,select/epoll的优势在于单个进程可以同时处理多个网络连接的I/O。其基本原理是select/epoll函数会不断轮询所负责的所有套接字,当某个套接字有数据到达时,就通知用户进程。流程如下所示:
当用户进程调用select时,整个进程会被阻塞,同时内核会监视所有select负责的套接字,当任何一个套接字的数据准备好了,select便会返回。此时用户进程再调用read操作,将数据从内核拷贝到用户进程。
这个过程与阻塞I/O其实没有太大的不同,实际上还稍微差一点。因为这里需要使用两个系统调用(select和read),而阻塞I/O只调用了一个系统调用(read)。但是使用select后,最大的优势在于用户可以在一个线程内同时处理多个套接字的I/O请求。用户可以注册多个套接字,然后不断地调用select读取被激活的套接字,从而实现在同一个线程内同时处理多个I/O请求。在同步阻塞模型中,只能通过多线程的方式才能实现这个目标。(顺便提一下:所以,如果处理的连接数不是很高的话,使用select/epoll的Web服务器并不一定比使用多线程+阻塞I/O的Web服务器性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是能处理更多的连接。)
在多路复用模型中,对于每一个套接字,通常都设置为非阻塞(non-blocking),但是,如上所示,整个用户进程实际上是一直被阻塞的。只是进程是被select函数阻塞,而不是被套接字I/O阻塞。因此,select()与非阻塞I/O相似。
大部分Unix/Linux操作系统都支持select函数,该函数用于探测多个文件句柄的状态变化。下面给出select接口的原型:
FD_ZERO(int fd, fd_set* fds);
FD_SET(int fd, fd_set* fds);
FD_ISSET(int fd, fd_set* fds);
FD_CLR(int fd, fd_set* fds);
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
在这里,fd_set类型可以简单地理解为按位标记句柄的队列,例如要在某个fd_set中标记一个值为16的句柄,则该fd_set的第16个bit位被标记为1。具体的置位、验证可以使用FD_SET、FD_ISSET等宏实现。在select()函数中,readfds、writefds和exceptfds同时作为输入参数和输出参数。如果输入的readfds标记了16号句柄,则select()将检测16号句柄是否可读。在select()返回后,可以通过检查readfds是否标记16号句柄,来判断该“可读”事件是否发生。另外,用户可以设置timeout时间。
下面将重新模拟上例中从多个客户端接收数据的模型。
上述模型只是描述了使用select()接口同时从多个客户端接收数据的过程;由于select()接口可以同时对多个句柄进行读状态、写状态和错误状态的探测,所以可以很容易构建为多个客户端提供独立问答服务的服务器系统。
这里需要指出的是,客户端的一个connect()操作,将在服务器端激发一个“可读事件”,所以select()也能探测来自客户端的connect()行为。
上述模型中,最关键的地方是如何动态维护select()的三个参数readfds、writefds和exceptfds。作为输入参数,readfds应该标记所有需要探测的“可读事件”的句柄,其中永远包括那个探测connect()的那个“母”句柄;同时,writefds和exceptfds应该标记所有需要探测的“可写事件”和“错误事件”的句柄(使用FD_SET()标记)。
作为输出参数,readfds、writefds和exceptfds中保存了select()捕捉到的所有事件的句柄值。程序员需要检查所有的标记位(使用FD_ISSET()检查),以确定到底哪些句柄发生了事件。
上述模型主要模拟的是“一问一答”的服务流程,所以如果select()发现某句柄捕捉到了“可读事件”,服务器程序应及时进行recv()操作,并根据接收到的数据准备好待发送数据,并将对应的句柄值加入writefds,准备下一次的“可写事件”的select()探测。同样,如果select()发现某句柄捕捉到“可写事件”,则程序应及时进行send()操作,并准备好下一次的“可读事件”探测准备。
这种模型的特征在于每一个执行周期都会探测一次或一组事件,一个特定的事件会触发某个特定的响应。我们可以将这种模型归类为“事件驱动模型”。
相比其他模型,使用select()的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
但这个模型依旧有着很多问题。首先 select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll等。如果需要实现更高效的服务器程序,类似 epoll 这样的接口更被推荐。遗憾的是不同的操作系统特供的 epoll 接口有很大差异,所以使用类似于 epoll 的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体 1 将直接导致响应事件 2 的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性。
幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有 libevent 库,还有作为 libevent 替代者的 libev 库。这些库会根据操作系统的特点选择最合适的事件探测接口,并且加入了信号(signal) 等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。下章将介绍如何使用 libev 库替换 select 或 epoll
接口,实现高效稳定的服务器模型。
实际上,Linux 内核从 2.6 开始,也引入了支持异步响应的 IO 操作,如 aio_read, aio_write,这就是异步 IO。异步 IO 的优势在于应用程序无需等待 IO 操作的完成,而是在 IO 操作完成后得到通知。这使得应用程序可以在等待某个 IO 操作完成的同时,处理其他任务,从而提高了应用程序的执行效率。
总之,通过使用 select、epoll、kqueue 等 IO 多路复用技术或者异步 IO,可以实现事件驱动的服务器程序,以提高服务器程序的性能和扩展性。在实际应用中,可以根据具体需求和平台特性选择合适的事件驱动库或异步 IO 方法,以满足不同场景下的需求。
在构建高性能、可扩展的服务器程序时,还需要考虑其他一些关键因素。以下是一些建议和技巧:
- 负载均衡:在面对大量客户端连接时,合理地分配负载至关重要。可以使用轮询、最少连接、一致性哈希等负载均衡算法,将客户端请求分配到不同的服务器节点上,从而实现负载均衡。
- 缓存:缓存是提高服务器性能的关键手段。将经常访问的数据或计算结果缓存起来,可以减少服务器的计算负担和响应时间。可以使用本地缓存、分布式缓存等技术,如 Memcached、Redis 等。
- 连接池:为了减少建立和断开连接所产生的开销,可以使用连接池技术。连接池预先建立一定数量的连接,并在需要时将连接分配给请求。这种方式可以减少连接建立的时间开销,提高服务器性能。
- 线程和进程管理:合理地管理服务器程序中的线程和进程,可以提高服务器的处理能力。可以使用线程池、进程池等技术,避免频繁地创建和销毁线程或进程。
- 优化数据结构和算法:优化数据结构和算法是提高服务器程序性能的基础。使用合适的数据结构和算法,可以大幅度提高程序的执行效率。
- 分布式和微服务架构:随着业务的发展和系统复杂度的提高,可以考虑采用分布式架构和微服务架构。这些架构将系统拆分为多个独立的模块,每个模块负责特定的功能,从而提高整个系统的扩展性和可维护性。
- 性能监控和调优:定期对服务器程序进行性能监控,分析性能瓶颈,并进行相应的调优。可以使用性能监控工具、日志分析等方法,找出系统中的性能问题,并进行优化。
通过上述方法和技巧,可以在不同场景下构建高性能、可扩展的事件驱动服务器程序。在实际应用中,根据具体需求和场景选择合适的技术和策略,以满足服务器程序的性能和扩展性需求。
使用epoll创建一个简单的TCP回显服务器的例子:
它可以同时处理多个客户端连接,服务器使用epoll_create1
创建一个epoll实例,然后使用epoll_ctl
将监听套接字和客户端套接字添加到epoll实例中。使用epoll_wait
等待文件描述符状态变化,然后处理这些事件。
epoll.cpp
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <string.h>
const int MAX_EVENTS = 10;
int make_socket_non_blocking(int sfd) {
int flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1) {
std::cerr << "Error: fcntl failed" << std::endl;
return -1;
}
flags |= O_NONBLOCK;
if (fcntl(sfd, F_SETFL, flags) == -1) {
std::cerr << "Error: fcntl failed" << std::endl;
return -1;
}
return 0;
}
int main() {
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1) {
std::cerr << "Error: socket creation failed" << std::endl;
return 1;
}
if (make_socket_non_blocking(sfd) == -1) {
close(sfd);
return 1;
}
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(8080);
if (bind(sfd, (sockaddr*)&addr, sizeof(addr)) == -1) {
std::cerr << "Error: bind failed" << std::endl;
close(sfd);
return 1;
}
if (listen(sfd, SOMAXCONN) == -1) {
std::cerr << "Error: listen failed" << std::endl;
close(sfd);
return 1;
}
int efd = epoll_create1(0);
if (efd == -1) {
std::cerr << "Error: epoll_create1 failed" << std::endl;
close(sfd);
return 1;
}
epoll_event event;
event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event) == -1) {
std::cerr << "Error: epoll_ctl failed" << std::endl;
close(sfd);
close(efd);
return 1;
}
epoll_event events[MAX_EVENTS];
while (true) {
int n = epoll_wait(efd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; ++i) {
if (events[i].data.fd == sfd) {
// 新连接
while (true) {
sockaddr in_addr;
socklen_t in_len = sizeof(in_addr);
int infd = accept(sfd, &in_addr, &in_len);
if (infd == -1) {
break;
}
if (make_socket_non_blocking(infd) == -1) {
close(infd);
continue;
}
epoll_event event;
event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event) == -1) {
std::cerr << "Error: epoll_ctl failed" << std::endl;
close(infd);
continue;
}
}
} else {
// 处理已连接的客户端
char buf[1024];
ssize_t count = 0;
while (true) {
count = read(events[i].data.fd, buf, sizeof(buf));
if (count <= 0) {
break;
}
// 回显数据
ssize_t written = 0;
while (written < count) {
ssize_t n = write(events[i].data.fd, buf + written, count - written);
if (n == -1) {
break;
}
written += n;
}
}
if (count == 0 || (count == -1 && errno != EAGAIN)) {
// 断开连接
close(events[i].data.fd);
epoll_ctl(efd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);
}
}
}
}
close(sfd);
close(efd);
return 0;
}
测试用的client.cc:
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
int main() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
std::cerr << "Error: socket creation failed" << std::endl;
return 1;
}
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(8080);
if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) <= 0) {
std::cerr << "Error: inet_pton failed" << std::endl;
close(sockfd);
return 1;
}
if (connect(sockfd, (sockaddr*)&addr, sizeof(addr)) == -1) {
std::cerr << "Error: connect failed" << std::endl;
close(sockfd);
return 1;
}
std::string message;
char buffer[1024];
while (true) {
std::cout << "Enter message to send: ";
std::getline(std::cin, message);
if (message == "exit") {
break;
}
ssize_t sent = send(sockfd, message.c_str(), message.size(), 0);
if (sent == -1) {
std::cerr << "Error: send failed" << std::endl;
break;
}
ssize_t received = recv(sockfd, buffer, sizeof(buffer), 0);
if (received == -1) {
std::cerr << "Error: recv failed" << std::endl;
break;
}
std::cout << "Server response: ";
std::cout.write(buffer, received);
std::cout << std::endl;
}
close(sockfd);
return 0;
}
2. 异步IO
在Linux下,异步IO(asynchronous IO)主要用于磁盘IO读写操作,而不是网络IO。从2.6版本的内核开始引入。首先来看一下异步IO的流程:
用户进程发起读(read)操作后,可以立即开始执行其他任务。从内核(kernel)的角度看,当收到一个异步读请求(asynchronous read)后,它会立即返回,因此不会对用户进程产生任何阻塞(block)。接着,内核会等待数据准备完成,然后将数据拷贝到用户内存。当这些操作全部完成后,内核会向用户进程发送一个信号(signal),通知其读操作已经完成。
关于使用异步IO实现的服务器,这里就不举例了。后续有时间可以另开文章讲述。异步IO是真正非阻塞的,它不会对请求进程产生任何阻塞,因此对高并发网络服务器实现至关重要。
到目前为止,已经介绍了四种IO模型。现在回过头来回答最初的几个问题:阻塞(blocking)和非阻塞(non-blocking)的区别是什么?同步IO(synchronous IO)和异步IO(asynchronous IO)的区别是什么?
首先回答最简单的问题:阻塞与非阻塞。前面的介绍中已经很明确地说明了这两者的区别。调用阻塞IO会一直阻塞对应的进程,直到操作完成;而非阻塞IO在内核还在准备数据的情况下会立即返回。
两者的区别在于同步IO在进行“IO操作”时会阻塞进程。根据这个定义,前面所述的阻塞IO、非阻塞IO和IO多路复用(IO multiplexing)都属于同步IO。有人可能会说,非阻塞IO并没有被阻塞。这里有一个非常“狡猾”的地方,定义中所指的“IO操作”是指真实的IO操作,即示例中的读(read)系统调用。非阻塞IO在执行读系统调用时,如果内核的数据没有准备好,这时候不会阻塞进程。但是当内核中的数据准备好时,读操作会将数据从内核拷贝到用户内存中,这个时候进程是被阻塞的。而异步IO则不同,当进程发起IO操作后,就直接返回而不再理会,直到内核发送一个信号,告诉进程IO已经完成。在这整个过程中,进程完全没有被阻塞。
使用C++中的asio
库创建异步TCP回显服务器的例子:
-
安装
asio
库:-
通过包管理器安装:
对于某些Linux发行版,asio
库可能已经包含在软件仓库中。例如,在基于Debian的系统(如Ubuntu)上,可以使用apt
包管理器安装libasio-dev
:sudo apt-get update sudo apt-get install libasio-dev
在基于Fedora的系统上,可以使用
dnf
包管理器安装asio-devel
:sudo dnf install asio-devel
通过包管理器安装的
asio
库将自动添加到系统的头文件搜索路径中,因此无需手动指定。 -
手动安装:
如果你的Linux发行版没有预先打包的asio
库,或者你希望安装特定版本的asio
库,可以手动下载和安装。首先,访问
asio
库的官方网站:https://think-async.com/Asio/ ,在"Download"部分点击"Standalone",下载独立版的asio库。这个版本不依赖于Boost库,但需要C++11支持的编译器。下载完成后,解压缩这个文件。
接下来,将整个
asio
文件夹复制到你的项目目录下,或者将asio/include
目录添加到你的编译器/构建系统的头文件搜索路径中。以g++编译器为例,将
asio/include
添加到头文件搜索路径中,使用-I
选项指定头文件路径:g++ -std=c++11 -o server server.cc -I/home/ricky/asio/asio-1.26.0/include -lpthread
补充:
-I
选项指定头文件路径,-L
选项指定库文件路径,-l
选项指定要链接的库文件
-
-
异步TCP回显服务器代码:
#include <iostream> #include <asio.hpp> #include <memory> #include <thread> #include <chrono> using asio::ip::tcp; class EchoSession : public std::enable_shared_from_this<EchoSession> { public: EchoSession(tcp::socket socket) : socket_(std::move(socket)) { } void start() { read(); } private: void read() { auto self(shared_from_this()); socket_.async_read_some( asio::buffer(data_, max_length), [this, self](const std::error_code& error, std::size_t bytes_transferred) { if (!error) { write(data_, bytes_transferred); } }); } void write(const char* data, std::size_t bytes_transferred) { auto self(shared_from_this()); asio::async_write( socket_, asio::buffer(data, bytes_transferred), [this, self](const std::error_code& error, std::size_t /*bytes_transferred*/) { if (!error) { read(); } }); } tcp::socket socket_; enum { max_length = 1024 }; char data_[max_length]; }; class EchoServer { public: EchoServer(asio::io_context& io_context, short port) : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) { accept(); } private: void accept() { acceptor_.async_accept( [this](const std::error_code& error, tcp::socket socket) { if (!error) { std::make_shared<EchoSession>(std::move(socket))->start(); } accept(); }); } tcp::acceptor acceptor_; }; int main(int argc, char* argv[]) { try { if (argc != 2) { std::cerr << "Usage: async_tcp_echo_server <port>\n"; return 1; } asio::io_context io_context; EchoServer server(io_context, std::atoi(argv[1])); io_context.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }
-
编译代码:
g++ -std=c++11 -o server server.cc -I/home/ricky/asio/asio-1.26.0/include -lpthread
-
运行服务器:
运行编译后的可执行文件,并指定一个端口号。例如,使用端口号8080
:./server 8080
-
测试服务器
telnet localhost 8080
3. 信号驱动IO
首先,我们需要允许套接字(socket)进行信号驱动I/O(Signal-Driven I/O),并安装一个信号处理函数。这样,进程可以继续运行,不会被阻塞。当数据准备好时,进程会收到一个SIGIO信号。在信号处理函数中,我们可以调用I/O操作函数处理数据。当数据报准备好读取时,内核会为该进程产生一个SIGIO信号。我们可以在信号处理函数中调用read函数读取数据报,并通知主循环数据已准备好待处理;也可以立即通知主循环,让它来读取数据报。无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达(第一阶段)期间,进程可以继续执行,不被阻塞。这样避免了select的阻塞与轮询,当有活跃套接字时,由注册的处理函数(handler)处理。
通过上面的介绍,我们可以发现非阻塞IO(non-blocking IO)和异步IO(asynchronous IO)之间的区别是很明显的。在非阻塞IO中,尽管进程大部分时间不会被阻塞,但它仍然需要主动检查,并且在数据准备完成后,也需要进程主动再次调用recvfrom函数将数据拷贝到用户内存。而异步IO则完全不同。它就像是用户进程将整个IO操作交给了他人(内核)来完成,然后在操作完成后,内核通过信号通知用户进程。在此期间,用户进程无需检查IO操作的状态,也不需要主动拷贝数据。
C++中使用信号驱动I/O来处理TCP连接的例子:
信号驱动I/O依赖于操作系统的信号机制,在C++中可以使用<signal.h>库来实现信号驱动I/O。
#include <iostream>
#include <signal.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
void sigio_handler(int sig);
int sockfd;
int main() {
struct sockaddr_in server_addr;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
std::cerr << "Error: Unable to create socket" << std::endl;
return 1;
}
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8080);
if (bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
std::cerr << "Error: Unable to bind socket" << std::endl;
return 1;
}
if (listen(sockfd, 5) < 0) {
std::cerr << "Error: Unable to listen on socket" << std::endl;
return 1;
}
signal(SIGIO, sigio_handler);
fcntl(sockfd, F_SETOWN, getpid());
int flags = fcntl(sockfd, F_GETFL);
fcntl(sockfd, F_SETFL, flags | O_ASYNC);
while (1) {
pause();
}
return 0;
}
void sigio_handler(int sig) {
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_socket = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_socket < 0) {
std::cerr << "Error: Unable to accept connection" << std::endl;
return;
}
char buffer[256];
memset(buffer, 0, 256);
int n = read(client_socket, buffer, 255);
if (n < 0) {
std::cerr << "Error: Unable to read from client" << std::endl;
return;
}
std::cout << "Received message: " << buffer << std::endl;
close(client_socket);
}
对这个TCP服务器进行测试,这次我们使用telnet或nc命令从命令行连接到服务器:
打开一个新的命令行终端,然后运行以下命令:
telnet localhost 8080
或者
nc localhost 8080
命令行将尝试连接到服务器。一旦连接成功,可以输入一些文本并按回车键发送给服务器。服务器将在控制台输出接收到的消息。
server: