一、理解协议
在网络层面,协议(Protocol)是一组规则、标准或约定,它们定义了在网络环境中,计算机、服务器、路由器、交换机等网络设备之间如何相互通信和交换信息。这些规则涵盖了数据格式、数据交换的顺序、速度、以及数据同步方式等各个方面,以确保数据在网络中的传输是可靠、有效和安全的。
在语言层面,协议就是通信双方定制一样的结构化数据
二、序列化与反序列化
2.1 概念
在数据发送与接收方面,我们通常使用write、read、recv、send等接口,当我们如果要发送一些结构化数据时,不建议直接发送结构体,而是建议将结构化数据按一定的方式转化为字符串,再将字符串发送给对方,对方再将字符串转化为结构化数据,这就是序列化与反序列化
那为什么要这样做呢?
-
跨平台兼容性:不同的编程语言和平台可能有不同的内存布局、字节序(endianess)和数据类型表示方式。直接发送结构化数据可能会导致接收方无法正确解析这些数据,因为它们可能不遵循接收方平台上的数据表示规则。序列化将结构化数据转换为一种平台无关的格式(如JSON、XML、Protobuf等),从而确保了数据可以在不同的平台之间无缝传输。
-
网络传输效率:直接发送结构化数据可能包含大量的冗余信息(如对齐填充、指针等),这些信息对于网络传输来说是不必要的,并且会增加传输的数据量。序列化过程可以去除这些冗余信息,只保留必要的数据,从而提高网络传输的效率。
2.2 重新理解 read、write、recv、send 和 tcp 为什么支持全双工
在TCP网络套接字的代码举例中,我们发现客户端给服务端发送信息的同时服务端也可以同时给客户端发送信息,这是因为对于网络套接字来说 read、write、recv、send 和 tcp支持全双工。
read、write、recv、send这几个函数的本质是拷贝函数,以write举例,他的工作原理是将用户定义的缓冲区数据拷贝到文件描述符对应的文件级缓冲区中,然后操作系统在一定的时间会将文件级缓冲区的内容在刷新到磁盘中,那这和他们支持全双工有什么联系吗?
我们知道tcp套接字的本质也是一个文件描述符,但是tcp套接字在设计的时候其实设计了两个缓冲区,一个用来发送数据一个用来接受数据,在通信时客户端和服务端会各自创建一个套接字,此时其实一共存在四个缓冲区,当客户端发送数据时其实是将客户端发送缓冲区的数据拷贝到服务端的接收缓冲区,而客户端接收数据时其实是将服务端的发送缓冲区的数据拷贝到客户端的接收缓冲区,这两个缓冲区相对独立互不干扰
2.3 理解tcp的面向字节流
由于tcp是面向字节流的,它不像面向数据报一样数据的发送与接受都一定是一个整体,我们接收的数据可能不一定正好是一条完整的请求,我们可能一次就读取到多条请求,也可能一次才读取到半条请求,这就像我们拿盆给缸里舀水一样,虽然我们舀了20次,但是缸却能一次性全部接收,而如果我们拿缸里的水让盆接的话,他可能就需要接收许多次了,所以为了解决这个问题,我们可以采用添加报头的方式,来检测我们接收的数据是否为一条完整的请求,当检测到只有半条时我们继续让他读直到检测到一条完整的请求在执行,当检测到多条请求时,我们先执行一个,随后再依次执行,具体的实现我们通过代码讲解
三、网络计算器实现思路
2.1 Socket封装
我们知道套接字分为UDP网络套接字和TCP网络套接字,而两种套接字的大概创建代码都是相似的,为了让代码更加简洁明了和增强代码的复用性,我们可以单独封装一个文件用来包装套接字的初始化等工作。
我们可以设计一个子类Socket,并提供一些虚函数,让TCPSocket和UDPSocket继承Socket并重写各自需要的函数即可,我们也可以将创建套接字的函数一起封装成一个函数,例如 BuildListenSocket, BuildClientSocket等。
namespace socket_ns
{
class Socket;
using SockSPtr = std::shared_ptr<Socket>;
static const int gbacklog = 8;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERR
};
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void CreateBind(uint16_t port) = 0;
virtual void CreateListen() = 0;
virtual bool Connecter(std::string ip, u_int16_t port) = 0;
virtual SockSPtr Accepter(InetAddr *cliaddr) = 0;
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
public:
void BuildListenSocket(uint16_t port)
{
CreateSocket();
CreateBind(port);
CreateListen();
}
bool BuildClientSocket(std::string ip, u_int16_t port)
{
CreateSocket();
return Connecter(ip,port);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{}
void CreateSocket() override
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket create error!\n");
exit(SOCKET_ERROR);
}
LOG(INFO, "socket create success, sockfd: %d\n", _sockfd);
}
void CreateBind(uint16_t port) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(INFO, "Bind success\n");
}
void CreateListen()override
{
int n = ::listen(_sockfd, gbacklog);
if (n < 0)
{
LOG(FATAL, "listen error\n");
exit(LISTEN_ERR);
}
LOG(INFO, "listen success\n");
}
SockSPtr Accepter(InetAddr* cliaddr) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
LOG(FATAL, "listen error\n");
return nullptr;
}
*cliaddr = InetAddr(client);
LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", cliaddr->AddrStr().c_str(), sockfd);
return std::make_shared<TcpSocket>(sockfd); // C++14
}
bool Connecter(std::string ip, u_int16_t port) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = inet_addr(ip.c_str());
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
std::cout<<"link success!"<<std::endl;
return true;
}
int Sockfd()
{
return _sockfd;
}
void Close()
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
ssize_t Recv(std::string *out) override
{
char inbuffer[4096];
ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
if (n > 0)
{
inbuffer[n] = 0;
*out += inbuffer;
}
return n;
}
ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
private:
int _sockfd;
};
class UDPSocket: public Socket
{
//.....
}
}
2.2 定制协议
定制协议就是定制一个双方需要的结构化字段,对于网络计算器来说,我们需要设计一个类包含操作数1、操作数2和操作符,而结果的返回我们也需要设计一个类,包含计算结果、返回码及相关描述等信息
class Request
{
public:
Request()
{
}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
{
}
private:
int _x;
int _y;
char _oper;
};
class Response
{
public:
Response() : _result(0), _code(0), _desc("success")
{
}
private:
int _result;
int _code; // 返回码
std::string _desc; // 返回码描述
};
有了结构化数据我们还需要解决数据发送与接收的问题也就是序列化与反序列化,这里我们采用Json来处理
2.2.1 Jsoncpp
简介:
Jsoncpp 是一个用于处理 JSON 数据的 C++ 库。它提供了将 JSON 数据序列化为字 符串以及从字符串反序列化为 C++ 数据结构的功能。Jsoncpp 是开源的,广泛用于各 种需要处理 JSON 数据的 C++ 项目中。
安装:
C++
ubuntu:sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel
使用:
- 序列化
首先我们需要创建一个Json::Value对象,如代码所示,我们将p对象的成员赋值给了Json::Value 的root对象,于此同时我们还给这些成员各自起了一个名字,这样我们就可以通过键值对的方式来找到对应的数据了
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
int main()
{
people p("joe","男");
Json::Value root;
root["name"] = p.name;
root["sex"] = p.sex;
return 0;
}
我们可以直接调用Json::Value的toStyledString 方法,将数据写到字符串中
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
int main()
{
Json::Value root;
root["name"] = "joe";
root["sex"] = "男";
std::string s = root.toStyledString();
std::cout << s << std::endl;
return 0;
}
这样字符串的格式是这样的:
{
"name" : "joe",
"sex" : "男"
}
然后我们也可以创建一个Json::FastWriter对象,调用他的write方法
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
int main()
{
people p("joe","男");
Json::Value root;
root["name"] = p.name;
root["sex"] = p.sex;
Json::FastWriter writer;
std::string s = writer.write(root);
return 0;
}
这样序列化出来的字符串是这中类型的:
{"name":"joe","sex":"男"}
- 反序列化
使用 Json::Reader
int main()
{
std::string str={"name":"joe","sex":"男"}
Json::Value root;
Json::Reader read;
//将Json字符串写到Json对象中
bool ret = read.parse(str, root);
//将Json对象中的数据写到数据化结构中
People p;
p.name= root["x"].asString();
p.sex= root["y"].asString();
}
回到我们的网络计算器的序列化与反序列化中
class Request
{
public:
Request()
{
}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
{
}
//序列化
bool Serialize(std::string *out)
{
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
std::string s = writer.write(root);
*out = s;
return true;
}
//反序列化
bool DeSerialize(std::string &str)
{
Json::Value root;
Json::Reader read;
bool ret = read.parse(str, root);
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
private:
int _x;
int _y;
char _oper;
};
class Response
{
public:
Response() : _result(0), _code(0), _desc("success")
{
}
bool Serialize(std::string *out)
{
Json::Value root;
root["result"] = _result;
root["code"] = _code;
root["desc"] = _desc;
Json::FastWriter writer;
std::string s = writer.write(root);
*out = s;
return true;
}
bool DeSerialize(std::string &str)
{
Json::Value root;
Json::Reader read;
bool ret = read.parse(str, root);
_result = root["result"].asInt();
_code = root["code"].asInt();
_desc = root["oper"].asString();
return true;
}
private:
int _result;
int _code; // 返回码
std::string _desc; // 返回码描述
};
2.2.2 设计报头
再序列化与反序列化后,我们先给数据添加一个报头,这里我设计的是添加一个Json字符串的长度,根据这个长度判断读取的数据是否是一个完整的数据,所以这里我们需要设计两个函数,一个用来发送数据前添加报头,一个用来收到数据解析报头
static const std::string seq = "\r\n";
// 添加报文
// 格式:len\r\njsonstr\r\n
std::string Encode(std::string &jsonstr)
{
int len = jsonstr.size();
std::string lenstr = std::to_string(len);
return lenstr + seq + jsonstr + seq;
}
// 检测报文
// "le
// "len"
// "len"\r\n
// "len"\r\n"{json}"\r\n (]
// "len"\r\n"{j
// "len"\r\n"{json}"\r\n"len"\r\n"{
// "len"\r\n"{json}"\r\n
// "len"\r\n"{json}"\r\n"len"\r\n"{json}"\r\n"len"\r\n"{json}"\r\n"len"\r\n"{json}"\r
std::string Decode(std::string &packagestream)
{
auto pos = packagestream.find(seq);
// 说明报文没有\r\n,报文不完整
if (pos == std::string::npos)
return std::string();
// 计算报文的总长度
std::string lenstr = packagestream.substr(0, pos);
int len = stoi(lenstr);
int total = lenstr.size() + len + 2 * seq.size();
if (packagestream.size() < total)
return std::string();
// 到这说明收到的报文流一定存在一个完整的报文
std::string jsonstr = packagestream.substr(pos + seq.size(), len);
packagestream.erase(0, total);
return jsonstr;
}
2.3 会话层设计(构建连接)
在会话层中我设计了一个TcpServer的类用来建立和断开通信连接。
可以利用之前封装的TcpSocket中实现的BuildListenSocket方法来创建tcp套接字、绑定并监听,随后调用Acceper方法来接收客户端的连接,并得到双方通信所需的socket文件描述符,这样就可以进行业务处理了,在这层中的业务是由外部传递的,在类的创建时确定。
#pragma once
#include <pthread.h>
#include <functional>
#include "socket.hpp"
#include "InetAddr.hpp"
using namespace socket_ns;
using Service_t = std::function<void(SockSPtr, InetAddr&)>;
class TcpServer
{
public:
TcpServer(Service_t Service, u_int16_t port)
: _Service(Service),
_port(port),
_isrunning(false),
_listenSocket(std::make_shared<TcpSocket>())
{
_listenSocket->BuildListenSocket(port);
}
~TcpServer()
{}
class ThreadData
{
public:
SockSPtr _sockfd;
TcpServer *_self;
InetAddr _addr;
public:
ThreadData(SockSPtr sockfd, TcpServer *self, const InetAddr &addr) : _sockfd(sockfd), _self(self), _addr(addr)
{
}
};
static void *Execute(void *args)
{
pthread_detach(pthread_self());
ThreadData *td = static_cast<ThreadData *>(args);
td->_self->_Service(td->_sockfd, td->_addr);
td->_sockfd->Close();
delete td;
return nullptr;
}
void Run()
{
_isrunning=true;
while (_isrunning)
{
InetAddr client;
SockSPtr newsock = _listenSocket->Accepter(&client);
if (newsock == nullptr)
continue;
LOG(INFO, "get a new link client:%s sockfd:%d\n", client.AddrStr().c_str(), newsock->Sockfd());
pthread_t tid;
ThreadData *td = new ThreadData(newsock, this, client);
pthread_create(&tid, nullptr, Execute, td);
}
_isrunning = false;
}
private:
Service_t _Service;
u_int16_t _port;
bool _isrunning;
SockSPtr _listenSocket;
};
2.4 表示层(数据处理)
表示层是用来设备固有数据格式和网络标准数据结构之间进行转换的。
具体思路是:
- 读取数据
- 检测报头
- 反序列化
- 业务处理
- 数据响应,序列化,添加报头
- 发送数据
#pragma once
#include <iostream>
#include<string>
#include<functional>
#include"socket.hpp"
#include"InetAddr.hpp"
#include"Protocol.hpp"
using namespace socket_ns;
using process_t=std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)> ;
class IOService
{
public:
IOService(process_t process)
:_process(process)
{
}
~IOService()
{
}
void IOExcute(SockSPtr socket,InetAddr& client)
{
std::string packagestreamqueue;
while(true)
{
int n=socket->Recv(&packagestreamqueue);
if(n<=0)
{
LOG(INFO,"client %s quit or recv err\n",client.AddrStr().c_str());
break;
}
//从流里读取的,不一定正好是一条完整的信息
std::cout<<"packagestreamqueue: "<<packagestreamqueue<<std::endl;
//去掉报头
std::string package=Decode(packagestreamqueue);
//说明读取的数据不完整
if(package.empty())
continue;
//到这package中一定是一条完整序列化的信息
std::cout<<"package: "<<package<<std::endl;
auto req=Factory::BuildRequestDefault();
req->DeSerialize(package);
//业务处理
auto resp=_process(req);
//序列化数据应答
std::string respjson;
resp->Serialize(&respjson); //序列化
respjson=Encode(respjson);//添加报头
socket->Send(respjson); //发送数据
}
}
private:
//计算业务
process_t _process;
};
2.5 应用层(针对特定应用的协议)
这一层包含了我们上面设计的协议和我们具体要实现的业务
#pragma once
#include <iostream>
#include <memory>
#include "Protocol.hpp"
class NetCal
{
public:
std::shared_ptr<Response> calculator(std::shared_ptr<Request> req)
{
auto resp = Factory::BuildResponseDefault();
switch (req->get_oper())
{
case '+':
resp->set_result(req->get_x() + req->get_y());
break;
case '-':
resp->set_result(req->get_x() - req->get_y());
break;
case '*':
resp->set_result(req->get_x() * req->get_y());
break;
case '/':
if (req->get_y() == 0)
{
resp->set_code(1);
resp->set_desc("Div zero!");
}
else
{
resp->set_result(req->get_x() / req->get_y());
}
break;
default:
resp->set_code(2);
resp->set_desc("illegal operation");
break;
}
return resp;
}
};
2.6 将会话层 表示层 应用层结合起来
#include"TcpServer.hpp"
#include"NetCal.hpp"
#include"Service.hpp"
int main(int argc,char* argv[])
{
if(argc<2)
{
std::cerr<<"Usage:"<<argv[0]<<"Server port"<<std::endl;
exit(0);
}
uint16_t port=std::stoi(argv[1]);
NetCal nc;
IOService service(std::bind(&NetCal::calculator,&nc,std::placeholders::_1));
std::unique_ptr<TcpServer> tsvr=std::make_unique<TcpServer>
(std::bind(&IOService::IOExcute,&service,std::placeholders::_1,std::placeholders::_2),port);
tsvr->Run();
return 0;
}
2.7 客户端设计
客户端的实现如下:
- 创建请求
- 将数据序列化
- 添加报头
- 发送数据
- 读取响应
- 检测报头
- 反序列化
#include <iostream>
#include <string>
#include <ctime>
#include <unistd.h>
#include "socket.hpp"
#include "Protocol.hpp"
using namespace socket_ns;
int main(int argc, char *argv[])
{
if (argc < 3)
{
std::cerr << "Usage:" << argv[0] << "Server_ip Server_port" << std::endl;
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
//构建连接
SockSPtr sock = std::make_shared<TcpSocket>();
int n = sock->BuildClientSocket(serverip, serverport);
if (n < 0)
{
std::cerr << "connect error" << std::endl;
exit(1);
}
srand(time(nullptr) ^ getpid());
const std::string opers = "+-*/";
while (true)
{
int x = rand() % 10;
usleep(x * 1000);
int y = rand() % 10;
usleep(x * y * 1000);
char oper=opers[rand()%opers.size()];
//1.构建请求
auto req = Factory::BuildRequestDefault();
req->set_value(x,y,oper);
//2.序列化
std::string jsonstr;
req->Serialize(&jsonstr);
std::cout<<"jsonstr: "<<jsonstr<<std::endl;
//3.添加报头
jsonstr=Encode(jsonstr);
std::cout<<"Encode jsonstr: "<<jsonstr<<std::endl;
//4.发送
sock->Send(jsonstr);
std::string packagestreamqueue;
while (true)
{
// 5. 读取应答,response
ssize_t n = sock->Recv(&packagestreamqueue);
if (n <= 0)
{
break;
}
std::cout<<packagestreamqueue<<std::endl;
// 6. 报文解析,提取报头和有效载荷
std::string package = Decode(packagestreamqueue);
if (package.empty())
continue;
std::cout << "package: \n" << package << std::endl;
// 6. 反序列化
auto resp = Factory::BuildResponseDefault();
resp->DeSerialize(package);
// 7. 打印结果
resp->PrintResult();
break;
}
sleep(1);
// break;
}
sock->Close();
return 0;
}
完整实现代码:
张得帅c/Linux