demo有一下功能
1、心跳包
2、断开重连
3、非阻塞
4、接受数据单独线程处理
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <string>
#include <process.h> // 用于Windows下的线程相关操作
#pragma comment(lib, "ws2_32.lib")
#define SERVER_IP "127.0.0.1"
#define SERVER_PORT 6000
#define RECV_BUF_SIZE 1024
#define HEARTBEAT_INTERVAL 5000 // 心跳包发送间隔,单位:毫秒
#define HEARTBEAT_TIMEOUT 10000 // 心跳包超时时间,单位:毫秒
#define MAX_RECONNECT_ATTEMPTS 10 // 最大重连尝试次数
#define RECONNECT_INTERVAL_SECONDS 2 // 重连间隔时间(秒)
class TCPClient
{
public:
TCPClient();
~TCPClient();
bool connectToServer();
void disconnect();
int sendData(const std::string& data);
private:
SOCKET m_socket;
sockaddr_in m_serverAddr;
bool m_connected;
// 心跳包相关变量和函数
DWORD m_lastHeartbeatTime;
bool m_heartbeatSent;
HANDLE m_heartbeatThreadHandle;
bool m_heartbeatThreadRunning;
static unsigned int __stdcall HeartbeatThread(void* param);
bool sendHeartbeat();
bool checkHeartbeatResponse();
// 用于设置套接字为非阻塞模式
bool setSocketNonBlocking();
// 尝试重连服务器
bool reconnect();
// 初始化Winsock库
bool initializeWinsock();
// 关闭套接字并清理相关资源
void closeSocket();
// 接收数据线程相关函数和变量
static unsigned int __stdcall ReceiveDataThread(void* param);
HANDLE m_receiveThreadHandle;
bool m_receiveThreadRunning;
};
// 构造函数,初始化成员变量并初始化Winsock库
TCPClient::TCPClient() : m_socket(INVALID_SOCKET), m_connected(false),
m_lastHeartbeatTime(0), m_heartbeatSent(false),
m_heartbeatThreadHandle(NULL), m_heartbeatThreadRunning(false),
m_receiveThreadHandle(NULL), m_receiveThreadRunning(false)
{
if (!initializeWinsock())
{
std::cerr << "初始化Winsock库失败" << std::endl;
}
m_serverAddr.sin_family = AF_INET;
m_serverAddr.sin_port = htons(SERVER_PORT);
if (inet_pton(AF_INET, SERVER_IP, &(m_serverAddr.sin_addr)) <= 0)
{
std::cerr << "inet_pton转换IP地址错误" << std::endl;
}
}
// 析构函数,断开连接并清理Winsock库,同时关闭心跳包线程和接收数据线程
TCPClient::~TCPClient()
{
disconnect();
if (m_heartbeatThreadHandle!= NULL)
{
m_heartbeatThreadRunning = false;
// 等待心跳包线程结束
WaitForSingleObject(m_heartbeatThreadHandle, INFINITE);
CloseHandle(m_heartbeatThreadHandle);
}
if (m_receiveThreadHandle!= NULL)
{
m_receiveThreadRunning = false;
// 等待接收数据线程结束
WaitForSingleObject(m_receiveThreadHandle, INFINITE);
CloseHandle(m_receiveThreadHandle);
}
WSACleanup();
}
// 连接服务器的函数
bool TCPClient::connectToServer()
{
m_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (m_socket == INVALID_SOCKET)
{
std::cerr << "创建套接字失败,错误码: " << WSAGetLastError() << std::endl;
return false;
}
// 设置套接字为非阻塞模式
if (!setSocketNonBlocking())
{
std::cerr << "设置套接字为非阻塞模式失败" << std::endl;
closeSocket();
return false;
}
int ret = connect(m_socket, (struct sockaddr*)&m_serverAddr, sizeof(m_serverAddr));
if (ret == SOCKET_ERROR)
{
int errCode = WSAGetLastError();
if (errCode!= WSAEWOULDBLOCK)
{
std::cerr << "连接服务器失败,错误码: " << errCode << std::endl;
closeSocket();
return false;
}
}
// 等待连接真正建立(非阻塞模式下需要轮询检查)
timeval timeout;
timeout.tv_sec = 5; // 设置超时时间为5秒
timeout.tv_usec = 0;
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(m_socket, &writefds);
ret = select(0, NULL, &writefds, NULL, &timeout);
if (ret == SOCKET_ERROR)
{
std::cerr << "select函数出错,错误码: " << WSAGetLastError() << std::endl;
closeSocket();
return false;
}
else if (ret == 0)
{
std::cerr << "连接超时" << std::endl;
closeSocket();
return false;
}
if (FD_ISSET(m_socket, &writefds))
{
m_connected = true;
// 创建并启动接收数据线程
m_receiveThreadHandle = (HANDLE)_beginthreadex(NULL, 0, ReceiveDataThread, this, 0, NULL);
if (m_receiveThreadHandle == NULL)
{
std::cerr << "创建接收数据线程失败" << std::endl;
closeSocket();
return false;
}
m_receiveThreadRunning = true;
// 创建并启动心跳包线程
m_heartbeatThreadHandle = (HANDLE)_beginthreadex(NULL, 0, HeartbeatThread, this, 0, NULL);
if (m_heartbeatThreadHandle == NULL)
{
std::cerr << "创建心跳包线程失败" << std::endl;
closeSocket();
return false;
}
m_heartbeatThreadRunning = true;
std::cout << "成功连接到服务器" << std::endl;
return true;
}
return false;
}
// 断开与服务器连接的函数
void TCPClient::disconnect()
{
if (m_connected)
{
closesocket(m_socket);
m_connected = false;
std::cout << "已断开与服务器的连接" << std::endl;
}
m_receiveThreadRunning = false;
m_heartbeatThreadRunning = false;
}
// 发送数据到服务器的函数
int TCPClient::sendData(const std::string& data)
{
if (!m_connected)
{
if (reconnect())
{
}
else
{
std::cerr << "未连接到服务器,无法发送数据" << std::endl;
return SOCKET_ERROR;
}
}
int ret = send(m_socket, data.c_str(), data.size(), 0);
if (ret == SOCKET_ERROR)
{
int errCode = WSAGetLastError();
if (errCode == WSAEWOULDBLOCK)
{
// 在非阻塞模式下,缓冲区满等情况会返回此错误,可根据需要处理
return 0;
}
else
{
std::cerr << "发送数据失败,错误码: " << errCode << std::endl;
// 如果是连接断开相关错误,尝试重连
if (errCode == WSAECONNRESET || errCode == WSAENETRESET)
{
if (reconnect())
{
// 重连成功后再次发送数据
return sendData(data);
}
}
return SOCKET_ERROR;
}
}
return ret;
}
// 发送心跳包的函数
bool TCPClient::sendHeartbeat()
{
if (!m_connected)
{
return false;
}
const std::string heartbeatData = "HEARTBEAT_CLIENT";
int ret = send(m_socket, heartbeatData.c_str(), heartbeatData.size(), 0);
if (ret == SOCKET_ERROR)
{
int errCode = WSAGetLastError();
if (errCode == WSAEWOULDBLOCK)
{
return false;
}
else
{
std::cerr << "发送心跳包失败,错误码: " << errCode << std::endl;
return false;
}
}
m_heartbeatSent = true;
return true;
}
// 检查心跳包响应的函数
bool TCPClient::checkHeartbeatResponse()
{
if (!m_connected)
{
return false;
}
char buffer[RECV_BUF_SIZE];
int ret = recv(m_socket, buffer, RECV_BUF_SIZE, 0);
if (ret == SOCKET_ERROR)
{
int errCode = WSAGetLastError();
if (errCode == WSAEWOULDBLOCK)
{
return false;
}
else
{
std::cerr << "接收心跳包响应失败,错误码: " << errCode << std::endl;
return false;
}
}
else if (ret == 0)
{
// 对方关闭了连接
std::cerr << "服务器关闭了连接" << std::endl;
disconnect();
return false;
}
else
{
std::string response(buffer, ret);
if (response == "HEARTBEAT_ACK")
{
return true;
}
}
return false;
}
// 设置套接字为非阻塞模式的函数
bool TCPClient::setSocketNonBlocking()
{
u_long mode = 1;
int ret = ioctlsocket(m_socket, FIONBIO, &mode);
return ret!= SOCKET_ERROR;
}
// 尝试重连服务器的函数
bool TCPClient::reconnect()
{
int attempt = 0;
while (attempt < MAX_RECONNECT_ATTEMPTS)
{
attempt++;
closeSocket();
Sleep(RECONNECT_INTERVAL_SECONDS * 1000); // 等待一段时间后重连
m_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (m_socket == INVALID_SOCKET)
{
std::cerr << "重连时创建套接字失败,错误码: " << WSAGetLastError() << std::endl;
continue;
}
// 设置套接字为非阻塞模式
if (!setSocketNonBlocking())
{
std::cerr << "重连时设置套接字为非阻塞模式失败" << std::endl;
closeSocket();
continue;
}
int ret = connect(m_socket, (struct sockaddr*)&m_serverAddr, sizeof(m_serverAddr));
if (ret == SOCKET_ERROR)
{
int errCode = WSAGetLastError();
if (errCode!= WSAEWOULDBLOCK)
{
std::cerr << "重连失败,错误码: " << errCode << std::endl;
continue;
}
}
// 等待连接真正建立(非阻塞模式下需要轮询检查)
timeval timeout;
timeout.tv_sec = 5; // 设置超时时间为5秒
timeout.tv_usec = 0;
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(m_socket, &writefds);
ret = select(0, NULL, &writefds, NULL, &timeout);
if (ret == SOCKET_ERROR)
{
std::cerr << "重连时select函数出错,错误码: " << WSAGetLastError() << std::endl;
closeSocket();
continue;
}
else if (ret == 0)
{
std::cerr << "重连超时" << std::endl;
closeSocket();
continue;
}
if (FD_ISSET(m_socket, &writefds))
{
m_connected = true;
// 重新创建并启动接收数据线程
if (m_receiveThreadHandle!= NULL)
{
m_receiveThreadRunning = false;
WaitForSingleObject(m_receiveThreadHandle, INFINITE);
CloseHandle(m_receiveThreadHandle);
}
m_receiveThreadHandle = (HANDLE)_beginthreadex(NULL, 0, ReceiveDataThread, this, 0, NULL);
if (m_receiveThreadHandle == NULL)
{
std::cerr << "重连后创建接收数据线程失败" << std::endl;
closeSocket();
return false;
}
m_receiveThreadRunning = true;
// 重新创建并启动心跳包线程
if (m_heartbeatThreadHandle!= NULL)
{
m_heartbeatThreadRunning = false;
WaitForSingleObject(m_heartbeatThreadHandle, INFINITE);
CloseHandle(m_heartbeatThreadHandle);
}
m_heartbeatThreadHandle = (HANDLE)_beginthreadex(NULL, 0, HeartbeatThread, this, 0, NULL);
if (m_heartbeatThreadHandle == NULL)
{
std::cerr << "重连后创建心跳包线程失败" << std::endl;
closeSocket();
return false;
}
m_heartbeatThreadRunning = true;
std::cout << "重连成功" << std::endl;
return true;
}
}
std::cerr << "达到最大重连尝试次数,重连失败" << std::endl;
return false;
}
// 初始化Winsock库的函数
bool TCPClient::initializeWinsock()
{
WSADATA wsaData;
return WSAStartup(MAKEWORD(2, 2), &wsaData) == 0;
}
// 关闭套接字并清理相关资源的函数
void TCPClient::closeSocket()
{
if (m_socket!= INVALID_SOCKET)
{
closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
}
// 心跳包线程函数
unsigned int __stdcall TCPClient::HeartbeatThread(void* param)
{
TCPClient* client = static_cast<TCPClient*>(param);
while (client->m_heartbeatThreadRunning && client->m_connected)
{
DWORD currentTime = GetTickCount();
if (currentTime - client->m_lastHeartbeatTime >= HEARTBEAT_INTERVAL)
{
if (client->sendHeartbeat())
{
client->m_lastHeartbeatTime = currentTime;
}
}
if (currentTime - client->m_lastHeartbeatTime > HEARTBEAT_TIMEOUT)
{
std::cerr << "心跳包超时,通知主线程尝试重连" << std::endl;
client->m_connected = false;
break;
}
Sleep(100); // 适当休眠,避免过于频繁循环检查
}
return 0;
}
// 接收数据线程函数
unsigned int __stdcall TCPClient::ReceiveDataThread(void* param)
{
TCPClient* client = static_cast<TCPClient*>(param);
std::string receivedData;
while (client->m_receiveThreadRunning && client->m_connected)
{
char recvBuf[RECV_BUF_SIZE];
int ret = recv(client->m_socket, recvBuf, RECV_BUF_SIZE, 0);
if (ret == SOCKET_ERROR)
{
int errCode = WSAGetLastError();
if (errCode == WSAEWOULDBLOCK)
{
// 在非阻塞模式下,无数据可读时会返回此错误,可根据需要处理
continue;
}
else
{
std::cerr << "接收数据线程中接收数据失败,错误码: " << errCode << std::endl;
// 如果是连接断开相关错误,通知主线程尝试重连
if (errCode == WSAECONNRESET || errCode == WSAENETRESET)
{
client->m_connected = false;
break;
}
}
}
else if (ret == 0)
{
// 对方关闭了连接
std::cerr << "服务器关闭了连接(接收数据线程中)" << std::endl;
client->m_connected = false;
break;
}
else
{
receivedData.assign(recvBuf, ret);
std::cout << "接收数据线程从服务器接收到数据: " << receivedData << std::endl;
}
}
return 0;
}
int main()
{
TCPClient client;
if (client.connectToServer())
{
while (true)
{
// 发送数据示例
std::string sendDataStr = "Hello, server!\n";
client.sendData(sendDataStr);
// 简单的休眠,避免过于频繁循环
Sleep(100);
}
}
return 0;
}
1. 接收数据线程相关的成员变量
m_receiveThreadHandle
:用于存储接收数据线程的句柄,通过_beginthreadex
函数创建线程时获取,用于后续对线程的操作,比如等待线程结束、关闭线程句柄等。m_receiveThreadRunning
:布尔类型变量,用于标记接收数据线程是否正在运行,在启动线程时设置为true
,当需要停止线程(比如断开连接或者程序结束时)设置为false
,线程函数内部会根据这个变量来判断是否继续循环接收数据。
2. connectToServer
函数
在成功连接到服务器后,不仅将m_connected
标记设置为true
,还会创建并启动接收数据线程。通过_beginthreadex
函数创建线程,传入ReceiveDataThread
函数作为线程执行的入口点,并将当前TCPClient
对象指针this
作为参数传递进去,以便在线程函数中能够访问对象的成员变量和函数。如果线程创建失败,会关闭套接字并返回false
,表示连接失败;若线程创建成功,则将m_receiveThreadRunning
设置为true
,表示接收数据线程开始运行。
3. disconnect
函数
除了关闭套接字并将m_connected
标记设置为false
外,还会将m_receiveThreadRunning
设置为false
,通知接收数据线程停止运行。这样线程函数在下次循环判断时就会退出循环,结束线程的执行。
4. reconnect`函数
在重连成功后,除了进行之前的一些连接相关的设置外,还需要重新创建并启动接收数据线程。