前言
本篇博客通过网络计算器的实现来帮助各位理解应用层自定义协议以及序列化和反序列化。
一、认识自定义协议&&序列化和反序列化
我们程序员写的一个个解决我们实际问题,满足我们日常需求的网络程序都是在应用层。前面我们说到:协议是一种 "约定"。socket api 的接口在读写数据时,都是按 "字符串" 的方式来发送接收的。如果我们要传输一些 "结构化的数据" 怎么办呢?
例如,我们需要实现一个服务器版的计算器。我们需要客户端把要计算的两个加数发过去,然后由服务器进行计算,最后再把结果返回给客户端。
约定方案一:
• 客户端发送一个形如"1+1"的字符串
• 这个字符串中有两个整型操作数
• 两个数字之间会有一个字符是运算符
• 数字和运算符之间没有空格
约定方案二:
• 定义结构体来表示我们需要交互的信息
• 发送数据时将这个结构体按照一个规则转换成字符串, 接收到数据的时候再按照相同的规则把字符串转化回结构体
这个过程叫做 "序列化" 和 "反序列化"。
无论我们采用方案一还是方案二,抑或是还是其他的方案。只要保证一端发送时构造的数据,在另一端能够正确的进行解析就是可以的。这种约定就是应用层协议。
二、理解 tcp 全双工&&面向字节流
在我们创建sockfd时,操作系统会自动创建两个缓冲区——发送缓冲区和接收缓冲区。所以,发送消息的本质是把数据拷贝到发送缓冲区,接收消息的本质就是把数据从接收缓冲区拷贝拿到。而这两个动作是可以同时进行的,即TCP全双工。
源码剖析:
TCP协议即传输控制协议,它控制着实际数据什么时候发,发多少,出错了怎么办,故而它是面向字节流的。
三、自定义协议实现网络计算器
首先我们需要定制协议+序列化与反序列化。
上面要实现序列化和反序列化,有两种方案:
1. 自己做:x + oper(+ - * /) + y,做空格的字符串分割就行
2. xml && json && protobuf
这里我们为了增加可读性,建议将结构化数据转化为 json(jsoncpp) 的字符串,这篇文章主要是关于第二种方案
//Protocol.hpp
#pragma once
#include <string>
#include <jsoncpp/json/json.h>
#include "Log.hpp"
using namespace LogMudule;
// 接收
class Request
{
public:
Request() = default;
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
{
}
// 序列化
bool Serialize(std::string &out_string)
{
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
out_string = root.toStyledString();
return true;
}
// 反序列化
bool Deserialize(std::string &in_string)
{
Json::Value root;
Json::Reader reader;
bool parsingSuccessful = reader.parse(in_string, root);
if (!parsingSuccessful)
{
LOG(LogLevel::ERROR) << "Failed to parse JSON: " << reader.getFormattedErrorMessages();
return false;
}
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
int X() const { return _x; }
int Y() const { return _y; }
int Oper() const { return _oper; }
~Request()
{
}
private:
int _x;
int _y;
char _oper;
};
// 应答
class Response
{
public:
Response() :_result(0),_code(0){}
Response(int result, int code) : _result(result), _code(code)
{
}
// 序列化
bool Serialize(std::string &out_string)
{
Json::Value root;
root["result"] = _result;
root["code"] = _code;
out_string = root.toStyledString();
// LOG(LogLevel::DEBUG)<<out_string;
return true;
}
// 反序列化
bool Deserialize(std::string &in_string)
{
Json::Value root;
Json::Reader reader;
bool parsingSuccessful = reader.parse(in_string, root);
if (!parsingSuccessful)
{
LOG(LogLevel::ERROR) << "Failed to parse JSON: " << reader.getFormattedErrorMessages();
return false;
}
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
int Result() { return _result; }
int Code() { return _code; }
void SetResult(int result){_result=result;}
void SetCode(int code){_code=code;}
~Response() {}
private:
int _result; // 结果
int _code; // 错误码
};
const static std::string sep = "\r\n";
// 封包
bool EnCode(std::string &message, std::string *package)
{
if (message.size() == 0)
return false;
//转成17\r\nmessage\r\n的格式
*package = std::to_string(message.size()) + sep + message + sep;
return true;
}
// 解包
bool Decode(std::string &package, std::string *content)
{
auto pos = package.find(sep);
if (pos == std::string::npos)
return false;
std::string content_length_str = package.substr(0, pos);
int content_length = std::stoi(content_length_str);
int full_length = content_length_str.size() + content_length + 2 * sep.size();
if (package.size() < full_length)
return false;
*content = package.substr(pos + sep.size(), content_length);
// package erase
package.erase(0, full_length);
return true;
}
完成协议的编写之后,我们顺手写我们的计算逻辑:
//Calculator.hpp
#pragma once
#include <string>
#include "Protocol.hpp"
class Calculator
{
public:
Calculator()
{
}
Response Execute(const Request &req)
{
Response resp;
switch (req.Oper())
{
case '+':
resp.SetResult(req.X() + req.Y());
break;
case '-':
resp.SetResult(req.X() - req.Y());
break;
case '*':
resp.SetResult(req.X() * req.Y());
break;
case '/':
{
if (req.Y() == 0)
{
resp.SetCode(1); // 1 就是除0
}
else
{
resp.SetResult(req.X() / req.Y());
}
}
break;
case '%':
{
if (req.Y() == 0)
{
resp.SetCode(2); // 2 就是mod 0
}
else
{
resp.SetResult(req.X() % req.Y());
}
}
break;
default:
resp.SetCode(3); // 3 用户发来的计算类型,无法识别
break;
}
return resp;
}
~Calculator()
{
}
};
接下来编写我们的服务端代码:
这里的服务端教之前的TCP服务端相比只有执行的方法不同,其他并无二异。
//TCPSever.hpp
#pragma once
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>
#include <pthread.h>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "ThreadPool.hpp"
using namespace LogMudule;
using namespace ThreadPoolModual;
const static uint16_t defaultport = 8888;
//回调函数
using work_t =std::function<std::string(std::string&)>;
class TCPSever
{
void Service(int sockfd)
{
//package充当缓冲区
std:: string package;
char buff[1024];
while(true)
{
int n=::recv(sockfd,buff,sizeof(buff)-1,0);
if(n>0)
{
buff[n]=0;
package+=buff;//必须是+=,这样才能保证发送过来的数据被加入到缓冲区
//读取到的数据计算返回结果
std::string result=_work(package);
if(result.empty()) continue;//这里表明上面的报文不完整没法解析
::send(sockfd,result.c_str(),result.size(),0);
}
else if(n==0)
{
//表示读到了文件末尾
LOG(LogLevel::INFO)<<"Client Quit……";
break;
}
else
{
LOG(LogLevel::ERROR)<<"read error";
break;
}
}
}
// 线程分离管理
struct ThreadData
{
int _sockfd;
TCPSever *_self;
};
static void *Handler(void *args)
{
pthread_detach(pthread_self());
ThreadData *data = (ThreadData *)args;
data->_self->Service(data->_sockfd);
return nullptr;
}
public:
TCPSever(work_t work ,uint16_t port = defaultport) : _work(work),_addr(port)
{
// 创建套接字
int n = _listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (n < 0)
{
LOG(LogLevel::FATAL) << "socket failed";
exit(1);
}
LOG(LogLevel::INFO) << "socket succeed";
// 绑定
n = ::bind(_listensockfd, _addr.NetAddr(), _addr.Len());
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind failed";
exit(1);
}
LOG(LogLevel::INFO) << "bind succeed";
// 开始监听
n = ::listen(_listensockfd, 5);
if (n < 0)
{
LOG(LogLevel::FATAL) << "listen failed";
exit(1);
}
LOG(LogLevel::INFO) << "listen succeed";
}
void Run()
{
while (true)
{
// 获取连接
struct sockaddr_in connected_addr;
socklen_t len = sizeof(connected_addr);
int sockfd = ::accept(_listensockfd, (struct sockaddr *)&connected_addr, &len);
if (sockfd < 0)
{
LOG(LogLevel::ERROR) << "accept failed";
continue;
}
InetAddr peer(connected_addr);
LOG(LogLevel::INFO) << "accept succeed connected is " << peer.Addr() << " sockfd is " << sockfd;
ThreadData *data = new ThreadData;
data->_sockfd = sockfd;
data->_self = this;
pthread_t tid;
pthread_create(&tid, nullptr, Handler, data);
}
}
~TCPSever()
{
::close(_listensockfd);
}
private:
int _listensockfd;
InetAddr _addr;
work_t _work;
};
而服务端的主函数这里我们需要注入执行方法:
//TCPSever.cc
#include "TCPSever.hpp"
#include "Protocol.hpp"
#include "Calculator.hpp"
std::string Work(std::string& package)
{
std::string message;
std::string ret;
//解包,循环获取直到不能解析为止
while(Decode(package,&message))
{
if(message.empty())
break;
//反序列化
Request req;
if(!req.Deserialize(message))
break;
//计算结果
Response res=Calculator().Execute(req);
//序列化
res.Serialize(message);
//封包
EnCode(message,&message);
//添加到结果缓存
ret+=message;
}
return ret;
}
int main()
{
std::unique_ptr<TCPSever> ts_ptr = std::make_unique<TCPSever>(Work);
ts_ptr->Run();
return 0;
}
完成服务端之后我们继续客户端的编写,这里我将前面的客户端代码进行抽离,头文件中仅仅增加了执行方法的注入,由主函数传递方法交由Run函数执行:
//TCPClient.hpp
#pragma once
#include <functional>
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace LogMudule;
const static std::string defaultip="127.0.0.1";
const static int defaultport=8888;
using work_t=std::function<void(int)>;
class TCPClient
{
public:
TCPClient(work_t work,std::string ip,uint16_t port):_work(work),_dst_addr({ip,port})
{
//创建套接字
_sockfd=::socket(AF_INET,SOCK_STREAM,0);
if(_sockfd<0)
{
LOG(LogLevel::FATAL)<<"socket failed";
exit(1);
}
LOG(LogLevel::INFO)<<"socket succeed";
//不需要绑定
}
void Run()
{
int n=::connect(_sockfd,_dst_addr.NetAddr(),_dst_addr.Len());
if(n<0)
{
LOG(LogLevel::ERROR)<<"connect failed";
exit(3);
}
LOG(LogLevel::INFO)<<"connect succeed";
while(true)
{
_work(_sockfd);
}
}
~TCPClient()
{
::close(_sockfd);
}
private:
int _sockfd;
InetAddr _dst_addr;
work_t _work;
};
//TCPClient.cc
#include <memory>
#include "TCPClient.hpp"
#include "Protocol.hpp"
void Work(int sockfd)
{
// 获取输入
int x, y;
char oper;
std::cout << "Please input x:";
std::cin >> x;
std::cout << "Please input y:";
std::cin >> y;
std::cout << "Please input oper:";
std::cin >> oper;
// 序列化
Request req(x, y, oper);
std::string package;
req.Serialize(package);
//封包
std::string message;
EnCode(package,&message);
// 发送消息
::send(sockfd, message.c_str(), message.size(), 0);
// 接收结果
char buff[1024];
int n = ::recv(sockfd, buff, sizeof(buff), 0);
if (n > 0)
{
buff[n] = 0;
std::string result = buff;
//解包
Decode(result,&message);
// 反序列化
Response res;
res.Deserialize(message);
LOG(LogLevel::DEBUG)<<"result:"<<res.Result()<<":code:"<<res.Code();
}
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cout << "Usgae Error" << std::endl;
exit(-1);
}
std::string ip = argv[1];
uint16_t port = std::stoi(argv[2]);
std::unique_ptr<TCPClient> c_ptr = std::make_unique<TCPClient>(Work,ip, port);
c_ptr->Run();
return 0;
}
其实编写实现之后我们发现其逻辑不过如下图: