TcpServer 服务器优化之后,加了多线程,对心跳包进行优化
TcpServer.h
#ifndef TCPSERVER_H
#define TCPSERVER_H
#include <winsock2.h>
#include <ws2tcpip.h>
#include <algorithm>
#include <cstring>
#include <ctime>
#include <iostream>
#include <map>
#include <string>
#include <vector>
// Link against the WinSock library (ws2_32.lib).
#pragma comment(lib, "ws2_32.lib")
// Value passed to Sleep() by the heartbeat thread between heartbeat rounds (milliseconds).
#define HEARTBEATTIME 1000
/**
 * Multi-threaded TCP server built on WinSock.
 *
 * start() opens a non-blocking listening socket; handle() then spawns worker
 * threads that accept connections, receive client data and, when enabled,
 * send heartbeats and drop unresponsive clients.
 *
 * NOTE(review): the worker threads and stop() all touch clientSockets,
 * clientLastHeartbeatTime and socketsToRemove, and no lock is declared in this
 * class - confirm how concurrent access is meant to be serialized.
 */
class TcpServer {
public:
    TcpServer();
    ~TcpServer();

    // Non-copyable: an instance owns socket handles and one WSAStartup
    // reference, so a copy would close the same sockets and clean up twice.
    TcpServer(const TcpServer&) = delete;
    TcpServer& operator=(const TcpServer&) = delete;

    // Starts the server: creates the listening socket and listens on `port`.
    // Returns true on success, false on failure.
    bool start(int port);

    // Stops the server: closes the listening socket and every client socket.
    void stop();

    // Sends `dataLength` bytes from `data` to `clientSocket`.
    // Returns the number of bytes sent, or SOCKET_ERROR on failure.
    int sendData(SOCKET clientSocket, const char* data, int dataLength);

    // Spawns the worker threads (accept, receive and, when m_bHeartBeat is
    // set, heartbeat). Every call creates new threads, so call it once after
    // a successful start().
    void handle();

    // Thread entry point: accepts incoming connections until m_bExit is set.
    // `lpParam` must be the owning TcpServer*.
    static DWORD WINAPI ThreadAccept(LPVOID lpParam);

    // Thread entry point: polls the clients for incoming data until m_bExit
    // is set. `lpParam` must be the owning TcpServer*.
    static DWORD WINAPI ThreadRecvData(LPVOID lpParam);

    // Thread entry point: periodically sends heartbeats and removes
    // unresponsive clients until m_bExit is set. `lpParam` must be the owning
    // TcpServer*.
    static DWORD WINAPI ThreadHeartBeat(LPVOID lpParam);

public:
    // Sockets flagged for removal by sendHeartbeatsAndCheck().
    std::vector<SOCKET> socketsToRemove;
    // Set to a non-zero value to make the worker threads leave their loops.
    BOOL m_bExit;
    // Whether handle() should start the heartbeat thread.
    BOOL m_bHeartBeat;
    // Maximum silence, in seconds, tolerated from a client before
    // sendHeartbeatsAndCheck() flags it for removal.
    int heartbeatInterval;

private:
    SOCKET listenSocket;
    // Sockets of the currently connected clients.
    std::vector<SOCKET> clientSockets;
    // Time at which each client was accepted or last sent data.
    std::map<SOCKET, std::time_t> clientLastHeartbeatTime;

    // Switches `socket` to non-blocking mode; returns false on failure.
    bool setSocketNonBlocking(SOCKET socket);

    // Accepts one pending client connection, if any.
    void acceptNewClients();

    // Reads pending data from every connected client.
    void receiveClientData();

    // Sends a heartbeat to every client and flags the ones that timed out or
    // could not be reached.
    void sendHeartbeatsAndCheck();

