写文章
深入理解Windows异步机制:IOCP的工作原理与应用
目录
收起
一、IOCP简介
二、IOCP工作流程
三、IOCP的使用
四、IOCP案例实战
五、常见问题和解答
Windows异步机制中的IOCP(Input/Output Completion Port)是一种高效的异步编程模型,特别适用于处理大量并发IO操作的场景,如网络通信和文件IO等。
在使用IOCP进行异步编程时,首先需要创建一个IOCP对象作为事件通知和管理的中心。然后将需要进行异步IO操作的句柄(如套接字、文件句柄等)关联到该IOCP对象上。当有新的IO操作完成时,系统会向相关联的IOCP对象投递一个完成包(Completion Packet),应用程序可以通过调用GetQueuedCompletionStatus函数来获取这些完成包。
通过使用回调函数或线程池技术,在应用程序中可以针对每个完成包做出相应的处理。这种方式避免了传统同步IO阻塞的问题,提升了系统资源利用率和响应速度。
一、IOCP简介
IOCP模型属于一种通讯模型,适用于Windows平台下高负载服务器的一个技术。在处理大量用户并发请求时,如果采用一个用户一个线程的方式那将造成CPU在这成千上万的线程间进行切换,后果是不可想象的。而IOCP完成端口模型则完全不会如此处理,它的理论是并行的线程数量必须有一个上限-也就是说同时发出500个客户请求,不应该允许出现500个可运行的线程。目前来说,IOCP完成端口是Windows下性能最好的I/O模型,同时它也是最复杂的内核对象。它避免了大量用户并发时原有模型采用的方式,极大的提高了程序的并行处理能力。
(1)原理图
一共包括三部分:完成端口(存放重叠的I/O请求),客户端请求的处理,等待者线程队列(一定数量的工作者线程,一般采用CPU*2个)
完成端口中所谓的[端口]并不是我们在TCP/IP中所提到的端口,可以说是完全没有关系。它其实就是一个通知队列,由操作系统把已经完成的重叠I/O请求的通知放入其中。当某项I/O操作一旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知。
通常情况下,我们会在创建一定数量的工作者线程来处理这些通知,也就是线程池的方法。线程数量取决于应用程序的特定需要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的CPU时间,在此期间该线程可以运行,然后另一个线程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它线程来充分利用这些时间片。
(2) IOCP优点
基于IOCP的开发是异步IO的,决定了IOCP所实现的服务器的高吞吐量,通过引入IOCP,会大大减少Thread切换带来的额外开销,最小化的线程上下文切换,减少线程切换带来的巨大开销,让CPU把大量的事件用于线程的运行。当与该完成端口相关联的可运行线程的总数目达到了该并发量,系统就会阻塞。
I/O 完成端口可以充分利用 Windows 内核来进行 I/O 调度,相较于传统的 Winsock 模型,IOCP 在机制上有明显的优势。
相较于传统的Winsock模型,IOCP的优势主要体现在两方面:独特的异步I/O方式和优秀的线程调度机制。
独特的异步I/O方式
IOCP模型在异步通信方式的基础上,设计了一套能够充分利用Windows内核的I/O通信机制,主要过程为:
- ① socket关联iocp
- ② 在socket上投递I/O请求
- ③ 事件完成返回完成通知封包
- ④ 工作线程在iocp上处理事件
IOCP的这种工作模式:程序只需要把事件投递出去,事件交给操作系统完成后,工作线程在完成端口上轮询处理。该模式充分利用了异步模式高速率输入输出的优势,能够有效提高程序的工作效率。
优秀的线程调度机制
完成端口可以抽象为一个公共消息队列,当用户请求到达时,完成端口把这些请求加入其抽象出的公共消息队列。这一过程与多个工作线程轮询消息队列并从中取出消息加以处理是并发操作。这种方式很好地实现了异步通信和负载均衡,因为它使几个线程“公平地”处理多客户端的I/O,并且线程空闲时会被挂起,不会占用CPU周期。
IOCP模型充分利用Windows系统内核,可以实现仅用少量的几个线程来处理和多个client之间的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能。
(3)IOCP应用
1.创建和关联完成端口
//功能:创建完成端口和关联完成端口
HANDLE WINAPI CreateIoCompletionPort(
* __in HANDLE FileHandle, // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
* __in HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
* __in ULONG_PTR CompletionKey, // 完成键,包含了指定I/O完成包的指定文件
* __in DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2
* );
//创建完成端口句柄
HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
2.与socket进行关联
typedef struct{
SOCKET socket;//客户端socket
SOCKADDR_STORAGE ClientAddr;//客户端地址
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
//与socket进行关联
CreateIoCompletionPort((HANDLE)(PerHandleData -> socket),
completionPort, (DWORD)PerHandleData, 0);
3.获取队列完成状态
//功能:获取队列完成状态
/*
返回值:
调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpoverlapped变量中。失败则返回零值。
*/
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort, //完成端口句柄
LPDWORD lpNumberOfBytes, //一次I/O操作所传送的字节数
PULONG_PTR lpCompletionKey, //当文件I/O操作完成后,用于存放与之关联的CK
LPOVERLAPPED *lpOverlapped, //IOCP特定的结构体
DWORD dwMilliseconds); //调用者的等待时间
/*
4.用于IOCP的特点函数
//用于IOCP的特定函数
typedef struct _OVERLAPPEDPLUS{
OVERLAPPED ol; //一个固定的用于处理网络消息事件返回值的结构体变量
SOCKET s, sclient; int OpCode; //用来区分本次消息的操作类型(在完成端口的操作里面, 是以消息通知系统,读数据/写数据,都是要发这样的 消息结构体过去的)
WSABUF wbuf; //读写缓冲区结构体变量
DWORD dwBytes, dwFlags; //一些在读写时用到的标志性变量
}OVERLAPPEDPLUS;
5.投递一个队列完成状态
//功能:投递一个队列完成状态
BOOL PostQueuedCompletionStatus(
HANDLE CompletlonPort, //指定想向其发送一个完成数据包的完成端口对象
DW0RD dwNumberOfBytesTrlansferred, //指定—个值,直接传递给GetQueuedCompletionStatus 函数中对应的参数
DWORD dwCompletlonKey, //指定—个值,直接传递给GetQueuedCompletionStatus函数中对应的参数
LPOVERLAPPED lpoverlapped, ); //指定—个值,直接传递给GetQueuedCompletionStatus 函数中对应的参数
二、IOCP工作流程
iocp的特性:高并发,异步,高效。
我们先看iocp的整个流程:
- 主线程接收客户端链接,绑定到iocp对象,异步接收消息。继续接收客户端链接。
- 工作线程获取iocp对象绑定的socket网络数据。处理数据,异步接收数据或者断开链接。 继续工作线程获取iocp对象绑定的socket网络数据。
由此,高并发就很好理解了。线程不断接收链接,有多少我都扔给iocp对象,有工作线程去做处理。所以并发量没有上限,这个上限是硬件的上限。
而异步是windows系统提供了一个异步接收网络数据函数。高效的原因在于,整个iocp流程没有一处程序需要被迫阻塞的地方。阻塞的逻辑由操作系统完成了。所有工作线程都能满负荷的无锁的去运行。原因我在多线程读取同一个socket数据会分析。当你看了其它的数据收发方式,比较之后,就会有这就是我想要的方案的感受。原因我在如何实现iocp会分析。
多线程读取同一个socket的数据:
这是一个很让人担心的地方,不同线程拿到我还没接收完的socket网络数据,数据怎么保证有序呢?其实这个问题并不存在。我们socket每一次异步接收数据,那么当数据收到之后,就会扔进iocp对象绑定的socket网络数据列表。也就是说,iocp对象绑定的socket网络数据列表同一时刻,同一个socket只会存在一份网络数据在里面。除非你愚蠢的用同一个socket多次异步接收数据。所以我们多线程就不会存在同时两个线程在处理同一个socket的数据。那么我们问题的关键地方在于如何知道我们当前socket上一次接收的数据在哪。其实去看例子不难明白每一次都有一个socket绑定的数据指针对象能一起拿到。那么我们只要多添加一个总的消息buffer就好了。当消息接收完全,清空总消息buffer就万事成矣。
接收消息buffer大小问题:
不同于同步接收网络数据,我可以不断receive,直到自己判断数据接收完了。可惜异步编程的无奈就在于必须要给一个数据缓冲区大小。当然我们希望buffer越大越好,能够一次性接收到所有数据。这肯定是不可取的,因为每一个socket链接过来,我们都要创建一个数据buffer,我们来算一算,如果一个链接我们用1MB内存,那么百万并发我们需要900多g的内存。那么高并发显然就不现实了。 所以对于iocp来说,消息buffer尽量小。虽然额外会增加一个buffer,但是并不是所有链接都会发那么大量的数据。
(1)IOCP的使用
- 创建一个新的完成端口。完成端口被设计成与一个线程池相互合作,线程池的线程并发的用来处理完成的 IO 通知。CreateIoCompletionPort这个 API 用于创建 IOCP, 最后一个参数则是指定线程池中线程个数,一般来说取 CPU * 2 ,这样可以最充分使用多核 CPU ,又降低了线程间的切换。CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, dwNumberOfConcurrentThreads)
- 创建工作线程。
- 关联一个 IO 设备到完成端口。也是调用CreateIoCompletionPort(API 设计有些太随意了吧,难道有什么历史原因?)。
HANDLE h = CreateIoCompletionPort(hDevice, hCompletionPort, dwCompletionKey, 0); - 使用 overlapped IO ,例如 socket 的 WSARecv/WSASend,甚至 AcceptEx 和 ConnectEx。这些调用都只是发起一个 IO 请求然后立即返回。函数调用都需要初始化一个 OVERLAPPED 结构体,后面有提到其作用。
- 工作线程是一个 loop, 阻塞在 GetQueuedCompletionStatus 调用上。GetQueuedCompletionStatus 返回时从 IO 完成队列中取出一个 completion packet。线程池线程阻塞时是由系统负责完成调度的。
(2)IOCP 内部的一些数据结构
- Device List:包含所有与完成端口相关联的设备的一个列表。
- I/O Completion Queue(FIFO):当一个异步 IO 请求完成了,系统会去检查是否这个 IO 设备与任何 IO 完成端口关联了,如果是,系统会在 IO 完成端口队列的末尾添加一个 completion packet(以 FIFO 的顺序入队),GetQueuedCompletionStatus 就是在这个队列上等待。
- IOCP 关联线程等待队列:线程池中的线程调用GetQueuedCompletionStatus时,就会被放进一个等待队列,IO 完成端口内核对象根据此队列知道有哪些线程在等待处理completion packet。线程等待队列是按照 LIFO 的方式入队的,也就是当有一个 completion packet 到来时,系统先唤醒最后调用GetQueuedCompletionStatus进入等待队列的线程。
(3)IOCP和线程池的相互作用
- 任何线程都可以调用
GetQueuedCompletionStatus
来与一个 IO 完成端口关联起来,但是一个线程只能关联一个 IOCP,当线程退出或者指定了其他的 IOCP或者关闭了 IOCP,线程才与这个 IOCP 解开绑定。 - 创建 IOCP 的时候会指定一个并发值,虽然任意个线程可以关联到这个 IOCP,但是并发值限定了可以同时运行的线程数。假设这样的场景,有一个并发值为 1 的 IOCP,但是有多于一个的线程关联到了这个 IOCP,如果完成队列中总是有一个 completion packet 在等待,当正在运行的线程调用
GetQueuedCompletionStatus
时就会立即返回,该线程处理完这个 completion packet 再次调用GetQueuedCompletionStatus
又会立即返回。在处理 completion packet过程中,虽然完成队列中始终有 completion packet 待处理,但是因为并发值为 1 的原因,系统不会去调度其他线程来执行,尽管关联 IOCP 的线程不止一个。同时也避免了线程切换的开销,因为始终都是这一个线程在执行。 - 在上述情况中,看起来线程池中关联的其他线程毫无用处,但是其实是没有考虑到正在运行的线程进入等待状态或者因为某种情况与该 IOCP 解除绑定时的情况。如果正在运行的线程调用
Sleep
,WaitFor*
,或者一个同步 IO 函数,或者任何可以引起当前线程从运行状态变为等待状态的函数时,IOCP 就会立即调度其他关联的线程,维持始终有一个线程在运行。
(4)IOCP 使用过程中遇到的问题
- 因为涉及到多线程会比 epoll + 单线程要编码复杂。
- API 设计比较糟糕,这也加大了编码难度。
- 文档描述不清晰,甚至没有一个官方的示例程序,非官方文档或者程序或多或少有些错误,让人难以放心使用。以 WSARecv 为例子,MSDN上描述若 WSARecv 能立即返回,返回值为 0。这是不是意味着程序要在两处处理 IO 完成的情况,一处是 IO 立即返回时,一处是工作线程
GetQueuedCompletionStatus
等待 IOCP 完成队列处。几乎所有的异步 IO 函数都是如此。但是所幸似乎即使立即返回 0 ,完成队列中也会有一个 completion packet,所以只在工作线程中的完成队列中等待 IO 完成也不会出错。 - 一般的使用 TCP 进行通讯的网络程序,因为 TCP 流无界的特性,都会自定义成这样的应用网络数据包:前面几个字节代表该包的长度,后面就是该包真正的内容。应用程序在解包的时候,对应的要先获取包的长度,再截取对应长度的包数据。这样的过程在多线程的 IOCP 会比较困难,多个线程取到了各个数据包的不同部分,而且因为 completion packet 的出队顺序并不能保证,各个线程获取的数据包之间的顺序已经丢失了。因此,必须想办法解决包的顺序问题,而且解包过程需要同步各个线程。这样无疑使得代码变得更复杂。
- IOCP 作为异步 IO ,可以非常方便的发起 IO ,但是每次发起 IO 时候都必须提交一段用户内存,在 IO 完成之前这段内存必须是被锁住的,既你不能再使用。当然这不是 IOCP 的问题,这是异步 IO特性决定的。
(5)一个收发 TCP 应用协议包的程序示例
- 协议包定义成头两个字节保存包长度 len,包头后面 len 字节是包的具体内容。为了简化编码,又能利用到 IOCP 一些特性,决定只启动一个工作线程处理所有的 IO 完成操作,发包和收包都是非阻塞的异步调用。
- 提交给异步 IO 的 buffer,都是从一段预先分配的内存中取出来的,这样使得IO 操作使用的内存是可控的,并且不会有内存碎片,充分使用内存。
- IOCP 的几个核心 API 都与参数 completionKey, overlapped 有关。在程序中 completionKey 可以对应是对哪个 socket 进行操作,overlapped 则对应成具体哪一个 IO 操作。
- 同一时间只允许一个同类的 IO 操作(读或者写)在提交。
#include <iostream>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <arpa/inet.h>
const int BUFFER_SIZE = 1024;
void send_message(int sock, const std::string& message) {
// 发送消息
send(sock, message.c_str(), message.length(), 0);
}
std::string receive_message(int sock) {
// 接收消息
char buffer[BUFFER_SIZE];
memset(buffer, 0, BUFFER_SIZE);
int bytesReceived = recv(sock, buffer, BUFFER_SIZE - 1, 0);
if (bytesReceived <= 0) {
return "";
}
return std::string(buffer);
}
// 客户端代码示例
void client() {
const char* serverIP = "127.0.0.1";
int serverPort = 8888;
// 创建套接字
int clientSock = socket(AF_INET, SOCK_STREAM, 0);
// 设置服务器地址和端口信息
sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_port = htons(serverPort);
if (inet_pton(AF_INET, serverIP , &(serverAddress.sin_addr)) <= 0) {
std::cerr << "Failed to setup server address." << std::endl;
return;
}
// 连接到服务器
if (connect(clientSock , (struct sockaddr *)&serverAddress , sizeof(serverAddress)) < 0) {
std::cerr << "Failed to connect to the server." << std::endl;
return;
}
// 发送请求数据给服务器
std::string request = "Hello Server!";
send_message(clientSock, request);
// 接收服务器的响应数据
std::string response = receive_message(clientSock);
std::cout << "Received from server: " << response << std::endl;
close(clientSock);
}
// 服务端代码示例
void server() {
const char* bindIP = "127.0.0.1";
int bindPort = 8888;
// 创建套接字
int serverSock = socket(AF_INET, SOCK_STREAM, 0);
// 设置绑定地址和端口信息
sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_port = htons(bindPort);
if (inet_pton(AF_INET, bindIP , &(serverAddress.sin_addr)) <= 0) {
std::cerr << "Failed to setup binding address." << std::endl;
return;
}
// 绑定到指定地址和端口上
if (bind(serverSock, (struct sockaddr *)&serverAddress, sizeof(serverAddress)) < 0) {
std::cerr << "Failed to bind the socket." << std::endl;
return;
}
// 开始监听客户端连接请求
listen(serverSock, SOMAXCONN);
std::cout << "Server listening on " << bindIP << ":" << bindPort << std::endl;
while (true) {
// 接受客户端连接
sockaddr_in clientAddress;
socklen_t clientAddrLen = sizeof(clientAddress);
int clientSock = accept(serverSock, (struct sockaddr *)&clientAddress, &clientAddrLen);
if (clientSock < 0) {
std::cerr << "Failed to accept client connection." << std::endl;
continue;
}
// 接收客户端的请求数据
std::string request = receive_message(clientSock);
std::cout << "Received from client: " << request << std::endl;
// 处理请求并返回响应数据
std::string response = "Hello Client!";
// 发送响应给客户端
send_message(clientSock, response);
close(clientSock);
}
close(serverSock);
}
int main() {
// 启动服务器和客户端
server();
client();
return 0;
}
上述示例包括一个简单的服务器和客户端代码。服务器使用socket()函数创建套接字,并绑定到指定的IP地址和端口号,然后通过listen()开始监听连接请求。在接收到客户端连接后,从该连接套接字中接收消息,并发送响应消息给客户端。客户端同样使用socket()函数创建套接字,并连接到服务器指定的IP地址和端口号,发送请求消息给服务器,并等待服务器回复。注意将 serverIP 和 serverPort 设置为实际需要的值。服务端程序比较简单,可以自己实现并验证。
三、IOCP的使用
初次学习使用IOCP的朋友在熟悉各个API时,建议参看MSDN的官方文档。
IOCP的使用主要分为以下几步:
- 创建完成端口(iocp)对象
- 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求
- Socket关联iocp对象,在Socket上投递网络事件
- 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理
(1)创建完成端口对象
使用IOCP模型,首先要调用 CreateIoCompletionPort 函数创建一个完成端口对象,Winsock将使用这个对象为任意数量的套接字句柄管理 I/O 请求。函数定义如下:
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);
此函数的两个不同功能:
- 创建一个完成端口对象
- 将一个或多个文件句柄(这里是套接字句柄)关联到 I/O 完成端口对象
最初创建完成端口对象时,唯一需要设置的参数是 NumberOfConcurrentThreads,该参数定义了 允许在完成端口上同时执行的线程的数量。理想情况下,我们希望每个处理器仅运行一个线程来为完成端口提供服务,以避免线程上下文切换。NumberOfConcurrentThreads 为0表示系统允许的线程数量和处理器数量一样多。因此,可以简单地使用以下代码创建完成端口对象,取得标识完成端口的句柄。
HANDLE m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
(2)I/O工作线程和完成端口
I/O 工作线程在完成端口上执行并处理投递的I/O请求。关于工作线程的数量,要注意的是,创建完成端口时指定的线程数量和这里要创建的线程数量不是一回事。CreateIoCompletionPort 函数的 NumberOfConcurrentThreads 参数明确告诉系统允许在完成端口上同时运行的线程数量。如果创建的线程数量多于 NumberOfConcurrentThreads,也仅有NumberOfConcurrentThreads 个线程允许运行。
但也存在确实需要创建更多线程的特殊情况,这主要取决于程序的总体设计。如果某个线程调用了一个函数,如 Sleep 或 WaitForSingleObject,进入了暂停状态,多出来的线程中就会有一个开始运行,占据休眠线程的位置。
有了足够的工作线程来处理完成端口上的 I/O 请求后,就该为完成端口关联套接字句柄了,这就用到了 CreateCompletionPort 函数的前3个参数。
- FileHandle:要关联的套接字句柄
- ExistingCompletionPort:要关联的完成端口对象句柄
- CompletionKey:指定一个句柄唯一(per-handle)数据,它将与FileHandle套接字句柄关联在一起
(3)完成端口和重叠I/O
向完成端口关联套接字句柄之后,便可以通过在套接字上投递重叠发送和接收请求处理 I/O。在这些 I/O 操作完成时,I/O 系统会向完成端口对象发送一个完成通知封包。I/O 完成端口以先进先出的方式为这些封包排队。工作线程调用 GetQueuedCompletionStatus 函数可以取得这些队列中的封包。函数定义如下:
BOOL GetQueuedCompletionStatus(
[in] HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
[out] PULONG_PTR lpCompletionKey,
[out] LPOVERLAPPED *lpOverlapped,
[in] DWORD dwMilliseconds
);
参数说明
- CompletionPort:完成端口对象句柄
- lpNumberOfBytesTransferred:I/O操作期间传输的字节数
- lpCompletionKey:关联套接字时指定的句柄唯一数据
- lpOverlapped:投递 I/O 请求时使用的重叠对象地址,进一步得到 I/O 唯一(per-I/O)数据
lpCompletionKey 参数包含了我们称为 per-handle 的数据,该数据在套接字第一次关联到完成端口时传入,用于标识 I/O 事件是在哪个套接字句柄上发生的。可以给这个参数传递任何类型的数据。
lpOverlapped 参数指向一个 OVERLAPPED 结构,结构后面便是我们称为per-I/O的数据,这可以是工作线程处理完成封包时想要知道的任何信息。
per-handle数据和per-I/O数据结构类型示例
#define BUFFER_SIZE 1024
//per-handle 数据
typedef struct _PER_HANDLE_DATA
{
SOCKET s; //对应的套接字句柄
SOCKADDR_IN addr; //客户端地址信息
}PER_HANDLE_DATA,*PPER_HANDLE_DATA;
//per-I/O 数据
typedef struct _PER_IO_DATA
{
OVERLAPPED ol; //重叠结构
char buf[BUFFER_SIZE]; //数据缓冲区
int nOperationType; //I/O操作类型
#define OP_READ 1
#define OP_WRITE 2
#define OP_ACCEPT 3
}PER_IO_DATA,*PPER_IO_DATA;
(4)示例程序
主线程首先创建完成端口对象,创建工作线程处理完成端口对象中的事件;然后创建监听套接字,开始监听服务端口;循环处理到来的连接请求,该过程具体如下:
- 调用 accept 函数等待接受未决的连接请求
- 接受新连接后,创建 per-handle 数,并将其关联到完成端口对象
- 在新接受的套接字上投递一个接收请求,该I/O完成后,由工作线程负责处理
void main()
{
int nPort = 4567;
HANDLE hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); //创建完成端口对象
::CreateThread(NULL, 0, ServerThread, (LPVOID)hCompletion, 0, 0); //创建工作线程
//创建监听套接字,绑定到本地地址,开始监听
SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
::bind(sListen, (sockaddr*)&si, sizeof(si));
::listen(sListen, 5);
//循环处理到来的连接
while (true) {
//等待接受未决的连接请求
SOCKADDR_IN saRemote;
int nRemoteLen = sizeof(saRemote);
SOCKET sNew = ::accept(sListen, (sockaddr*)&saRemote, &nRemoteLen);
//接受到新连接之后,为它创建一个per-handle数据,并将它们关联到完成端口对象
PPER_HANDLE_DATA pPerHandle = (PPER_HANDLE_DATA)::GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
pPerHandle->s = sNew;
memcpy(&pPerHandle->addr, &saRemote, nRemoteLen);
::CreateIoCompletionPort((HANDLE)pPerHandle->s, hCompletion, (DWORD)pPerHandle, 0);
//投递一个接收请求
PPER_IO_DATA pPerIO = (PPER_IO_DATA)::GlobalAlloc(GPTR, sizeof(PER_IO_DATA));
pPerIO->nOperationType = OP_READ;
WSABUF buf;
buf.buf = pPerIO->buf;
buf.len = BUFFER_SIZE;
DWORD dwRecv;
DWORD dwFlags = 0;
::WSARecv(pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pPerIO->ol, NULL);
}
}
I/O 工作线程循环调用 GetQueuedCompletionStatus 函数从 I/O 完成端口移除完成的 I/O 通知封包,解析并进行处理。
DWORD WINAPI ServerThread(LPVOID lpParam)
{ //得到完成端口对象句柄
HANDLE hCompletion = (HANDLE)lpParam;
DWORD dwTrans;
PPER_HANDLE_DATA pPerHandle;
PPER_IO_DATA pPerIO;
while (true) {
//在关联到此完成端口的所有套接字上等待I/O完成
BOOL bOK = ::GetQueuedCompletionStatus(hCompletion, &dwTrans, (PULONG_PTR)&pPerHandle, (LPOVERLAPPED*)&pPerIO, WSA_INFINITE);
if (!bOK) {
//在此套接字上由错误发生
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}
if (dwTrans == 0 && (pPerIO->nOperationType == OP_READ || pPerIO->nOperationType == OP_WRITE)) {
::closesocket(pPerHandle->s);
::GlobalFree(pPerHandle);
::GlobalFree(pPerIO);
continue;
}
switch (pPerIO->nOperationType)
{ //通过per-IO数据中的nOperationType域查看有什么I/O请求完成了
case OP_READ: //完成一个接收请求
{
pPerIO->buf[dwTrans] = '\0';
cout << "接收到数据:" << pPerIO->buf << endl;
cout << "共有" << dwTrans << "字符" << endl;
//继续投递接收I/O请求
WSABUF buf;
buf.buf = pPerIO->buf;
buf.len = BUFFER_SIZE;
pPerIO->nOperationType = OP_READ;
DWORD nFlags = 0;
::WSARecv(pPerHandle->s, &buf, 1, &dwTrans, &nFlags, &pPerIO->ol, NULL);
}
break;
case OP_WRITE: //本例中没有投递这些类型的I/O请求
case OP_ACCEPT: break;
}
}
return 0;
}
(5)恰当地关闭IOCP
关闭 I/O 完成端口时,特别是有多个线程在socket上执行 I/O 时,要避免当重叠操作正在进行时释放它的 OVERLAPPED 结构。阻止该情况发生的最好方法是在每个 socket 上调用 closesocket 函数,确保所有未决的重叠 I/O 操作都会完成。
一旦所有socket关闭,就该终止完成端口上处理 I/O 事件的工作线程了。可以通过调用 PostQueuedCompletionStatus 函数发送特定的完成封包来实现。所有工作线程都终止之后,可以调用 CloseHandle 函数关闭完成端口。
四、IOCP案例实战
IOCP以异步处理网络I/O事件、优秀的线程调度等机制,成为Windows环境下性能最优秀的网络通信模型之一。但结合不同的应用场景,IOCP也需要合理的使用方式才能发挥其性能优势。
前段时间需要开发一个Windows环境下的网络通信综合集成系统,作者通过IOCP模型,设计实现了一个用于网络通信的底层模块,这个模块同时支持tcp、udp、广播、udp组播等多种通信方式,每种通信方式对应不同的事件处理机制,能够支持高性能的网络通信,并为上层模块提供服务。
支持多通信方式的模块设计
IOCP模型的工作机制如下图所示。Socket关联完成端口对象(IOCP)之后,由系统调度工作线程,在IOCP下轮询接收消息通知,并根据通知内容对I/O事件进行处理。
但根据作者的应用场景,模块要能够同时支持tcp、udp、广播、udp组播等多种通信方式,且每种方式的事件处理机制不完全相同,单IOCP下的工作线程无法同时进行多种不同机制的事件处理,且不同机制对工作线程的需求也并不相同,因此设计了一种基于多个IOCP实现不同通信方式的方案,具体实现如下图所示:
如上图所示,针对tcp、udp、广播使用不同的处理机制,单个IOCP下的工作线程无法同时满足多种通信方式的不同事件处理机制,因此采用3个IOCP来对不同通信协议的事件进行区分处理。
- 1.TCP Socket关联IOCP 1,用一个工作线程进行TCP事件的处理。这里只使用1个工作线程的原因是:防止传输过程中,某个数据包只发送成功部分数据,可能导致的数据乱序问题。例如:如果有两个工作线程A和B,都在进行tcp send事件。如果此时A线程发送一个长度为1000字节的数据包a,但只发送成功了前800字节的内容(网络质量较差时可能会出现该问题)。此时B也在进行另一长度也为1000字节的数据包b的发送。则此时可能会出现:数据包b的1000字节内容,会比数据包a的剩余200字节内容先到达接收端缓冲区,造成数据乱序。
- 2.UDP Socket关联IOCP 2,用多个工作线程进行UDP事件的处理,由于UDP的包式传输,不存在上述TCP协议的问题,因此可用多个工作线程提升udp事件处理的性能上限。
- 3.广播Socket关联IOCP 3,用一个工作线程处理广播事件,这里只使用一个工作线程是出于广播数据较少,节约系统资源的考量。
- 4.UDP组播方式的Socket和普通UDP一样,关联IOCP 2,因为两者的事件处理机制相同。
五、常见问题和解答
(1)服务器的吞吐量问题
我们都知道,基于IOCP的开发是异步IO的,也正是这一技术的本质,决定了IOCP所实现的服务器的高吞吐量。
我们举一个及其简化的例子,来说明这一问题。在网络服务器的开发过程中,影响其性能吞吐量的,有很多因素,在这里,我们只是把关注点放在两个方面,即:网络IO速度与Disk IO速度。我们假设:在一个千兆的网络环境下,我们的网络传输速度的极限是大概125M/s,而Disk IO的速度是10M/s。在这样的前提下,慢速的Disk 设备会成为我们整个应用的瓶颈。我们假设线程A负责从网络上读取数据,然后将这些数据写入Disk。如果对Disk的写入是同步的,那么线程A在等待写完Disk的过程是不能再从网络上接受数据的,在写入Disk的时间内,我们可以认为这时候Server的吞吐量为0(没有接受新的客户端请求)。对于这样的同步读写Disk,一些的解决方案是通过增加线程数来增加服务器处理的吞吐量,即:当线程A从网络上接受数据后,驱动另外单独的线程来完成读写Disk任务。这样的方案缺点是:需要线程间的合作,需要线程间的切换(这是另一个我们要讨论的问题)。而IOCP的异步IO本质,就是通过操作系统内核的支持,允许线程A以非阻塞的方式向IO子系统投递IO请求,而后马上从网络上读取下一个客户端请求。这样,结果是:在不增加线程数的情况下,IOCP大大增加了服务器的吞吐量。说到这里,听起来感觉很像是DMA。的确,许多软件的实现技术,在本质上,与硬件的实现技术是相通的。另外一个典型的例子是硬件的流水线技术,同样,在软件领域,也有很著名的应用。
(2)线程间的切换问题
服务器的实现,通过引入IOCP,会大大减少Thread切换带来的额外开销。我们都知道,对于服务器性能的一个重要的评估指标就是:System/Context Switches,即单位时间内线程的切换次数。如果在每秒内,线程的切换次数在千的数量级上,这就意味着你的服务器性能值得商榷。Context Switches/s应该越小越好。说到这里,我们来重新审视一下IOCP。
完成端口的线程并发量可以在创建该完成端口时指定(即NumberOfConcurrentThreads参数)。该并发量限制了与该完成端口相关联的可运行线程的数目(就是前面我在IOCP简介中提到的执行者线程组的最大数目)。当与该完成端口相关联的可运行线程的总数目达到了该并发量,系统就会阻塞任何与该完成端口相关联的后续线程的执行,直到与该完成端口相关联的可运行线程数目下降到小于该并发量为止。最有效的假想是发生在有完成包在队列中等待,而没有等待被满足,因为此时完成端口达到了其并发量的极限。此时,一个正在运行中的线程调用GetQueuedCompletionStatus时,它就会立刻从队列中取走该完成包。这样就不存在着环境的切换,因为该处于运行中的线程就会连续不断地从队列中取走完成包,而其他的线程就不能运行了。
完成端口的线程并发量的建议值就是你系统CPU的数目。在这里,要区分清楚的是,完成端口的线程并发量与你为完成端口创建的工作者线程数是没有任何关系的,工作者线程数的数目,完全取决于你的整个应用的设计(当然这个不宜过大,否则失去了IOCP的本意:))。
(3)IOCP开发过程中的消息乱序问题
使用IOCP开发的问题在于它的复杂。我们都知道,在使用TCP时,TCP协议本身保证了消息传递的次序性,这大大降低了上层应用的复杂性。但是当使用三个线程同时从IOCP中读取Msg1, Msg2,与Msg3。由于TCP本身消息传递的有序性,所以,在IOCP队列内,Msg1-Msg2-Msg3保证了有序性。三个线程分别从IOCP中取出Msg1,Msg2与Msg3,然后三个线程都会将各自取到的消息投递到逻辑层处理。在逻辑处理层的实现,我们不应该假定Msg1-Msg2-Msg3顺序,原因其实很简单,在Time 1~Time 2的时间段内,三个线程被操作系统调度的先后次序是不确定的,所以在到达逻辑处理层,Msg1,Msg2与Msg3的次序也就是不确定的。所以,逻辑处理层的实现,必须考虑消息乱序的情况,必须考虑多线程环境下的程序实现。
在这里,我把消息乱序的问题单列了出来。其实在IOCP的开发过程中,相比于同步的方式,应该还有其它更多的难题需要解决,这也是与Select方式相比,IOCP的缺点,实现复杂度高。
往期回顾:
- 2024 技术展望 |C/C++发展方向(强烈推荐!!)
- 2024 技术展望 | Linux内核源码分析(强烈推荐收藏!)
- 2024 技术展望 | 从菜鸟到大师!用Qt编写出惊艳世界的应用
- 2024 技术展望 | 探索存储全栈开发:构建高效可靠的数据存储系统
- 2024 技术展望 | 突破性能瓶颈:释放DPDK带来的无尽潜力
- 2024 技术展望 | 嵌入式音视频技术解析:从原理到项目应用
- 2024 技术展望 | C++游戏后端开发,基于魔兽开源后端框架
22222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222
Epoll 是Linux系统下的模型;IOCP 是Windows下模型;
Epoll 是当事件资源满足时发出可处理通知消息;
IOCP 则是当事件完成时发出完成通知消息;
从应用程序的角度来看, Epoll 是同步非阻塞的;IOCP是异步操作;
举例说明,更加清晰透彻:
有一个打印店,有一台打印机,好几个人在排队打印。
普通打印店,正常情况是:
1、你准备好你的文档,来到打印店;
2、排队,等别人打印完;
3、轮到你了,打印你的文档;
4、你取走文档,做后面的处理。
这种方式,你会浪费很多等待时间,非常低效。
于是, Linux和windows都提出了自己最优的模型。
Linux的epoll模型,则可以描述如下:
1、你准备好你的文档,来到打印店;
2、告诉店小二说,我先排队在这位置,轮到我了通知一声(假定你来回路上不耗时);
3、你先去忙你的事情去了;
4、轮到你了,店小二通知你(假定你来回路上不耗时);
5、你获得打印机使用权了,开始打印;
6、打印完了拿走。
你会发现,你节省了排队的时间,等到你能获得打印机资源的时候,告诉你来处理。但是这里,就浪费了一点时间,就是你自己打印。这就是epoll的同步非阻塞。
windows的IOCP模型,则可以描述如下:
1、你准备好你的文档,来到打印店;
2、告诉店小二说,我先排队,轮到我了帮打印下,好了通知我(也假定你来回路上不耗时);
3、你先去忙你的事情去了;
4、轮到你的文档了,店小二直接帮你打印好了,通知你;
5、你来了,直接取走文档。
你会发现,你不但节省了排队时间,你连打印时间都节省了, 完全异步操作。
很显然,IOCP简直是太完美了,可以称得上是最高性能的服务器网络模型了。
那么问题来了,是不是epoll就比iocp效率低了?
不一定。
同样是以打印为例:
假定现在有两个打印店,分别命名为:
epoll打印店(L店), IOCP打印店(W店)
你把相同的材料2份,分别放在两个店,哪一个会先完成呢?
如果L店的工作人员,工作任务少,效率非常高,很快就轮到你打印了;
而W店的工作人员,工作任务巨多,慢慢悠悠,边工作边吃饭边聊天,很久才轮到你的打印。
请问:那个会先打印完? 所以,谁更快,还与打印店的工作方式有很大关系。
windows启动后,即便什么也不干,就已经有近百个进程+近百个服务在后台运行+一千多个线程在工作了,外加大量图形化界面占用许多系统资源。而Linux启动后仅有几个进程几个服务在后台,轻装上阵,只要你不启动图形化应用,就几乎没有什么图形化应用占用系统资源。
回到本话题,决定效率快慢的,模型是一方面;操作系统的底层协议处理架构,也是一方面。
两者同样重要。
当然你也可以说,也很有可能是,L店的打印机是时速30张/分钟,W店打印机时速120张/分钟,非常正确,但是这就属于硬件配置的范畴了,这就等于是i9处理器与二十年前的赛扬II处理器在比较了。