预备知识
理解为什么要应用层协议?
在学过套接字编程后,我们对协议的理解更深了一步,协议也就是一种约定,也可以通俗理解为一种口头约定,对于通信双方来说是必须要遵守的。TCP和UDP协议它们是传输层控制协议,也就是在传输层的,今天我们学习的是应用层的协议,它跟序列化和反序列化有什么关系呢?先看场景
TCP是全双工的,因此它有两个缓冲区,可以同时读和写。在通信的时候,我们使用了read和write将数据从用户拷贝到内核的缓冲区中(sendto和recvfrom也是如此),因此这样看来,read和write更像是一种拷贝函数 ,作为发送方,我们将数据通过read函数,拷贝到内核的发送缓冲区后,那么这个数据还需要我们管吗?
我们有没有想过,这个数据在发送缓冲区中,这个缓冲区也有一定的大小,那么这个数据它什么时候发给对方,要发送多少,或者发送错误了怎么办,这些事情我们关心吗?
很明显,我们不关心,因为这是别人已经设计好的协议决定的,我们作为程序猿,只关注应用层,对于传输层,就交给传输控制协议就好了。
理解这部分后,我们再看接收方,接收方同样也不关心传输层怎么样,它只关心应用层,但是我们使用比如write这样的函数从接收缓冲区中读取,我们可能直接拷贝了出来了一大坨东西,有可能这一坨包含了两个请求,也可能包含了两个半的请求,甚至也可能只有半个请求,那么对于这种数据该如何处理,我们肯定得有个标准才行。因此,我们应用层的约定,也就是协议来了!
所以为什么需要序列化和反序列化呢?
协议,我们双方只需要规定好数据的格式,比如长度,大小,分隔符这些,我们在发送的时候先对数据进行处理,然后接收方读到数据后,就可以按照我们原本的约定来对数据进行提取,这样就可以得到想要的数据。
所以,我们发送方对数据的处理其实就是加报文的过程,接收方提取数据就是解包的过程。
但是,网络之间发送的信息其实都比较复杂,并不只是简单的一些整形字符串,它可能是一个结构体,比如我们待会要实现的网络计算器,就算这个计算器再简单,它也得包含左操作数和右操作数,以及一个操作符。所以,我们把它定义成一个结构体,然后直接发送给对象行吗?技术上是可行的,但是非常不推荐这样做。因为结构体的大小在不同的操作系统和不同的编译器上可能是不一样的,比如内存对齐。
因此,我们可以将操作数们和操作符转化成一串字符串,然后在字符串前加上报文,然后发送给对方;对方再对拿到的数据进行反序列化,就可以提取到想要的数据。
网络计算器的简单实现
顺便一提,今后用到的协议几乎都是tcp协议。
Log.hpp
这里顺便复习一下之前封装的文件打印的类
#pragma once
#include<iostream>
#include<time.h>
#include<stdarg.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include<stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod=method;
}
std::string levelToString(int level)
{
switch(level)
{
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
void printLog(int level,const std::string &logtxt)
{
switch(printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile,logtxt);//向一个文件打印
break;
case Classfile:
printClassFile(level,logtxt);//向多个文件打印--分类打印
break;
default:
break;
}
}
void printOneFile(const std::string &logname,const std::string &logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(),O_WRONLY|O_CREAT|O_APPEND,0666);
if(fd < 0)
{
std::cout << "打开文件失败" << std::endl;
return;
}
write(fd,logtxt.c_str(),logtxt.size());
close(fd);
}
void printClassFile(int level,const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level);
printOneFile(filename,logtxt);
}
~Log()
{
}
void operator()(int level,const char *format,...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer,sizeof(leftbuffer),"[%s][%d-%d-%d %d:%d:%d]",levelToString(level).c_str(),
ctime->tm_year + 1900,ctime->tm_mon + 1,ctime->tm_mday,
ctime->tm_hour,ctime->tm_min,ctime->tm_sec);
va_list s;
va_start(s,format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer,sizeof(rightbuffer),format,s);
va_end(s);
//格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt,sizeof(logtxt),"%s %s\n",leftbuffer,rightbuffer);
printLog(level,logtxt);
}
private:
int printMethod;//打印的方式(比如向键盘还是文件)
std::string path;
};
Log lg;
Socket.hpp
之前使用套接字编程,但是今后我们更多的是解除应用层,所以在这里再对套接字进行一次封装,今后就可以直接拿这个使用,方便还能使代码简洁。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"
enum
{
SocketErr = 2,
BindErr,
ListenErr
};
const int backlog = 10;
class Sock
{
public:
Sock()
{}
~Sock()
{}
public:
void Socket()
{
sockfd_ = socket(AF_INET,SOCK_STREAM,0);
if(sockfd_ < 0)
{
lg(Fatal,"socker error,%s: %d",strerror(errno),errno);
exit(SocketErr);
}
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if(bind(sockfd_,(struct sockaddr*)&local,sizeof(local)) < 0)
{
lg(Fatal,"bind error, %s: %d",strerror(errno),errno);
exit(BindErr);
}
}
void Listen()
{
if(listen(sockfd_,backlog) < 0)
{
lg(Fatal,"listen error,%s: %d",strerror(errno),errno);
exit(ListenErr);
}
}
int Accept(std::string *clientip,uint16_t *clientport) // 两个输出型参数
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(sockfd_,(struct sockaddr*)&peer,&len);
if(newfd < 0)
{
lg(Warning,"accept error,%s: %d",strerror(errno),errno);
return -1;
}
// 开始准备输出ip和端口号
char ipstr[64];
inet_ntop(AF_INET,&peer.sin_addr,ipstr,sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
bool Connect(const std::string &ip,const uint16_t &port)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
peer.sin_family = AF_INET;
peer.sin_port = htons(port);
inet_pton(AF_INET,ip.c_str(),&(peer.sin_addr));
int n = connect(sockfd_,(struct sockaddr*)&peer,sizeof(peer));
if(n == -1)
{
std::cout << "connect to " << ip << ":" << port << "error" << std::endl;
return false;
}
return true;
}
void Close()
{
close(sockfd_);
}
int Fd()
{
return sockfd_;
}
private:
int sockfd_;
};
Protocol.hpp
拓展使用第三方库序列化和反序列化
实现这个计算器,其实主要是为了对序列化和反序列化进行更深刻的理解。我们费老半天的劲写出来的序列化其实存在各种问题,今后我们序列化其实可以用别人现成的,比如json和prutobuf ,其中prutobuf它是二进制的,主打效率,而json十分简单,这里就使用json。
要在Linux上时候,首先要先安装这个第三方库,执行以下命令进行安装
sudo yum install -y jsoncpp-devel
安装好后,执行以下命令来查看该库需要包含的头文件
ls /usr/include/jsoncpp/json
这里头文件看似比较多,但是我们最多只用json.h
除了看头文件,我们还可以用命令查看库在哪里
ls /lib64/libjsoncpp.so -l
代码
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
const std::string blank_space_sep = " "; // 空格分隔符,主要作用于操作数
const std::string protocol_sep = "\n"; // 作用于报头
std::string Encode(std::string &content) // 加包
{
std::string package = std::to_string(content.size());
package += protocol_sep;
package += content;
package += protocol_sep;
return package;
}
bool Decode(std::string &package, std::string *content) // 解包
{
std::size_t pos = package.find(protocol_sep);
if (pos == std::string::npos)
return false; // 说明该包不完整
std::string len_str = package.substr(0, pos);
std::size_t len = std::stoi(len_str);
std::size_t total_len = len_str.size() + len + protocol_sep.size() * 2; // 总长度
if (package.size() < total_len)
return false; // 也说明包不完整
*content = package.substr(pos + 1, len);
// 特别注意,到这里说明这是一个完整的报文,并且已经处理完毕,因此我们应该移除该报文
package.erase(0, total_len);
return true;
}
class Request // 封装请求
{
public:
Request(int data1, int data2, char oper)
: x(data1), y(data2), op(oper)
{
}
Request() // 因为服务端和客户端共用这个类,所以无参构造是给解包的一方使用的
{
}
public:
bool Serialize(std::string *out)
{
#ifdef MySelf // 复习条件编译,灵活编写代码
// 序列化 即构建报文的有效载荷
// 将其信息转化成字符串
std::string s = std::to_string(x);
s += blank_space_sep;
s += op;
s += blank_space_sep;
s += std::to_string(y);
*out = s;
return true;
#else
Json::Value root;
root["x"] = x;
root["y"] = y;
root["op"] = op;
// Json::FastWriter w; // 写的时候有两种读写方式
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
bool Deserialize(const std::string &in)
{
#ifdef MySelf
std::size_t left = in.find(blank_space_sep);
if (left == std::string::npos)
return false;
std::string part_x = in.substr(0, left); // 注意substr是左闭右开的
std::size_t right = in.rfind(blank_space_sep); // 找右要用rfind,从尾开始找
if (right == std::string::npos)
return false;
std::string part_y = in.substr(right + 1);
if (left + 2 != right)
return false; // 检查包是否完整,有可能只有一半
op = in[left + 1];
x = std::stoi(part_x);
y = std::stoi(part_y);
return true;
#else
Json::Value root;
Json::Reader r;
r.parse(in, root);
x = root["x"].asInt();
y = root["y"].asInt();
op = root["op"].asInt();
return true;
#endif
}
void DebugPrint()
{
std::cout << "New Request: " << x << op << y << std::endl;
}
public:
int x; // 操作数和操作符
int y;
char op;
};
class Response // 封装响应类,与请求一个套路
{
public:
Response(int res, int c)
: result(res), code(c)
{
}
Response() // 同理
{
}
public:
bool Serialize(std::string *out)
{
#ifdef MySelf
std::string s = std::to_string(result);
s += blank_space_sep;
s += "code: ";
s += std::to_string(code);
*out = s;
return true;
#else
Json::Value root;
root["result"] = result;
root["code"] = code;
// Json::StyledWriter w; // 响应方换一种写法
Json::FastWriter w;
*out = w.write(root);
return true;
#endif
}
bool Deserialize(const std::string &in)
{
#ifdef MySelf
std::size_t pos = in.find(blank_space_sep);
if (pos == std::string::npos)
return false;
std::string part_left = in.substr(0, pos);
std::string part_right = in.substr(pos + 1);
result = std::stoi(part_left);
code = std::stoi(part_right);
return true;
#else
Json::Value root;
Json::Reader r;
r.parse(in, root);
result = root["result"].asInt();
code = root["code"].asInt();
return true;
#endif
}
void DebugPrint()
{
std::cout << "Response success, result: " << result << " ,code :" << code << std::endl;
}
public:
int result;
int code; // 这个用来判断结果是否可信,设置出错码可以找到原因
};
两个类都提供了无参的构造函数。
TcpServer.hpp
#pragma once
#include <functional>
#include <signal.h>
#include <string>
#include "Log.hpp"
#include "Socket.hpp"
using func_t = std::function<std::string(std::string &package)>;
class TcpServer
{
public:
TcpServer(uint16_t port,func_t callback)
:port_(port),callback_(callback)
{}
bool InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
lg(Info,"init server... done");
return true;
}
void Start()
{
signal(SIGCHLD,SIG_IGN);
signal(SIGPIPE,SIG_IGN);
while(true)
{
std::string clientip;
uint16_t clientport;
int sockfd = listensock_.Accept(&clientip,&clientport);
if(sockfd < 0) continue;
lg(Info,"accept a new link, sockfd: %d, clientip: %s, clientport: %d", sockfd, clientip.c_str(), clientport);
if(fork() == 0) // 子进程负责计算
{
listensock_.Close(); // 子进程用不到,所以可以关闭
std::string inbuuffer_stream;
while(true) // 需要特别理解这部分代码,如果读上来的不是一个完整的请求,那么会保留曾经之前读取的部分,直到读上一个完整的请求
{
char buffer[1024];
ssize_t n = read(sockfd,buffer,sizeof(buffer));
if(n > 0)
{
buffer[n] = 0;
inbuuffer_stream += buffer;
lg(Debug, "debug:\n%s", inbuuffer_stream.c_str());
while(true)
{
std::string info = callback_(inbuuffer_stream);
if (info.empty())
break;
lg(Debug, "debug,response:\n%s",info.c_str());
lg(Debug,"debug:\n%s",inbuuffer_stream.c_str());
write(sockfd,info.c_str(),info.size());
}
}
else if(n == 0)
break;
else
break;
}
exit(0);
}
close(sockfd);
}
}
~TcpServer()
{}
private:
uint16_t port_;
Sock listensock_;
func_t callback_; // 回调函数
};
这里我们没有用多线程了,因为加入线程池太麻烦了,简单实现就用多进程版本。
ServerCal.hpp
#pragma once
#include <iostream>
#include "Protocol.hpp"
enum
{
Div_Zero = 1,
Mod_Zero,
Other_Oper
};
class ServerCal
{
public:
ServerCal()
{
}
Response CalculatorHelper(const Request &req)
{
Response resp(0, 0);
switch (req.op)
{
case '+':
resp.result = req.x + req.y;
break;
case '-':
resp.result = req.x - req.y;
break;
case '*':
resp.result = req.x * req.y;
break;
case '%':
if (req.y == 0)
resp.code = Mod_Zero;
else
resp.result = req.x % req.y;
break;
case '/':
if (req.y == 0)
resp.code = Div_Zero;
else
resp.result = req.x / req.y;
break;
default:
resp.code = Other_Oper;
break;
}
return resp;
}
std::string Calculator(std::string &package)
{
std::string content;
bool r = Decode(package, &content); // 先解包
if (!r)
return "";
Request req;
r = req.Deserialize(content);
if (!r)
return "";
content = "";
Response resp = CalculatorHelper(req);
resp.Serialize(&content);
content = Encode(content); // 计算完后再打包准备发回客户端
return content;
}
~ServerCal()
{
}
};
ClientCal.cc
#include <iostream>
#include <string>
#include <ctime>
#include <cstdio>
#include <unistd.h>
#include "Protocol.hpp"
#include "Socket.hpp"
static void Usage(const std::string &proc)
{
std::cout << "\nUsage: " << proc << " serverip serverport\n";
}
int main(int argc,char *argv[])
{
if(argc != 3)
{
Usage(argv[0]);
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
Sock sockfd;
sockfd.Socket();
bool r = sockfd.Connect(serverip,serverport);
if(!r)
{
std::cout << "Connet error" << std::endl;
return 1;
}
srand(time(nullptr) ^ getpid()); // 随机数种子,加强随机性
int cnt = 1;
const std::string opers = "+-*/%&^";
std::string inbuffer_stream;
while(cnt <= 10)
{
std::cout << "###############The " << cnt << " times test" << "#########" << std::endl;
int x = rand() % 200 + 1;
usleep(1000);
int y = rand() % 150;
usleep(1200);
char oper = opers[rand()%opers.size()];
Request req(x,y,oper); // 构建请求
req.DebugPrint();
std::string package;
req.Serialize(&package);
package = Encode(package);
ssize_t m = write(sockfd.Fd(),package.c_str(),package.size());
if(m < 0)
{
std::cout << "写入失败" << std::endl;
return 1;
}
char buffer[128];
ssize_t n = read(sockfd.Fd(),buffer,sizeof(buffer)); // 客户端也无法保证读到的是一个完整的响应
if(n > 0)
{
buffer[n] = 0;
inbuffer_stream += buffer;
std::cout << inbuffer_stream << std::endl;
std::string content;
bool r = Decode(inbuffer_stream,&content);
if(!r)
{
std::cout << "读取出错(1)!!!" << std::endl;
exit(0); // 这里为了方便调试,客户端读不完整直接退出
//这里其实直接扔一个断言(assert)会比较好
}
Response resp;
r = resp.Deserialize(content);
if(!r)
{
std::cout << "读取出错!!!" << std::endl;
exit(0); // 同理
}
resp.DebugPrint();
}
else
{
std::cout << "读取结果失败" << std::endl;
}
printf("\n");
sleep(1);
cnt++;
}
sockfd.Close();
return 0;
}
在这里可以结合代码考虑一下,如果读取的报文不是完整的,该怎么办?
ServerCal.cc
#include "TcpServer.hpp"
#include "ServerCal.hpp"
#include <unistd.h>
static void Usage(const std::string &proc)
{
std::cout << "\nUsage: " << proc << "port\n";
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(0);
}
uint16_t port = std::stoi(argv[1]);
ServerCal cal;
TcpServer *tsvp = new TcpServer(port, std::bind(&ServerCal::Calculator, &cal, std::placeholders::_1)); // 绑定函数
tsvp->InitServer();
daemon(0, 0); // 系统库里的守护进程函数
tsvp->Start();
return 0;
}
这里顺便用了一下系统库里的守护进程函数。
daemon()
其中第一个参数就是决定这个进程运行后要放在哪个目录,选择0就默认是系统的根目录。
第二个参数就是我们之前也做过的,要不要将标准输入,标准输出,标准错误重定向到/dev/null。选择0就是默认重定向到/dev/null。
这里可以再温习一下之前我们自己实现的Daemon
#pragma once
#include <iostream>
#include <cstdlib>
#include <unistd.h>
#include <signal.h>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
const std::string nullfile = "/dev/null";
void Daemon(const std::string &cwd = "")
{
//1.忽略其他异常信号
signal(SIGCHLD, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGSTOP, SIG_IGN);
//2.将自己变成独立的会话
if(fork() > 0)
exit(0); // 如果是父进程,那么就会直接退出,子进程继续向后执行,并且已经成为孤儿进程了
setsid(); // 这个函数就是让这个进程成为一个新的会话
// 3.更改当前调用进程的工作目录
if(!cwd.empty())
chdir(cwd.c_str()); // 因为大多数下,服务器是安装在系统上的,所以工作目录也就是在根目录下
// 4.将标准输入,标准输出,标准错误重定向至/dev/null,这个文件也就是垃圾处理文件
int fd = open(nullfile.c_str(),O_RDWR);
if(fd > 0)
{
dup2(fd,0);
dup2(fd,1);
dup2(fd,2);
close(fd); // 最后关掉这个文件描述符
}
}
执行结果
ps ajx | grep servercal
用这个来看我们启动的服务。
makefile
对于之前的条件编译,我们没必要再代码中进行硬编码,我们可以在makefile中灵活的选择
.PHONY:all
all:servercal clientcal
Flag =#-DMySelf=1
Lib=-ljsoncpp
servercal:ServerCal.cc
g++ -o $@ $^ -std=c++11 $(Lib) $(Flag)
clientcal:ClientCal.cc
g++ -o $@ $^ -std=c++11 $(Lib) $(Flag)
.PHONY:clean
clean:
rm -f clientcal servercal
其中Flag和Lib可以看作是makefile里面的宏,在makefile中,加了-D就可以定义一个宏,#-D就是不定义,因此我们可以在makefile里面控制是否用json的库函数。
再说说OSI七层模型
之前说OSI七层协议的时候,因为我们作为初学者,很多东西只靠听并不了解,到现在已经积累了一定使用加深理解后,再看OSI就简单一些了。
比如传输层,就是我们之前用到的Sock接口相关的,传输层上面就是会话层,会话层就是负责建立连接和断开连接的,所以我们之前学到的TCP这些就是会话层的。比如我们在这次的例子中,我们通过创建子进程处理请求,其实就相当于新建立了一个会话。让子进程管理会话,只是在这里看不出来管理而已,比如如果客户端连接成功,但是一直不发送请求,我们可以设计多少时间后没有请求就关闭连接等管理行为。
再看表示层的描述,“固有的数据格式和网络标准数据格式”,说的很抽象,但这不就是今天我们自定义的协议吗?比如固有的数据格式,就是我们规定了有几个操作符,几个操作数,中间还有空格等等。其实就是我们设计的序列化和反序列化,以及添加和删除报头的功能。
最后就是应用层,在今天看来,就是我们写的SverCal.hpp这样的,它对数据进行处理计算,也就是网络计算。
虽然传输层往下还有,但是我们最多也就接触到传输层了,由此可见,OSI模型设计的还是很好的。
总结
今天我们自定义的协议只能处理整形,而现实生活中还有浮点数的计算,或者超大数字的运算,那么此时一份协议肯定就不够用了,这里也可以设计三份协议,然后在报头中添加该计算要使用哪种协议。
虽然今后我们在工作中,很少会自己自定义协议,设计序列化和反序列化,大多数都用现成的,但是我们必须要做过一次,心里有底才行。