    // Removes the listed sockets from the client bookkeeping.
    void removeDisconnectedClients(std::vector<SOCKET> &socketsToRemove);
};
#endif
TcpServer.cpp
#include "TcpServer.h"
// Constructs an idle server (no listening socket, heartbeat disabled,
// 5-second heartbeat timeout) and initializes WinSock 2.2.
// Members are initialized in their declaration order, which is the order the
// compiler uses regardless of how the initializer list is written.
// A WSAStartup failure is only logged; the object is still constructed.
TcpServer::TcpServer()
    : m_bExit(false),
      m_bHeartBeat(false),
      heartbeatInterval(5),
      listenSocket(INVALID_SOCKET)
{
    WSADATA wsaData;
    const int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (result != 0)
    {
        std::cerr << "WSAStartup failed: " << result << std::endl;
    }
}
// Destructor: asks the worker threads to exit, closes every socket and
// releases the WinSock reference taken by the constructor.
// NOTE(review): the worker threads are not joined here, so one may still be
// inside a loop iteration while the object is torn down - confirm the owner
// keeps the object alive until the threads have finished.
TcpServer::~TcpServer()
{
    // Raise the exit flag first so the thread loops stop at their next check.
    m_bExit = TRUE;
    stop();
    WSACleanup();
}
// 启动服务器,监听指定端口
bool TcpServer::start(int port)
{
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listenSocket == INVALID_SOCKET)
{
std::cerr << "Socket creation failed: " << WSAGetLastError() << std::endl;
return false;
}
if (!setSocketNonBlocking(listenSocket))
{
std::cerr << "Failed to set listen socket non-blocking" << std::endl;
closesocket(listenSocket);
return false;
}
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(port);
int result = bind(listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr));
if (result == SOCKET_ERROR)
{
std::cerr << "Bind failed: " << WSAGetLastError() << std::endl;
closesocket(listenSocket);
return false;
}
result = listen(listenSocket, SOMAXCONN);
if (result == SOCKET_ERROR)
{
std::cerr << "Listen failed: " << WSAGetLastError() << std::endl;
closesocket(listenSocket);
return false;
}
return true;
}
// 停止服务器
void TcpServer::stop()
{
if (listenSocket != INVALID_SOCKET)
{
closesocket(listenSocket);
listenSocket = INVALID_SOCKET;
}
for (SOCKET clientSocket : clientSockets)
{
closesocket(clientSocket);
}
clientSockets.clear();
clientLastHeartbeatTime.clear();
}
// Puts `socket` into non-blocking mode via ioctlsocket(FIONBIO).
// Returns true on success; on failure logs the WinSock error and returns false.
bool TcpServer::setSocketNonBlocking(SOCKET socket)
{
    u_long nonBlocking = 1; // non-zero enables non-blocking mode
    if (ioctlsocket(socket, FIONBIO, &nonBlocking) != SOCKET_ERROR)
    {
        return true;
    }
    std::cerr << "ioctlsocket failed: " << WSAGetLastError() << std::endl;
    return false;
}
// Sends `dataLength` bytes from `data` to `clientSocket`, repeating partial
// sends until everything has been written.
//
// When the non-blocking socket reports WSAEWOULDBLOCK the call waits with
// select() until the socket is writable again instead of spinning on send().
// If the socket stays unwritable for kSendWaitSeconds the call gives up, sets
// the WinSock error to WSAETIMEDOUT and returns SOCKET_ERROR, so a stalled
// peer cannot block the calling thread forever.
//
// Returns the number of bytes sent (== dataLength), 0 when dataLength <= 0,
// or SOCKET_ERROR on an invalid socket, a null buffer, a send error or a
// timeout.
int TcpServer::sendData(SOCKET clientSocket, const char* data, int dataLength)
{
    const long kSendWaitSeconds = 5;

    if (clientSocket == INVALID_SOCKET)
    {
        std::cerr << "Invalid client socket, cannot send data" << std::endl;
        return SOCKET_ERROR;
    }
    if (dataLength <= 0)
    {
        return 0; // nothing to send
    }
    if (data == nullptr)
    {
        return SOCKET_ERROR;
    }

    int totalBytesSent = 0;
    while (totalBytesSent < dataLength)
    {
        const int bytesSent = ::send(clientSocket, data + totalBytesSent, dataLength - totalBytesSent, 0);
        if (bytesSent != SOCKET_ERROR)
        {
            totalBytesSent += bytesSent;
            continue;
        }
        if (WSAGetLastError() != WSAEWOULDBLOCK)
        {
            return SOCKET_ERROR;
        }

        // The send buffer is full: block in select() until the socket becomes
        // writable or the wait times out.
        fd_set writable;
        FD_ZERO(&writable);
        FD_SET(clientSocket, &writable);
        timeval timeout;
        timeout.tv_sec = kSendWaitSeconds;
        timeout.tv_usec = 0;
        const int ready = select(0, nullptr, &writable, nullptr, &timeout);
        if (ready == 0)
        {
            WSASetLastError(WSAETIMEDOUT);
            return SOCKET_ERROR;
        }
        if (ready == SOCKET_ERROR)
        {
            return SOCKET_ERROR;
        }
    }
    return totalBytesSent;
}
// 接受新的客户端连接
void TcpServer::acceptNewClients()
{
SOCKET newClientSocket = accept(listenSocket, NULL, NULL);
if (newClientSocket == INVALID_SOCKET)
{
if (WSAGetLastError() != WSAEWOULDBLOCK)
{
std::cerr << "Accept failed: " << WSAGetLastError() << std::endl;
}
return;
}
else
{
std::cout << "Accept success: " << newClientSocket << std::endl;
}
if (!setSocketNonBlocking(newClientSocket))
{
std::cerr << "Failed to set client socket non-blocking" << std::endl;
closesocket(newClientSocket);
return;
}
clientSockets.push_back(newClientSocket);
clientLastHeartbeatTime[newClientSocket] = std::time(nullptr);
}
// 接收客户端数据
void TcpServer::receiveClientData()
{
for (size_t i = 0; i < clientSockets.size(); ++i)
{
SOCKET clientSocket = clientSockets[i];
char buffer[1024];
int bytesReceived = ::recv(clientSocket, buffer, sizeof(buffer), 0);
if (bytesReceived == SOCKET_ERROR)
{
if (WSAGetLastError() == WSAEWOULDBLOCK)
{
// 暂时无数据可读,继续检查下一个客户端
continue;
}
}
else
{
buffer[bytesReceived] = '\0';
std::string receivedData(buffer);
// 在这里可以根据接收到的数据进行具体业务逻辑处理,比如解析命令等
std::cout << "Received from client " << clientSocket << ": " << receivedData << std::endl;
clientLastHeartbeatTime[clientSocket] = std::time(nullptr);
std::string heartbeatData = buffer;
heartbeatData+=" recvok:";
int sentBytes = sendData(clientSocket, heartbeatData.c_str(), heartbeatData.length());
}
}
}
// 发送心跳包给客户端,并检测客户端响应
void TcpServer::sendHeartbeatsAndCheck()
{
const char* heartbeatData = "HEARTBEAT"; // 简单的心跳包内容,可自定义
int dataLength = strlen(heartbeatData);
for (auto& clientPair : clientLastHeartbeatTime)
{
SOCKET clientSocket = clientPair.first;
std::time_t& lastHeartbeatTime = clientPair.second;
std::time_t currentTime = std::time(nullptr);
if (currentTime - lastHeartbeatTime > heartbeatInterval)
{
// 超过心跳间隔时间没收到心跳响应,认为客户端连接异常
socketsToRemove.push_back(clientSocket);
continue;
}
int sentBytes = sendData(clientSocket, heartbeatData, dataLength);
if (sentBytes == SOCKET_ERROR)
{
// 发送心跳包失败,认为客户端连接可能有问题
socketsToRemove.push_back(clientSocket);
continue;
}
}
}
// 移除已断开连接的客户端(更新函数定义,无参数)
void TcpServer::removeDisconnectedClients(std::vector<SOCKET>&socketsToRemove)
{
for (SOCKET socketToRemove : socketsToRemove)
{
auto it = std::find(clientSockets.begin(), clientSockets.end(), socketToRemove);
if (it != clientSockets.end())
{
std::cout << "Remove :"<< * it << std::endl;
clientSockets.erase(it);
clientLastHeartbeatTime.erase(socketToRemove);
}
}
}
// Thread entry point for accepting connections.
// lpParam: the TcpServer* this thread works for.
// Calls acceptNewClients() repeatedly until m_bExit becomes non-zero; returns 0.
DWORD WINAPI TcpServer::ThreadAccept(LPVOID lpParam)
{
    TcpServer* const server = static_cast<TcpServer*>(lpParam);
    for (;;)
    {
        if (server->m_bExit != false)
        {
            break;
        }
        server->acceptNewClients();
    }
    return 0;
}
// Thread entry point for receiving client data.
// lpParam: the TcpServer* this thread works for.
// Calls receiveClientData() repeatedly until m_bExit becomes non-zero; returns 0.
DWORD WINAPI TcpServer::ThreadRecvData(LPVOID lpParam)
{
    TcpServer* const server = static_cast<TcpServer*>(lpParam);
    for (; server->m_bExit == false; )
    {
        server->receiveClientData();
    }
    return 0;
}
// Thread entry point for the heartbeat.
// lpParam: the TcpServer* this thread works for.
// Until m_bExit becomes non-zero: sleeps HEARTBEATTIME milliseconds, runs one
// heartbeat round and, only when heartbeatInterval is positive, removes the
// clients that the round flagged. Returns 0.
DWORD WINAPI TcpServer::ThreadHeartBeat(LPVOID lpParam)
{
    TcpServer* const server = static_cast<TcpServer*>(lpParam);
    while (!server->m_bExit)
    {
        Sleep(HEARTBEATTIME);
        server->sendHeartbeatsAndCheck();
        if (server->heartbeatInterval <= 0)
        {
            continue; // removal is skipped for a non-positive interval
        }
        server->removeDisconnectedClients(server->socketsToRemove);
    }
    return 0;
}
// 处理服务器业务逻辑,通常在循环中调用
void TcpServer::handle()
{
//创建4个线程,分别进行接收链接 接收数据 发送数据 发送心跳包
// 创建线程,传入当前对象指针作为参数,线程启动函数为 SendHeartbeat
HANDLE acceptThreadHandle = CreateThread(NULL, 0, ThreadAccept, this, 0, NULL);
if (acceptThreadHandle == NULL)
{
std::cerr << "Create accept thread failed: " << GetLastError() << std::endl;
}
else
{
std::cerr << "Create accept thread success: " << acceptThreadHandle << std::endl;
}
HANDLE recvDatatThreadHandle = CreateThread(NULL, 0, ThreadRecvData, this, 0, NULL);
if (acceptThreadHandle == NULL)
{
std::cerr << "Create recvData thread failed: " << GetLastError() << std::endl;
}
else
{
std::cerr << "Create recvData thread success: " << recvDatatThreadHandle << std::endl;
}
if (m_bHeartBeat == true)
{
HANDLE heartBeatThreadHandle = CreateThread(NULL, 0, ThreadHeartBeat, this, 0, NULL);
if (acceptThreadHandle == NULL)
{
std::cerr << "Create heartBeat thread failed: " << GetLastError() << std::endl;
}
else
{
std::cerr << "Create heartBeat thread success: " << heartBeatThreadHandle << std::endl;
}
}
}
main.cpp
#include "TcpServer.h"
// Demo entry point: runs a TcpServer on port 8080 with heartbeats enabled and
// a 30-second heartbeat timeout, then keeps the process alive for 1000000 ms
// before exiting.
int main()
{
    TcpServer server;
    server.heartbeatInterval = 30;
    server.m_bHeartBeat = true;

    const bool listening = server.start(8080);
    if (!listening)
    {
        std::cout << "server initiatefail" << std::endl;
    }
    else
    {
        // Start the worker threads once, then pause briefly.
        server.handle();
        Sleep(100);
    }

    // Keep the process alive while the worker threads serve clients.
    Sleep(1000000);
    return 0;
}