1.概念以及重叠IO存在的问题
2.完成端口代码详解
整体流程
使用到的新函数
1.CreateIoCompletionPort函数
该函数创建输入/输出 (I/O) 完成端口并将其与指定的文件句柄相关联,或创建尚未与文件句柄关联的 I/O 完成端口,以便稍后关联,即创建一个新的完成端口。将打开的文件句柄的实例与 I/O 完成端口相关联,使进程能够接收涉及该文件句柄的异步 I/O 操作完成通知,即把端口和socket绑定,函数原型
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);
在创建一个新的完成端口时:
参数1:填 INVALID_HANDLE_VALUE
参数2:填NULL
参数3:填0,
参数4:填0
返回值:成功返回一个新的完成端口句柄,失败返回0
在把完成端口绑定socket时
参数1:填服务器socket
参数2:填完成端口
参数3:填绑定的socket或者对应的下标
参数4:填0
返回值:成功返回一个与参数2一样的端口句柄,失败会返回错误码
##代码样例
//创建完成端口
HANDLE hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (hPort == 0)
{
int a = GetLastError();
printf("创建出错:%d\n", a);
closesocket(socketServer);
WSACleanup();
return 0;
}
//绑定
HANDLE hPort1 = CreateIoCompletionPort(socketServer, hPort, 0, 0);//绑定成功 hPort1和hPort是一样的
if (hPort1 != hPort)
{
int a = GetLastError();
printf("绑定出错:%d\n", a);
//关闭端口
CloseHandle(hPort);
closesocket(socketServer);
WSACleanup();
return 0;
}
2.CreateThread函数
该函数创建在调用进程的虚拟地址空间内执行的线程,即创建一根线程,函数原型
HANDLE CreateThread(
[in, optional] LPSECURITY_ATTRIBUTES lpThreadAttributes,
[in] SIZE_T dwStackSize,
[in] LPTHREAD_START_ROUTINE lpStartAddress,
[in, optional] __drv_aliasesMem LPVOID lpParameter,
[in] DWORD dwCreationFlags,
[out, optional] LPDWORD lpThreadId
);
参数1:控制线程句柄能否被继承,NULL表示不继承
参数2:线程栈大小,以字节为单位
参数3:线程函数地址,DWORD WINAPI ThreadProc(LPVOID lpParameter)
参数4:外部给线程传递数据
参数5:0表示线程立即执行,CREATE_SUPENDED表示线程创建完挂起状态,调用ResumeThread启动函数
参数6:线程ID,可以填NULL
返回值:成功返回线程句柄,失败返回NULL
3.GetQueuedCompletionStatue函数
该函数尝试从指定的 I/O 完成端口取消 I/O 完成数据包的排队。 如果没有排队的完成数据包,该函数将等待与完成端口关联的挂起 I/O 操作完成,函数原型
BOOL GetQueuedCompletionStatus(
[in] HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
[out] PULONG_PTR lpCompletionKey,
[out] LPOVERLAPPED *lpOverlapped,
[in] DWORD dwMilliseconds
);
参数1:完成端口,即使用CreateIoCompletionPort函数创建的一个变量,需要从主函数传入
参数2:接收或发送的字节数, 0表示客户端下线
参数3:完成端口函数的参数3传入
参数4:接收到的重叠结构
参数5:等待时间,INFINITE一直等
返回值:成功返回TRUE,失败返回FALSE
##完成端口整体代码
#define _CRT_SECURE_NO_WARNINGS
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include<stdio.h>
#include<Winsock2.h>
#include<mswsock.h>
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "mswsock.lib")
int PostAccept();
int PostRecv(int index);
int PostSend(int index);
DWORD WINAPI ThreadProc(LPVOID lpParameter);
#define MAX_COUNT 1024
#define MAX_RECV_COUNT 1024
SOCKET g_allsock[MAX_COUNT];
OVERLAPPED g_allIOp[MAX_COUNT];
int g_count;
char g_recvbuf[MAX_RECV_COUNT];
HANDLE hPort;//定义句柄
HANDLE* hThread;
int nProcessorsCount;
void Clear()
{
for (int i = 0; i < g_count; i++)
{
if (g_allsock[i] == 0)
{
continue;
}
closesocket(g_allsock[i]);
WSACloseEvent(g_allIOp[i].hEvent);
}
}
BOOL WINAPI fun(DWORD dwCtrlType)
{
switch (dwCtrlType)
{
case CTRL_CLOSE_EVENT:
//释放所有soket和事件
for (int i = 0; i < nProcessorsCount; i++)
{
CloseHandle(hThread[i]);
}
free(hThread);
Clear();
break;
}
return TRUE;
}
int main()
{
SetConsoleCtrlHandler(fun, TRUE);
//第一步 打开网络库并校验版本
WORD wdVersion = MAKEWORD(2, 2);
WSADATA wdSockMsg;
if (0 != WSAStartup(wdVersion, &wdSockMsg))
{
printf("打开网络库失败\n");
return 0;
}
if (2 != HIBYTE(wdSockMsg.wVersion) || 2 != LOBYTE(wdSockMsg.wVersion))
{
printf("版本不对\n");
WSACleanup();
return 0;
}
//第二步 创建socket
SOCKET socketServer = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (socketServer == INVALID_SOCKET)
{
printf("创建socket失败\n");
WSACleanup();
return 0;
}
//第三步 绑定IP地址和端口号
struct sockaddr_in si;
si.sin_family = AF_INET;
si.sin_port = htons(12332);
si.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
if (SOCKET_ERROR == bind(socketServer, (const struct sockaddr*)&si, sizeof(si)))
{
printf("绑定失败\n");
closesocket(socketServer);
WSACleanup();
return 0;
}
g_allsock[g_count] = socketServer;
g_allIOp[g_count].hEvent = WSACreateEvent();
g_count++;
//创建完成端口
hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (hPort == 0)
{
int a = GetLastError();
printf("创建出错:%d\n", a);
closesocket(socketServer);
WSACleanup();
return 0;
}
//绑定
HANDLE hPort1 = CreateIoCompletionPort((HANDLE)socketServer, hPort, 0, 0);//绑定成功 hPort1和hPort是一样的
if (hPort1 != hPort)
{
int a = GetLastError();
printf("绑定出错:%d\n", a);
//关闭端口
CloseHandle(hPort);
closesocket(socketServer);
WSACleanup();
return 0;
}
//第四步 开始监听
if (SOCKET_ERROR == listen(socketServer, SOCK_STREAM))
{
printf("监听失败\n");
CloseHandle(hPort);
closesocket(socketServer);
WSACleanup();
return 0;
}
if (PostAccept() != 0)
{
Clear();
//清理网络库
WSACleanup();
return 0;
}
//创建线程
SYSTEM_INFO systemProcessorsCount;//该结构体对象存放系统信息
GetSystemInfo(&systemProcessorsCount);//该函数可以获取到系统的信息
nProcessorsCount = systemProcessorsCount.dwNumberOfProcessors;//获取到系统的核数
hThread = (HANDLE*)malloc(sizeof(HANDLE) * nProcessorsCount);
if (hThread == NULL)
{
return 0;
}
for (int i = 0; i < nProcessorsCount; i++)
{
hThread[i] = CreateThread(NULL, 0, ThreadProc, hPort, 0, NULL);
if (hThread[i] == NULL)
{
int a = GetLastError();
printf("绑定出错:%d\n", a);
//关闭端口
CloseHandle(hPort);
closesocket(socketServer);
WSACleanup();
return 0;
}
}
//阻塞主线程
while (1)
{
Sleep(1000);
}
//释放线程句柄
for (int i = 0; i < nProcessorsCount; i++)
{
CloseHandle(hThread[i]);
}
free(hThread);
CloseHandle(hPort);
closesocket(socketServer);
WSACleanup();
return 0;
}
DWORD WINAPI ThreadProc(LPVOID lpParameter)
{
HANDLE port = (HANDLE)lpParameter;//参数1
DWORD NumberBytes;//参数2 字节数
ULONG_PTR index;//参数3 下标
LPOVERLAPPED Overlapped;//参数4 重叠结构
while (1)
{
BOOL bFlag = GetQueuedCompletionStatus(port, &NumberBytes, &index, &Overlapped, INFINITE);
if (bFlag == FALSE)
{
int a = GetLastError();//获取错误码
if (a == 64)
{
printf("force close\n");
}
printf("%d\n", a);
continue;
}
//处理accept
if (0 == index)
{
//绑定到完成端口
HANDLE hPort1 = CreateIoCompletionPort((HANDLE)g_allsock[g_count], hPort, g_count, 0);
if (hPort1 != hPort)
{
int a = GetLastError();
printf("%d\n", a);
closesocket(g_allsock[g_count]);
continue;
}
printf("accept succee\n");
//新客户端投递recv
PostRecv(g_count);
g_count++;
PostAccept();
}
else
{
if (0 == NumberBytes)//字节数为0
{
//客户端下线
//关闭
printf("close\n");
closesocket(g_allsock[index]);
WSACloseEvent(g_allIOp[index].hEvent);
g_allsock[index] = 0;
g_allIOp[index].hEvent = NULL;
}
else
{
if (0 != g_recvbuf[0])//数组中第一个元素不为0,表示接收到消息
{
//recv
printf("%s\n", g_recvbuf);
memset(g_recvbuf, 0, MAX_RECV_COUNT);
PostRecv((int)index);
}
else
{
//send
printf("send succee\n");
}
}
}
}
return 0;
}
int PostAccept()
{
g_allsock[g_count] = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
g_allIOp[g_count].hEvent = WSACreateEvent();
char str[1024] = { 0 };
DWORD dwRecvcount;
BOOL aRes = AcceptEx(g_allsock[0], g_allsock[g_count], str, 0, sizeof(struct sockaddr) + 16,
sizeof(struct sockaddr) + 16, &dwRecvcount, &g_allIOp[0]);
int a = WSAGetLastError();
if (ERROR_IO_PENDING != a)
{
//函数出错
return 1;
}
return 0;
}
int PostRecv(int index)
{
WSABUF wsabuf;
wsabuf.buf = g_recvbuf;
wsabuf.len = MAX_RECV_COUNT;
DWORD dwRecvCount;
DWORD dwFlag = 0;
int wRes = WSARecv(g_allsock[index], &wsabuf, 1, &dwRecvCount, &dwFlag, &g_allIOp[index], NULL);
int a = WSAGetLastError();
if (a != ERROR_IO_PENDING)
{
//延迟处理
//函数执行出错
printf("recv出错\n");
return 1;
}
return 0;
}
int PostSend(int index)
{
WSABUF wsabuf;//参数2
wsabuf.buf = "你好";
wsabuf.len = MAX_RECV_COUNT;
DWORD dwSendCount;//参数4
DWORD dwFlag = 0;//参数5
int wRes = WSASend(g_allsock[index], &wsabuf, 1, &dwSendCount, dwFlag, &g_allIOp[index], NULL);
int a = WSAGetLastError();
if (a != ERROR_IO_PENDING)
{
//延迟处理
//函数执行出错
printf("send出错\n");
return 1;
}
return 0;
}