前言
之前我们学习过socket之tcp通信,知道了使用tcp建立连接的一系列操作,并通过write与read函数能让客户端与服务端进行通信,但是tcp是面向字节流的,有可能我们write时只写入了部分数据,此时另一端就来read了,可能会导致读取的数据不完整的问题。这就引入到tcp通信的一个重要操作了——序列化与反序列化!
一、使用结构体进行传输
在我们之前学习的代码中,一般想传递很多数据,都会使用到结构体。
比如pthread_create()需要传递函数地址,你如果需要传递很多参数,就得创建一个结构体,将他们组织起来,传递这个结构体指针给到pthread_create(),因为他第四个参数只能接受void*指针。不是可变参数包。
如下,制作一个网络计算器,我们客户端创建结构体数据并发送
服务端对数据进行接受
这样服务器就可以接受到数据并做相应的处理了,这里我们显示出来表示接收到了数据,处理起来也就很简单了。
结构体代码链接
二、自定义序列化与反序列化
tcp中的数据使用结构体传输在网络压力很大,或者传输内容很多时,就会发生问题。同样在不同的编译环境下,结构体内存对齐也不一致,并且有可能客户端是使用其他代码进行编写,C/C++的结构体也不会适合。
- 如果我们将传输的数据统一处理成字符串,让字符串在tcp中进行传输,同时设置一些固定的字段,代表当前数据包的结束,是不是能解决上述问题呢?
- 这就是数据序列化后进行传输,接收方将数据进行反序列化后提取再做处理。序列化的本质就是对字符串作处理。
一样的计算器,一样的需要传递两个整形数字与一个字符。
- 如果我们将该结构体转为字符串,如 "x oper y\n" ,这样我们就可以通过读取空格来表示取到的整形,字符,整形,如果一直没读到"\n"证明当前数据包还没读完,读到"\n"证明取到了一个完整的数据包,这样就可以完美解决问题了。
- 但是我们这个写法没有普适性,因为如果后面如果结构体内容比较复杂,带了一个"\n",就会导致序列化代码需要重新设计。如果在前面添加一个报头len,代表后面内容的长度,这样无论后面代码是什么,都可以统一看待处理。
- 于是,我们得到如下的序列化公式。
"len\nx op y"
这里len后面的"\n"代表我们读取到了len数据,后面就是真正要传输并处理的数据了。
了解了这些,那么我们进行网络计算器的序列化与反序列化也就不难了,代码如下
Protocol.hpp 自定义协议
#pragma once
#include <iostream>
#include <memory>
using namespace std;
namespace Protocol
{
const string ProtSep = " ";
const string LineBreakSep = "\n";
// message 现在我要给他头部添加len\n 尾部添加\n
// 如"x op y"-> "len\nx op y\n" "result code"-> "len\nresult code\n"
string Encode(string &message)
{
string len = to_string(message.size());
string package = len + LineBreakSep + message + LineBreakSep;
return package;
}
// 从一个完整的package进行解析,"len\nx op y\n"->"x op y"
bool Decode(string &package, string *message)
{
int left = package.find(LineBreakSep);
if (left == string::npos)
return false;
int len = stoi(package.substr(0, left));
int total = left + len + 2 * LineBreakSep.size();
if (package.size() < total)
return false;
// 到这里至少有一个完整的报文 截取出来一个报文写入message,然后删除该报文
*message = package.substr(left + LineBreakSep.size(), len);
package.erase(0, total);
return true;
}
class Request
{
public:
Request() : _data_x(0), _data_y(0), _oper(0)
{
}
Request(int x, int y, char op)
: _data_x(x), _data_y(y), _oper(op)
{
}
void Debug()
{
cout << "_data_x: " << _data_x << endl;
cout << "_data_y: " << _data_y << endl;
cout << "_oper: " << _oper << endl;
}
void Inc()
{
_data_x++;
_data_y++;
}
// 序列化 变成这种字符串"x op y"
void Serialize(string *out)
{
*out = to_string(_data_x) + ProtSep + _oper + ProtSep + to_string(_data_y);
}
// 反序列化 "x op y"字符串分别取出
bool Deserialize(string &in)
{
size_t left = in.find(ProtSep);
if (left == string::npos)
return false;
size_t right = in.rfind(ProtSep);
if (right == string::npos)
return false;
_data_x = stoi(in.substr(0, left));
_data_y = stoi(in.substr(right + ProtSep.size()));
string op = in.substr(left + ProtSep.size(), right - left - ProtSep.size());
if (op.size() != 1)
return false;
_oper = op[0];
return true;
}
int Getx()
{
return _data_x;
}
int Gety()
{
return _data_y;
}
char Getop()
{
return _oper;
}
string To_string()
{
return to_string(_data_x)+" "+_oper+ " "+to_string(_data_y)+" = ";
}
private:
// x oper y 比如 10 + 20
int _data_x;
int _data_y;
char _oper; // 符号
};
class Response
{
public:
Response() : _result(0), _code(0)
{
}
Response(int result, int code)
: _result(result), _code(code)
{
}
void Serialize(string *out)
{
//_result _code
*out = to_string(_result) + ProtSep + to_string(_code);
}
bool Deserialize(string &in)
{
size_t pos = in.find(ProtSep);
if (pos == string::npos)
return false;
_result = stoi(in.substr(0, pos));
_code = stoi(in.substr(pos + ProtSep.size()));
return true;
}
int Getresult()
{
return _result;
}
int Getcode()
{
return _code;
}
void Setresult(int result)
{
_result = result;
}
void Setcode(int code)
{
_code = code;
}
private:
int _result; // 运算结果
int _code; // 运算状态
};
// 工厂模式
class Factory
{
public:
shared_ptr<Request> BuildRequest()
{
return make_shared<Request>();
}
shared_ptr<Request> BuildRequest(int x, int y, char op)
{
return make_shared<Request>(x, y, op);
}
shared_ptr<Response> BuildResponse()
{
return make_shared<Response>();
}
shared_ptr<Response> BuildResponse(int result, int code)
{
return make_shared<Response>(result, code);
}
};
}
Socket.hpp 套接字函数封装
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include <unistd.h>
using namespace std;
namespace Net_Work
{
static const int default_backlog = 5;
static const int default_sockfd = -1;
using namespace std;
enum
{
SocketError = 1,
BindError,
ListenError,
ConnectError,
};
// 封装套接字接口基类
class Socket
{
public:
// 封装了socket相关方法
virtual ~Socket() {}
virtual void CreateSocket() = 0;
virtual void BindSocket(uint16_t port) = 0;
virtual void ListenSocket(int backlog) = 0;
virtual bool ConnectSocket(string &serverip, uint16_t serverport) = 0;
virtual Socket *AcceptSocket(string *peerip, uint16_t *peerport) = 0;
virtual int GetSockFd() = 0;
virtual void SetSockFd(int sockfd) = 0;
virtual void CloseSocket() = 0;
virtual bool Recv(string *buff, int size) = 0;
virtual void Send(string &send_string) = 0;
// 方法的集中在一起使用
public:
void BuildListenSocket(uint16_t port, int backlog = default_backlog)
{
CreateSocket();
BindSocket(port);
ListenSocket(backlog);
}
bool BuildConnectSocket(string &serverip, uint16_t serverport)
{
CreateSocket();
return ConnectSocket(serverip, serverport);
}
void BuildNormalSocket(int sockfd)
{
SetSockFd(sockfd);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int sockfd = default_sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket() {}
void CreateSocket() override
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
exit(SocketError);
}
void BindSocket(uint16_t port) override
{
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
exit(BindError);
}
void ListenSocket(int backlog) override
{
int n = listen(_sockfd, backlog);
if (n < 0)
exit(ListenError);
}
bool ConnectSocket(string &serverip, uint16_t serverport) override
{
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(serverport);
// addr.sin_addr.s_addr = inet_addr(serverip.c_str());
inet_pton(AF_INET, serverip.c_str(), &addr.sin_addr);
int n = connect(_sockfd, (sockaddr *)&addr, sizeof(addr));
if (n == 0)
return true;
return false;
}
Socket *AcceptSocket(string *peerip, uint16_t *peerport) override
{
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
int newsockfd = accept(_sockfd, (sockaddr *)&addr, &len);
if (newsockfd < 0)
return nullptr;
// *peerip = inet_ntoa(addr.sin_addr);
// INET_ADDRSTRLEN 是一个定义在头文件中的宏,表示 IPv4 地址的最大长度
char ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &addr.sin_addr, ip_str, INET_ADDRSTRLEN);
*peerip = ip_str;
*peerport = ntohs(addr.sin_port);
Socket *s = new TcpSocket(newsockfd);
return s;
}
int GetSockFd() override
{
return _sockfd;
}
void SetSockFd(int sockfd) override
{
_sockfd = sockfd;
}
void CloseSocket() override
{
if (_sockfd > default_sockfd)
close(_sockfd);
}
bool Recv(string *buff, int size) override
{
char inbuffer[size];
ssize_t n = recv(_sockfd, inbuffer, size - 1, 0);
if (n > 0)
{
inbuffer[n] = 0;
*buff += inbuffer;
return true;
}
else
return false;
}
void Send(string &send_string) override
{
send(_sockfd, send_string.c_str(),send_string.size(),0);
}
private:
int _sockfd;
string _ip;
uint16_t _port;
};
}
Calculate.hpp 计算业务封装
#pragma once
#include <iostream>
#include <memory>
#include "Protocol.hpp"
enum ErrCode
{
Sucess = 0,
DivZeroErr,
ModZeroErr,
UnKnowOper,
};
//计算业务
class Calculate
{
public:
Calculate()
{
}
shared_ptr<Protocol::Response> Cal(shared_ptr<Protocol::Request> req)
{
shared_ptr<Protocol::Response> resp = factory.BuildResponse();
resp->Setcode(Sucess);
switch (req->Getop())
{
case '+':
resp->Setresult(req->Getx() + req->Gety());
break;
case '-':
resp->Setresult(req->Getx() - req->Gety());
break;
case '*':
resp->Setresult(req->Getx() * req->Gety());
break;
case '/':
{
if (req->Gety() == 0)
resp->Setcode(DivZeroErr);
else
resp->Setresult(req->Getx() / req->Gety());
}
break;
case '%':
{
if (req->Gety() == 0)
resp->Setcode(ModZeroErr);
else
resp->Setresult(req->Getx() % req->Gety());
}
break;
default:
resp->Setcode(UnKnowOper);
break;
}
return resp;
}
private:
Protocol::Factory factory;
};
TcpServer.hpp tcp服务端封装
#pragma once
#include "Protocol.hpp"
#include "Socket.hpp"
#include <thread>
#include <functional>
using func_t = function<string(string &inbufferstream, bool *error_code)>;
class TcpServer
{
public:
TcpServer(uint16_t port, func_t hander_request)
: _port(port), _listen_socket(new Net_Work::TcpSocket()), _handler_request(hander_request)
{
_listen_socket->BuildListenSocket(port);
}
void ThreadRun(Net_Work::Socket *s)
{
// cout<< " in HandlerRequest !"<<endl;
string inbufferstream;
while (true)
{
bool resultcode = true;
// 1.读取报文
if (!s->Recv(&inbufferstream, 1024))
break;
// 2.报文处理
string send_string = _handler_request(inbufferstream, &resultcode);
if (resultcode)
{
// 发送数据
if (!send_string.empty())
{
s->Send(send_string);
}
}
else
{
break;
}
}
s->CloseSocket();
delete s;
}
void Loop()
{
while (true)
{
string peerip;
uint16_t peerport;
Net_Work::Socket *newsockfd = _listen_socket->AcceptSocket(&peerip, &peerport);
if (newsockfd == nullptr)
continue;
cout << "获取一个新链接, sockfd: " << newsockfd->GetSockFd() << " ,client info: " << peerip << ":" << peerport << endl;
thread t1(&TcpServer::ThreadRun, this, newsockfd);
t1.detach();
}
}
~TcpServer()
{
delete _listen_socket;
}
private:
int _port;
Net_Work::Socket *_listen_socket;
func_t _handler_request;
};
Main.cc 服务端实现
#include "TcpServer.hpp"
#include "Calculate.hpp"
#include <unistd.h>
#include <memory>
string HandlerRequest(string &inbufferstream, bool *error_code)
{
Calculate calculate;
// 1.创建请求对象
unique_ptr<Protocol::Factory> factory(new Protocol::Factory());
shared_ptr<Protocol::Request> req = factory->BuildRequest();
string total_resp_string;
// 2.判断字节流是否读取到了一个完整的报文 把报文读完
string message;
while (Protocol::Decode(inbufferstream, &message))
{
// 3.这里已经读到完整的报文,需要进行反序列化 反序列化失败证明是非法请求
if (!req->Deserialize(message))
{
// 反序列化失败 不能忍受,因为客户端的请求是不合法的
*error_code = false;
return string();
}
// 4.业务处理
shared_ptr<Protocol::Response> resp = calculate.Cal(req);
// 5.序列化
string send_string;
resp->Serialize(&send_string);
// 6.添加报头
send_string = Protocol::Encode(send_string);
total_resp_string+=send_string;
}
return total_resp_string;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
cout << "Usage : " << argv[0] << " port" << endl;
return 0;
}
uint16_t localport = stoi(argv[1]);
unique_ptr<TcpServer> tsvr(new TcpServer(localport, HandlerRequest));
tsvr->Loop();
return 0;
}
TcpClient.cc 服务端实现
#include <iostream>
#include <string>
#include <unistd.h>
#include <ctime>
#include <cstdlib>
#include "Socket.hpp"
#include "Protocol.hpp"
using namespace std;
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "Usage : " << argv[0] << " serverip serverport" << endl;
return 0;
}
string serverip = argv[1];
uint16_t serverport = stoi(argv[2]);
// 创建套接字对象
Net_Work::Socket *s = new Net_Work::TcpSocket();
if (!s->BuildConnectSocket(serverip, serverport))
{
cerr << "connect [" << serverip << ":" << serverport << "] failed" << endl;
return Net_Work::ConnectError;
}
cout << "connect [" << serverip << ":" << serverport << "] success" << endl;
// 创建工厂
unique_ptr<Protocol::Factory> factory(new Protocol::Factory());
srand(time(nullptr) ^ getpid());
const string opers = "+-*/%^&=";
while (true)
{
// 1.构建一个请求
int x = rand() % 100;
usleep(rand() % 5555);
int y = rand() % 100;
int oper = opers[rand() % opers.size()];
shared_ptr<Protocol::Request> req = factory->BuildRequest(x, y, oper);
// 2.序列化该请求
string requeststr;
req->Serialize(&requeststr);
// 3.添加报头信息
requeststr = Protocol::Encode(requeststr);
// 4.发送请求
s->Send(requeststr);
string message;
while (true)
{
// 5.读取响应
string responsestr;
s->Recv(&responsestr, 1024);
// 6.解析报文 收到的"len\n result code" 现在开始解析
if(!Protocol::Decode(responsestr,&message))
continue;
// 7.拆分数据 目前是"result code" 现在开始写入结构体
shared_ptr<Protocol::Response> resp = factory->BuildResponse();
resp->Deserialize(message);
// 8.resp已经得到的数据
cout<<req->To_string()<<resp->Getresult()<<"\tcode: "<<resp->Getcode()<<endl;
break;
}
sleep(1);
}
s->CloseSocket();
return 0;
}
Makefile
.PHONY:all
all:tcpserver tcpclient
tcpserver:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
tcpclient:TcpClient.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f tcpclient tcpserver
代码链接
运行结果吐下