深入理解序列化/反序列化与TCP通信协议
一、序列化与反序列化
1.1 基本概念
- 序列化(Serialization): 将数据结构或对象状态转换为可存储/传输格式的过程
- 反序列化(Deserialization): 将序列化后的数据恢复为原始数据结构的过程
示例:
// 原始数据结构
struct Person {
string name;
int age;
double salary;
};
// 序列化 -> {"name":"John","age":30,"salary":5000.0}
// 反序列化 -> 重建Person对象
1.2 存在意义
- 数据持久化:将内存对象保存到文件/数据库
- 网络传输:跨进程/网络传输结构化数据
- 跨语言交互:通过标准格式实现不同语言系统的数据交换
- 数据校验:通过反序列化验证数据结构完整性
1.3 实现方式对比
格式 | 可读性 | 体积 | 解析速度 | 典型应用场景 |
---|---|---|---|---|
JSON | 好 | 中等 | 较快 | Web API、配置文件 |
XML | 好 | 大 | 慢 | 企业级系统交互 |
Protobuf | 无 | 小 | 极快 | 高性能RPC通信 |
MessagePack | 无 | 小 | 快 | 移动端数据传输 |
二、TCP通信关键特性
2.1 全双工通信
实现原理:
// 内核数据结构示意
struct sock {
struct sk_buff_head receive_queue; // 接收缓冲区
struct sk_buff_head write_queue; // 发送缓冲区
// ...
};
- 每个socket维护两个独立缓冲区
- 发送/接收操作互不阻塞
2.2 面向字节流的特点
关键问题:
客户端发送:"HelloWorld"
服务端可能分两次接收:"Hello" 和 "World"
解决方案:
自定义协议格式:
[4字节长度头][有效载荷]
示例:
0x0000000A{"name":"John"}
2.3 应用层协议设计
推荐格式:
#pragma pack(push, 1)
struct PacketHeader {
uint32_t length; // 有效载荷长度
uint16_t version; // 协议版本
uint32_t checksum; // 数据校验码
};
#pragma pack(pop)
设计要点:
- 固定长度报文头
- 包含版本控制字段
- 添加数据校验机制
- 使用网络字节序(大端)
三、JSON序列化实战(jsoncpp)
3.1 基础示例
#include <json/json.h>
// 序列化
Json::Value root;
root["name"] = "John";
root["age"] = 30;
root["salary"] = 5000.0;
Json::StreamWriterBuilder builder;
string jsonStr = Json::writeString(builder, root);
// 反序列化
Json::CharReaderBuilder readerBuilder;
Json::Value parsedRoot;
string errs;
istringstream iss(jsonStr);
Json::parseFromStream(readerBuilder, iss, &parsedRoot, &errs);
3.2 调试技巧
启用格式化输出:
builder["indentation"] = "\t"; // 设置缩进
cout << Json::writeString(builder, root);
输出结果:
{
"name": "John",
"age": 30,
"salary": 5000.0
}
四、关键注意事项
-
字节序问题:
- 网络传输应统一使用大端字节序
- 使用
htonl()
/ntohl()
进行转换
-
版本兼容:
- 协议字段需要向后兼容
- 建议添加版本号字段
-
安全考虑:
- 限制最大报文长度
- 校验数据合法性
- 防止缓冲区溢出攻击
-
性能优化:
// 预分配缓冲区 jsonStr.reserve(1024); // 复用解析器实例 static thread_local Json::CharReaderBuilder readerBuilder;
-
错误处理:
if (!Json::parseFromStream(readerBuilder, iss, &parsedRoot, &errs)) { cerr << "JSON解析失败: " << errs << endl; // 实现重试或降级逻辑 }
五、扩展知识
5.1 二进制协议优化
对于高频通信场景,可考虑:
// 使用内存对齐结构
#pragma pack(push, 1)
struct BinaryProtocol {
uint32_t magic; // 魔数标识 0x5A5AA5A5
uint16_t cmdType; // 命令类型
uint32_t bodyLen; // 数据体长度
byte checksum; // 校验和
// 变长数据体...
};
#pragma pack(pop)
5.2 现代序列化方案
- FlatBuffers:零拷贝反序列化
- Cap’n Proto:直接内存映射
- Avro:Schema动态验证
通过合理选择序列化方案和设计通信协议,可以构建出高效可靠的分布式系统。理解底层原理有助于在性能与开发效率之间做出最佳权衡。
板书
NetCal
TcpServer.cc
#include "TcpServer.hpp"
// #include "CommandExec.hpp"
#include <functional>
#include <memory>
#include "Protocol.hpp"
#include "Calculator.hpp"
#include "Daemon.hpp"
// using task_t = function<std::string (std::string)>;
using cal_fun = std::function<Response(const Request &req)>;
// package不一定有完成报文,
// if 不完整-》继续读
// else 完整-》提取 ——》 反序列化-》构建Request对象-》调用计算模块
// using namespace
class Parse
{
public:
Parse(cal_fun c):_cal(c)
{}
std::string Entry(std::string &package)
{
// 判断报文完整性
std::string message;
std::string respstr;
while(Decode(package, &message))
{
LOG(LogLevel::DEBUG)<<"Content:\n"<<message;
if(message.empty()) break;
// 2. 反序列化, message是一个曾经被序列化的request
Request req;
if (!req.Deserialize(message))
break;
std::cout << "#############" << std::endl;
req.Print();
std::cout << "#############" << std::endl;
// 3. 计算
Response resp = _cal(req);
// 4. 序列化
std::string res;
resp.Serialize(res);
LOG(LogLevel::DEBUG) << "序列化: \n" << res;
// 5. 添加长度报头字段!
Encode(res);
LOG(LogLevel::DEBUG) << "Encode: \n" << res;
// 6. 拼接应答
respstr += res;
}
LOG(LogLevel::DEBUG) << "respstr: \n" << respstr;
return respstr;
}
private:
cal_fun _cal;
};
int main(int argc, char *argv[])
{
if(argc != 2)
{
std::cout<<"Usage: ./server port"<<std::endl;
Die(USAGE_ERR);
}
uint16_t port = std::stoi(argv[1]);
ENABLE_CONSOLE_LOG();
// Command cmd;
// task_t task = [&cmd](std::string cmdstr){
// return cmd.Execute(cmdstr);
// };
// Daemon(false, false);
// 计算模块
Calculator mycal;
// 解析对象
// printf("服务器启动\n");
Parse myparse([&mycal](const Request &req){
return mycal.Execute(req);
});
// 通信模块
// 只负责IO
std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>([&myparse](std::string &package){
return myparse.Entry(package);
}, port);
tsvr->InitServer();
tsvr->Start();
return 0;
}
TcpServer.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include <functional>
#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"
#define BACKLOG 8
// using namespace LogModule;
using namespace ThreadPoolModule;
static const uint16_t gport = 8080;
using handler_t = std::function<std::string(std::string&)>;
class TcpServer
{
using task_t = std::function<void()>;
struct ThreadData
{
int sockfd;
TcpServer *self;
};
public:
TcpServer(handler_t handler, int port = gport):
_handler(handler),
_port(port),
_isrunning(false)
{
}
void InitServer()
{
// 先监听
_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if(_listensockfd < 0)
{
LOG(LogLevel::FATAL)<<"socket error";
Die(SOCKET_ERR);
}
LOG(LogLevel::INFO)<<"socket create success, _listensocked is "<<_listensockfd;
// 后bind
struct sockaddr_in local;// 网络通信用sockaddr_in, 本地用sockaddr_un
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
int n = ::bind(_listensockfd, CONV(&local), sizeof(local));
if(n < 0)
{
LOG(LogLevel::FATAL)<<"bind error";
Die(BIND_ERR);
}
LOG(LogLevel::INFO) << "bind success, sockfd is: "<<_listensockfd;
// 设置为监听状态
n = listen(_listensockfd, BACKLOG);
if(n < 0)
{
LOG(LogLevel::FATAL)<<"listen error";
Die(LISTEN_ERR);
}
LOG(LogLevel::INFO)<<"listen success, socked is:"<<_listensockfd;
// 此处可使用::signal(SIGCHLD, SIG_IGN)来将父子进程解绑
}
void HandlerRequest(int sockfd)
{
LOG(LogLevel::INFO)<<"HandlerRequest, sockfd is:"<<sockfd;
char inbuffer[4096];
while(true)
{
ssize_t n = recv(sockfd, inbuffer, sizeof inbuffer, 0);
inbuffer[n] = 0;
LOG(LogLevel::INFO)<<"server recived:"<<inbuffer;
if(n > 0)
{
inbuffer[n] = 0;
std::string str(inbuffer);
std::string cmd_result = _handler(str);// 回调
::send(sockfd, cmd_result.c_str(), cmd_result.size(), 0);
LOG(LogLevel::INFO)<<"server sent:"<<cmd_result;
}
else if(n == 0)
{
LOG(LogLevel::INFO)<<"client quit"<<sockfd;
break;
}
else
{
break;
}
}
::close(sockfd);// 防止fd泄露
}
static void *ThreadEntry(void *args)// 设为静态函数就不用传递this
{// 当使用多线程(不是封装好的线程池), pthread_thread_create的函数只能接收一个参数
pthread_detach(pthread_self());
ThreadData *data = (ThreadData *)args;
data->self->HandlerRequest(data->sockfd);
return nullptr;
}
void Start()
{
_isrunning = true;
while(_isrunning)
{
struct sockaddr_in peer;
socklen_t peerlen = sizeof(peer);
LOG(LogLevel::DEBUG)<<"accepting...";
int sockfd = ::accept(_listensockfd, CONV(&peer), &peerlen);
if(sockfd < 0)
{
LOG(LogLevel::WARNING)<<"accept error:"<<strerror(errno);
continue;
}
// 连接成功
LOG(LogLevel::INFO)<<"accept success, sockfd is:"<<sockfd;
InetAddr addr(peer);
LOG(LogLevel::INFO)<<"client info:"<<addr.Addr();
// 使用线程池实现
ThreadPool<task_t>::getInstance()->Equeue([this, sockfd](){
this->HandlerRequest(sockfd);
});
}
}
~TcpServer()
{
}
private:
int _listensockfd;// 监听socket
uint16_t _port;
bool _isrunning;
// 处理上层任务入口
handler_t _handler;
};
TcpClient.cc
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include "Protocol.hpp" // 形成约定
// ./client_tcp server_ip server_port
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cout << "Usage:./client_tcp server_ip server_port" << std::endl;
return 1;
}
std::string server_ip = argv[1]; // "192.168.1.1"
int server_port = std::stoi(argv[2]);
int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cout << "create socket failed" << std::endl;
return 2;
}
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());
// client 不需要显示的进行bind, tcp是面向连接的, connect 底层会自动进行bind
int n = ::connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (n < 0)
{
std::cout << "connect failed" << std::endl;
return 3;
}
// echo client
std::string message;
while (true)
{
int x, y;
char oper;
std::cout << "input x: ";
std::cin >> x;
std::cout << "input y: ";
std::cin >> y;
std::cout << "input oper: ";
std::cin >> oper;
Request req(x, y, oper);
// 1. 序列化
req.Serialize(message);
// 2. Encode
Encode(message);
// 3. 发送
n = ::send(sockfd, message.c_str(), message.size(), 0);
if (n > 0)
{
char inbuffer[1024];
// 4. 获得应答
int m = ::recv(sockfd, inbuffer, sizeof(inbuffer), 0);
if (m > 0)
{
inbuffer[m] = 0;
std::string package = inbuffer;//TODO
std::string content;
// 4. 读到应答完整--暂定, decode
Decode(package, &content);
// 5. 反序列化
Response resp;
resp.Deserialize(content);
// 6. 得到结构化数据
std::cout << resp.Result() << "[" << resp.Code() << "]" << std::endl;
}
else
break;
}
else
break;
}
::close(sockfd);
return 0;
}
Calculator.hpp
#pragma once
#include <iostream>
#include "Protocol.hpp"
class Calculator
{
public:
Calculator()
{
}
Response Execute(const Request &req)
{
// 我们拿到的都是结构化的数据,拿到的不就是类对象吗!!!
Response resp;
switch (req.Oper())
{
case '+':
resp.SetResult(req.X() + req.Y());
break;
case '-':
resp.SetResult(req.X() - req.Y());
break;
case '*':
resp.SetResult(req.X() * req.Y());
break;
case '/':
{
if (req.Y() == 0)
{
resp.SetCode(1); // 1 就是除0
}
else
{
resp.SetResult(req.X() / req.Y());
}
}
break;
case '%':
{
if (req.Y() == 0)
{
resp.SetCode(2); // 2 就是mod 0
}
else
{
resp.SetResult(req.X() % req.Y());
}
}
break;
default:
resp.SetCode(3); // 3 用户发来的计算类型,无法识别
break;
}
return resp;
}
~Calculator()
{
}
};
Daemon.hpp
#pragma once
#include <iostream>
#include <cstdlib>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#define ROOT "/"
#define devnull "/dev/null"
void Daemon(bool ischdir, bool isclose)
{
// 1. 守护进程一般要屏蔽到特定的异常信号
signal(SIGCHLD, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
// 2. 成为非组长
if (fork() > 0)
exit(0);
// 3. 建立新会话
setsid();
// 4. 每一个进程都有自己的CWD,是否将当前进程的CWD更改成为 / 根目录
if (ischdir)
chdir(ROOT);
// 5. 已经变成守护进程啦,不需要和用户的输入输出,错误进行关联了
if (isclose)
{
::close(0);
::close(1);
::close(2);
}
else
{
int fd = ::open(devnull, O_WRONLY);
if (fd > 0)
{
// 各种重定向
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
}
}
Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>
const std::string Sep = "\n\r";
bool Encode(std::string &message)
{
if(message.size() == 0) return false;
std::string package = std::to_string(message.size()) + Sep + message + Sep;
message = package;
return true;
}
bool Decode(std::string &package, std::string* content)
{
auto pos = package.find(Sep);
if(pos == std::string::npos) return false;
std::string content_length_str = package.substr(0, pos);
int content_length = std::stoi(content_length_str);
int full_length = content_length_str.size() + content_length + 2 * Sep.size();
if(package.size() < full_length) return false;
*content = package.substr(pos + Sep.size(), content_length);
package.erase(0, full_length);
return true;
}
class Request
{
public:
Request(int x = 0, int y = 0, char oper = '+'):_x(x), _y(y), _oper(oper)
{
}
bool Serialize(std::string &out_string)
{
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::StreamWriterBuilder wb;
std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());
std::stringstream ss;
w->write(root, &ss);
out_string = ss.str();
return true;
}
bool Deserialize(std::string &in_string)
{
Json::Value root;
Json::Reader reader;
bool parsingSuccessful = reader.parse(in_string, root);
if(!parsingSuccessful)
{
std::cout<<"Failed to parse JSON: "<< reader.getFormattedErrorMessages()<<std::endl;
return false;
}
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();// char 也是 int
return true;
}
void Print()
{
std::cout << _x << std::endl;
std::cout << _oper << std::endl;
std::cout << _y << std::endl;
}
int X() const { return _x; }
int Y() const { return _y; }
char Oper() const { return _oper; }
private:
int _x;
int _y;
char _oper;
};
class Response
{
public:
Response():_result(0), _code(0)
{
}
Response(int result, int code):_result(result), _code(code)
{
}
bool Serialize(std::string &out_string)
{
Json::Value root;
root["result"] = _result;
root["code"] = _code;
Json::StreamWriterBuilder wb;
std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());
std::stringstream ss;
w->write(root, &ss);
out_string = ss.str();
return true;
}
bool Deserialize(std::string &in_string)
{
Json::Value root;
Json::Reader reader;
bool parsingSuccessful = reader.parse(in_string, root);
if(!parsingSuccessful)
{
std::cout<<"Failed to parse JSON: "<<reader.getFormattedErrorMessages()<<std::endl;
return false;
}
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
int Result() const { return _result; }
int Code() const { return _code; }
void SetResult(int res) { _result = res;}
void SetCode(int c) {_code = c;}
private:
int _result;
int _code;
};