文章目录
- 为什么要序列化?
- 协议的实现
- 服务端与客户端代码实现
为什么要序列化?
由于默认对齐数的不同,不同的平台对相同数据进行内存对齐后,可能得到不同的数据。如果直接将这些数据进行网络传输,对方很可能无法正确的获取这些数据,通信也就失去了意义,为保证通信数据的准确性,我们需要设计序列化方式,将数据序列化成字节流,再进行传输,接收方通过对应的反序列化方式对字节流进行解析,从而得到原始且正确的数据。总而言之,序列化与反序列化是为了使通信正确的进行而对数据进行的处理操作
序列化与反序列化只是一种指导思想,关于它的具体实现有很多种,如XML,JSON等等。为了更好理解序列化与反序列化的过程,我在这里自己定制一个协议(这个协议只是模拟实现),该协议是针对简单计算进行设计的
协议的实现
在进行网络传输之前,我们要将数据转换成字符串的形式,以字节流的方式发送信息,这里就涉及到有效载荷和报头的相关知识,报头在有效载荷的前面,也就是字符串的最开始,报头和有效载荷之间用分隔符分割,其表明了有效载荷的长度,比如报头表示有效载荷的长度为n字节,那么分隔符向后n个字节都表示有效载荷。但是报头的长度谁来表示?读取字节流时,一直读取直到遇到分隔符,分隔符之前的数据就是报头。但这里有一个问题?为什么需要使用报头表示有效载荷的长度?有效载荷之间用分隔符分隔不就行了吗?注意一个问题:假如分隔符恰好是有效载荷的一部分数据,直接用分隔符分隔有效载荷的做法不够严谨,需要使用报头来表示有效载荷的长度,并且表示长度的报头一般不会包含分隔符(或者说其中的数据不会和分隔符冲突)
客户端需要和服务端进行通信,通信肯定要发送数据吧,但是客户端上的数据大多是结构化的数据,由于内存对其的问题,不能直接向服务端发送数据,因此,我们需要将结构化的数据转换成特定格式的字符串格式,这个过程分为两步:一是将结构化数据转换成字符串形式的有效载荷,二是为有效载荷添加报头。如此就得到了报头+有效载荷,此时才能将其发送给服务端
我们将:为有效载荷添加/删除报头的操作称为encode/decode,生成有效载荷与解析有效载荷的操作称为serialize/deserialize,也可以叫做序列化和反序列化。综上,客户端发送数据前需要serialize+encode,服务端接收数据后需要decode+deserialize。关于协议的定制,就是这4个接口的具体实现
先说明encode,我们将需要进行encode的字符串str作为参数,然后创建一个新的字符串ret,在ret后追加str的长度(当然了,需要将整数形式的长度转换成字符串形式),接着追加分隔符,最后追加str和分隔符,返回ret
#define CRLF "\n\r"
// 根据参数str,返回编码得到的ret
string encode(const string& str)
{
string ret = to_string(str.size());
ret += CRLF;
ret += str;
ret += CRLF;
return ret;
}
接着是decode,客户端可能发送一次完成的请求,也可能发送一次不完整的请求,还可能发送多次请求,但我们只需要区分客户发送不完整请求的情况。我们将需要decode的字符串str作为函数参数,再将一个uint32_t类型的数据len以引用的方式作为第二个参数,如果有效载荷不完整,len的值为0,这样请求是否完整就有了判断依据。
接着调用str的find方法,查找分隔符,find将返回分隔符在str中第一次出现的下标位置,如果分隔符没有在str中出现,find返回的数值等于string::npos。所以这里进行判断,如果find返回npos,decode返回空串,用户发送的请求不完整,需要再次进行发送。找到分隔符在str中第一次出现的下标后,我们就可以取出报头,接着解析报头,拿到有效载荷的长度,判断str剩下空间是否足以存储有效载荷,如果剩余空间不足以存储这个有效载荷,那么decode返回空串,用户发送的请求不完整。如果剩余空间足以存储有效载荷,那么我们要将有效载荷返回,修改len为有效载荷的程度,并且删除str中第一个:报头+有效载荷数据。
// 根据参数str,对其解码,并检测str是否有完整的有效载荷,函数返回解码得到的字符串
string decode(string& str, uint32_t& payload_len)
{
payload_len = 0;
// 查找报头
size_t head_pos = str.find(CRLF);
if (head_pos == string::npos)
return "";
// 获取有效载荷的长度
string head = str.substr(0, head_pos);
int tmp_len = atoi(head.c_str());
// 判断有效载荷是否完整
if (str.size() - head_pos - 2 * strlen(CRLF) < tmp_len) // 有效载荷不完整
return "";
// 有效载荷完整,删除报头数据与有效载荷
// 这是才修改payload_len参数
payload_len = tmp_len;
string package = str.substr(head_pos + strlen(CRLF), payload_len); // 删除之前保存有效载荷,因为要返回
str.erase(0, payload_len + head_pos + 2 * strlen(CRLF));
// 返回有效载荷的字符串
return package;
}
关于substr的边界确定,这里就不展开赘述,自己把握就行了。说明一下,服务端每次接收客户端的请求,都是将这些请求追加到一个string后面,所以当我们获取了一次完整的报头+有效载荷,我们就要将其删除,没有获取到完整的报头+有效载荷是,就将下次用户的请求追加到string后面,再次判断是否有完整的请求
encode和decode就讲解完了,接下来是serialize和deserialize。由于客户端发送的数据和服务端发送的数据不同,所以序列化和反序列化要设计两套接口。我们将客户端要发送的数据封装为Request,服务端要响应的数据封装为Response,这两个类有不同的serialize和deserialize方法。由于我是针对简单计算定制的序列化协议,所以客户端的Request就只有两个操作数,一个操作符,就比如1+1,然后服务端对于请求的响应Response,需要保存简单计算的结果与退出码(如果计算遇到错误就会设置退出码)
class Request
{
public:
int _x;
int _y;
char _op;
};
class Response
{
public:
// 退出码,0表示正常退出
int _exit_code = 0;
// 计算结果
int _result;
};
关于Request的序列化接口serialize,我们用引用的方式接收用户传入的字符串str,这里默认str是空串,然后将_x,_op,_y对象依次转换成字符串(调用to_string接口),追加到str后,并在两个对象之间插入空格(追加一次对象再追加一次空格),这样就完成了Request的序列化。对于Request的反序列化,deserialize的参数str接收需要反序列化的字符串,使用str的find接口查找空格(因为计算表达式中,不会出现空格,所以我们将空格作为每个对象之间的分隔符),利用空格找到每个对象的位置,调用substr与atoi接口将它们从字符串转换成整形(操作符也是一个整数,这个可以看ASCII码表),为Request对象赋上相应的值。
// 用来发送的计算请求
class Request
{
public:
void serialize(string& str)
{
str += to_string(_x);
str += SPACE;
str+= to_string(_op);
str += SPACE;
str += to_string(_y);
}
bool deserialize(string& str)
{
// 获取字符串中的两个空格,以及判断其正确性
size_t spaceone_pos = str.find(SPACE);
if (spaceone_pos == string::npos)
return false;
size_t spacetwo_pos = str.rfind(SPACE);
if (spacetwo_pos == string::npos)
return false;
// 根据空格获取操作数与操作符
string op = str.substr(spaceone_pos + strlen(SPACE), spacetwo_pos - spaceone_pos - strlen(SPACE));
string x = str.substr(0, spaceone_pos);
string y = str.substr(spacetwo_pos + strlen(SPACE));
// 将得到的字符转换成类成员
_x = atoi(x.c_str());
_y = atoi(y.c_str());
_op = atoi(op.c_str());
return true;
}
int _x;
int _y;
char _op;
};
Response的序列化serialize也是如此,函数以引用的方式接收一个空串,在空串后追加exitcode和result,每个对象用空格分隔。反序列化deserialize也是接收需要反序列化的字符串str,用find查找空格的位置,根据空格获取其他操作数的位置,也是用substr和atoi接口
class Response
{
public:
void serialisze(string& str)
{
str += to_string(_exit_code);
str += SPACE;
str += to_string(_result);
}
bool deserialize(string& str)
{
size_t space_pos = str.find(SPACE);
if (space_pos == string::npos)
return false;
_exit_code = atoi(str.substr(0, space_pos).c_str());
_result = atoi(str.substr(space_pos + strlen(SPACE)).c_str());
return true;
}
// 退出码,0表示正常退出
int _exit_code = 0;
// 计算结果
int _result = 0;
};
然后就是将用户输入的字符串转换成Request对象的函数,由于用户可能输入"1+1",“1 +1”,“1+ 1”,所以这里需要对这些空格进行特殊处理。首先使用strtok将两个操作数分开,对于左操作数,假设它现在是"1 “,我们从最后开始遍历,遍历到数字停下,在数字后插入’\0’就消除了这些多于空格。对于右操作数,它可能是” 1",我们就从头开始遍历,将指向右操作数的指针不断++,直到该指针指向的数据不再是空格。最后就是调用atoi得到整数形式的操作数
// 将message转换成Request
bool makeRequest(Request& req, char* message)
{
char copy[1024] = {0};
strcpy(copy, message);
// 分割左右操作数
char* left = strtok(copy, "+-*/%");
char* right = strtok(nullptr, "+-*/%");
if (!left || !right)
return false;
int left_len = strlen(left);
int right_len = strlen(right);
req._op = message[left_len];
// 消除多余空格
int i = 0;
for (i = left_len - 1; i >= 0; --i)
{
if (left[i] != ' ')
break;
}
left[i + 1] = '\0';
for (i = 0; i < right_len; ++i)
{
if (right[i] != ' ')
break;
else
right++;
}
// 将字符串形式的操作数转换成整数
req._x = atoi(left);
req._y = atoi(right);
return true;
}
对于序列化和反序列化除了自己定义方法,也可以使用Json协议,使用别人定制好的成熟的方法,关于Json第三方库的安装,可以使用命令
sudo yum install -y jsoncpp-deve
至于要使用自己的方法还是Json协议,这里可以使用条件编译的方式,将两者方法都写进代码中
// 用来发送的计算请求
class Request
{
public:
// 110 + 120
void serialize(string& str)
{
#ifdef MYSELF
str += to_string(_x);
str += SPACE;
str += to_string(_op);
str += SPACE;
str += to_string(_y);
#else
// 万能Json对象
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["op"] = _op;
// 定义写对象
Json::FastWriter fw;
// 将value序列化,保存结果到str中
str = fw.write(root);
#endif
}
bool deserialize(string& str)
{
#ifdef MYSELF
// 获取字符串中的两个空格,以及判断其正确性
size_t spaceone_pos = str.find(SPACE);
if (spaceone_pos == string::npos)
return false;
size_t spacetwo_pos = str.rfind(SPACE);
if (spacetwo_pos == string::npos)
return false;
// 根据空格获取操作数与操作符
string op = str.substr(spaceone_pos + strlen(SPACE), spacetwo_pos - spaceone_pos - strlen(SPACE));
string x = str.substr(0, spaceone_pos);
string y = str.substr(spacetwo_pos + strlen(SPACE));
// 将得到的字符转换成类成员
_x = atoi(x.c_str());
_y = atoi(y.c_str());
_op = atoi(op.c_str());
return true;
#else
Json::Value root;
Json::Reader rd;
rd.parse(str, root);
_x = root["x"].asInt();
_y = root["y"].asInt();
_op = root["op"].asInt();
return true;
#endif
}
int _x;
int _y;
char _op;
};
// 对请求做出的响应
class Response
{
public:
void serialisze(string& str)
{
#ifdef MYSELF
str += to_string(_exit_code);
str += SPACE;
str += to_string(_result);
#else
Json::Value root;
root["exitcode"] = _exit_code;
root["result"] = _result;
Json::FastWriter fw;
str = fw.write(root);
#endif
}
bool deserialize(string& str)
{
#ifdef MYSELF
size_t space_pos = str.find(SPACE);
if (space_pos == string::npos)
return false;
_exit_code = atoi(str.substr(0, space_pos).c_str());
_result = atoi(str.substr(space_pos + strlen(SPACE)).c_str());
return true;
#else
Json::Value root;
Json::Reader rd;
rd.parse(str, root);
_exit_code = root["exitcode"].asInt();
_result = root["result"].asInt();
return true;
#endif
}
// 退出码,0表示正常退出
int _exit_code = 0;
// 计算结果
int _result = 0;
};
关于Json接口的使用:Json序列化,形成的字符串是kv格式的,比如说1+1序列化后就是
{“op”:43,“x”:1,“y”:1}
数据前面的字符串就是为反序列化建立的索引,先说序列化:Value是Json中的一个万能对象,用它可以序列化不同格式的数据,创建Value对象root,root[“x”, _x]就是在root中插入了一个键值对,把所有的数插入到root后,创建FastWrite对象,以root为参数调用其write方法,用string类型对象str接收write的返回值,str中保存的就是{“op”:43,“x”:1,“y”:1}这样的字符串,是将数据序列化后的结果。
至于Json的反序列化:我们创建Reader类型对象rd与Value类型对象root,调用rd的parse方法,将root和序列化后的字符串str作为parse的参数,parse方法会将反序列化后的结果写入root,此时我们就能根据当时创建root对象时,为数据建立的索引来还原数据,比如root[“x”].asInt(),将root中以"x"为key的value值以int的格式返回,这样我们就能将数据还原到我们的结构体中
关于条件编译,我们可以在makefile文件中,以命令行的方式,在编译源文件时创建宏,具体是-D 宏的名字
服务端与客户端代码实现
对于具体的服务端与客户端通信细节,可以看我的这篇文章,这里不再赘述,只说明大概的思路。
客户端与服务端建立链接后,客户端从键盘读取用户的输入,由于输入的是字符串,并且格式可能不规范,所以客户端需要对用户输入的字符串进行处理,将其转换成Request对象req,接着调用req的serialize方法,将计算表达式序列化,然后是encode,为有效载荷添加报头,最后将这样的数据发送给服务端。注意,不是发送完就结束的了,用户需要得到服务端的响应,得到一个计算结果,所以客户端需要调用read方法,读取服务端的响应,所以这里需要将得到的响应decode,得到有效载荷,接着deserialize,反序列化,将数据填充到Response对象res中,至此,一次客户端与服务端的通信完成。
对于服务端,与客户端连接后,需要接收来自客户端的请求,得到请求后,decode+deserialize,得到一个Request对象req,服务端是要提供服务的,所以这里需要服务端调用一个计算函数,将req的计算表达式计算出一个具体值,并且将结果和退出码保存到Response对象res中。然后对res序列化+encode,将这样的字符串返回给客户端,一次通信中服务端的工作才算完成
#include "util.hpp"
#include "protocol.hpp"
void usage(const char *filename)
{
std::cout << "usage:\n\t"
<< filename << "IP port" << std::endl;
}
int main(int argc, char* argv[])
{
if (argc != 3)
{
usage(argv[0]);
exit(USAG_ERRO);
}
std::string server_ip = argv[1];
uint16_t server_port = atoi(argv[2]);
// 服务器套接字的填充
struct sockaddr_in server;
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port);
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
// 套接字的创建
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cerr << "socket: fail" << std::endl;
exit(SOCK_FAIL);
}
// 与服务器的连接
if (connect(sockfd, (const struct sockaddr*)&server, sizeof(server)) < 0)
{
std::cerr << "connect: fail" << std::endl;
exit(CONN_FAIL);
}
std::cout << "connect done" << std::endl;
while (true)
{
std::cout << "请输入计算表达式#";
char message[1024] = {0};
std::cin.getline(message, sizeof(message));
if (strcasecmp("quit", message) == 0)
{
// 注意不要退出,让客户端向服务器发送quit,服务器接收quit将关闭服务
ssize_t w_ret = write(sockfd, message, sizeof(message));
break;
}
// 创建请求对象
Request req;
// 用用户的输入填充req对象
makeRequest(req, message);
// 序列化,得到可以发送的字符流
string package = "";
req.serialize(package);
// 对字节流编码
package = encode(package);
ssize_t w_ret = write(sockfd, package.c_str(), package.size());
// 发送package失败
if (w_ret <= 0)
{
std::cerr << "write: fail" << std::endl;
break;
}
// 发送package成功,接收服务端的响应
char re_tmp[1024] = {0};
ssize_t r_ret = read(sockfd, re_tmp, sizeof(re_tmp));
// 接收失败
if (r_ret <= 0)
{
std::cerr << "read: fail" << std::endl;
break;
}
// 接收成功
re_tmp[r_ret] = '\0';
string re_package = re_tmp;
// 解码package
uint32_t payload_len = 0;
re_package = decode(re_package, payload_len);
// 创建Response对象
Response res;
// 将接收的有效载荷反序列化到res中
res.deserialize(re_package);
// for test
cout << "result:" << res._result << ", exit_code:" << res._exit_code << endl;
}
return 0;
}
#include "util.hpp"
#include "task.hpp"
#include "threadpool.hpp"
#include <signal.h>
#include <sys/wait.h>
#include "protocol.hpp"
Response calculate(const Request& req)
{
Response res;
switch(req._op)
{
case '+':
res._result = req._x + req._y;
break;
case '-':
res._result = req._x - req._y;
break;
case '*':
res._result = req._x * req._y;
break;
case '/':
if (req._y == 0)
res._exit_code = -1; // -1表示除0错误
else
res._result = req._x / req._y;
break;
case '%':
if (req._y == 0)
res._exit_code = -2; // -2表示模0错误
else
res._result = req._x % req._y;
break;
default:
res._exit_code = -3; // -3表示操作符错误
break;
}
return res;
}
void netCal(int sockfd)
{
while (true)
{
// 获取客户端的请求
char tmp_buf[1024] = {0};
// total_packag用来保存客户端的输入,就算输入信息不完整
string total_package = "";
// 读取客户端输入的信息
ssize_t r_ret = read(sockfd, tmp_buf, sizeof(tmp_buf));
// 读取失败
if (r_ret < 0)
{
cout << "read fail" << endl;
break;
}
// 客户端退出
else if (r_ret == 0)
{
cout << "client quit" << endl;
break;
}
// 读取成功
else
{
tmp_buf[r_ret] = '\0';
total_package += tmp_buf;
Response res;
Request req;
uint32_t payload_len = 0;
// 解码客户发送的信息
string cli_package = decode(total_package, payload_len);
// 没有获取完整的有效载荷
if (payload_len == 0) continue;
// 反序列化有效载荷,得到数据
req.deserialize(cli_package);
// for test
cout << "x:" << req._x << ' ' << "op:" << req._op << ' ' << "y:" << req._y << endl;
// 得到响应请求的对象
res = calculate(req);
// 序列化响应对象
string package = "";
res.serialisze(package);
// 对响应进行编码
package = encode(package);
// 向客户端发送响应
write(sockfd, package.c_str(), package.size());
// for test
cout << "完成一次计算" << endl;
}
}
}
class tcpServer
{
public:
tcpServer(uint16_t port, std::string ip = "") : _ip(ip), _port(port) {}
~tcpServer() {}
void init()
{
// 创建套接字文件
_listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_listen_sockfd < 0)
{
std::cerr << "socket: fail" << std::endl;
exit(SOCK_FAIL);
}
// 填充套接字信息
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(_port);
_ip.empty() ? local.sin_addr.s_addr = INADDR_ANY : inet_aton(_ip.c_str(), &local.sin_addr);
// 将信息绑定到套接字文件中
if (bind(_listen_sockfd, (const struct sockaddr *)&local, sizeof(local)) < 0)
{
std::cerr << "bind: fail" << std::endl;
exit(BIND_FAIL);
}
// 至此,套接字创建完成,所有的步骤与udp通信一样
// 使套接字进入监听状态
if (listen(_listen_sockfd, 5) < 0)
{
std::cerr << "listen: fail" << std::endl;
exit(LSTE_FAIL);
}
// 套接字初始化完成
std::cout << "listen done" << std::endl;
// 获取线程池的单例
_tp = threadpool<Task>::get_instance();
std::cout << "threadpool ready" << endl;
}
void loop()
{
// 先启动线程池
_tp->start();
// signal(SIGCHLD, SIG_IGN); // 设置SIGCHLD信号为忽略,这样子进程就会自动释放资源
// 创建保存套接字信息的结构体
struct sockaddr_in peer;
socklen_t peer_len = sizeof(peer);
// 接受监听队列中的套接字请求
while (1)
{
int server_sockfd = accept(_listen_sockfd, (struct sockaddr *)&peer, &peer_len);
if (server_sockfd < 0)
{
std::cerr << "accept: fail" << std::endl;
continue;
}
std::cout << "accept done" << std::endl;
// 提取请求方的套接字信息
uint16_t peer_port = ntohs(peer.sin_port);
std::string peer_ip = inet_ntoa(peer.sin_addr);
// 打印请求方的套接字信息
std::cout << "accept: " << peer_ip << " [" << peer_port << "]" << std::endl;
// 使用线程池技术提供服务
Task t(netCal, server_sockfd);
_tp->push(t);
}
}
private:
std::string _ip;
uint16_t _port;
int _listen_sockfd;
threadpool<Task>* _tp; // 线程池的引入
};
int main()
{
tcpServer server(8081);
server.init();
server.loop();
return 0;
}
测试结果
若读取需要进行测试,关于服务端与客户端通信的其他文件,我放在了我的gitee中,需要自取