本篇将主要介绍在应用层中自定义的协议,自定义协议的同时还需要将我们的数据继续序列化和反序列化,所以本篇的重点为序列化、反序列化的原因以及如何自定义协议,接着探讨了关于为什么 tcp 协议可以支持全双工协议。还根据用户自定义协议结合 tcp socket 编写了一份三层结构(应用层、表示层、会话层)的网络计算器代码。
目录
自定义协议
序列化 / 反序列化
read、write、recv、send 和 tcp协议为什么支持全双工
网络版本计算器
自定义协议
我们程序猿写的代码,解决的一个个实际问题,满足我们日常需求的网络程序,都是在应用层实现的。当我们想要使用一些能满足自己需求的协议的时候,这个时候我们就可以自定义协议。
我们在前文中(Linux网络基础-CSDN博客)已经提到,协议通信双方的一种约定,是通信双方都认识的结构化数据,所以我们设定协议,设定出结构化数据,然后使用接口发送和接收数据即可。
假设我们实现一个网络版本的计算器,只需要客户端发送对应的两个操作数以及操作符,然后由服务端计算数据之后将结构发回给客户端即可,所以现在存在两个方法:
方案一:
1. 客户端发送一个形如 “1 + 1” 的字符串 / 或者定义一个两个操作数一个操作符结构体
2. 这个字符串中有两个操作数,都是整数
3. 两个整数之间会有一个字符都是运算符,运算符只能是 + - * / %
4. 数字和运算符之间没有空格
......
方案二:
1. 定义结构体来表示我们需要交互的信息;
2. 发送数据时将这个结构体按照一个规则转化为字符串,接收到数据的时候再按照相同的规则把字符串转换回结构体;
3. 这个过程就叫做 “序列化” 和 “反序列化”
但是对于方案一来说,对于不同操作系统,对于结构体定义时的内存对齐、字节对齐等等的都可能不同,大小可能都不一样,所以发送接收时很可能就会导致两端拿到的数据不一致,但是即使是跨平台出现的问题,我们也可以一个一个的解决,但即使我们解决这些问题,对我们的协议进行修改的时候,也会导致新的问题,所以严重不推介方案一(除非仅仅用于本地通信)。
序列化 / 反序列化
对于序列化和反序列的可以形象的理解为下图:
如上图,从发送方获取到结构体数据,然后将结构体数据序列化,经过网络发送给服务器端,在发送到服务器端的应用层的时候,将数据反序列化,然后向上层输出。
但是对于 tcp 协议而言是面向字节流的,也就意味着每次发送、接收数据的时候,根本就不清楚自己发送出去的数据或者接收到的数据是一个完整的数据、部分的数据、还是多个数据,这些情况都有可能,那么 tcp 如何保证读到的数据是一个完成的数据呢?tcp 则需要对我们的报文进行分割,然后加上对用的报头,在接收方接收数据的时候,就会从报头中的信息来分析得到的数据是否是一个完整的数据。
read、write、recv、send 和 tcp协议为什么支持全双工
在我们使用tcp协议,使用read、write、send、recv等系统调用发送数据的过程,其实是按照一下过程发送数据的:
如上图所示,对应的一个 sockfd 打开了两个文件缓冲区,一个是接收缓冲区一个是发送缓冲区(一个 sockfd 就代表着一个连接),其中使用的 write 接口就是将数据拷贝进发送缓冲区,read 就是将数据从接收缓冲区中拷贝出来(write、read、send、recv本质就是拷贝函数)。
所以,对于发送数据的本质就是从发送方的发送缓冲区把数据通过协议栈和网络拷贝给接收方的接收缓冲区,其中一个 sockfd 拥有两个文件缓冲区,也就是意味着主机可以边使用 write 写数据,也可以使用 read 读数据,读写不会冲突,这就是 tcp 支持全双工的本质原因。
tcp 协议(传输控制协议)可以决定当前的发送缓冲区发送什么数据,发送多少数据,发送数据出错了如何解决,对于接收数据同样如此,但是同时 tcp 所属的传输层是属于操作系统内核的,所以本质来讲,对于数据的拷贝、发送等,都还是在操作系统在干预。
同时对于以上的发送、接收操作,其实也是属于生产者消费模型。既然属于生产消费模型,那么对于我们的发送缓冲区和接收缓冲区本质就是临界资源,对于临界资源我们就需要将其进行保护以及同步,所以这就是为什么发送缓冲区满了 write 会阻塞,接收缓冲区空了 read 会阻塞。
网络版本计算器
接下来将模仿上述发送接收数据的方式写一个网络计算器,简单来说就是将两个操作数和一个操作符交给服务端,服务端计算完成之后发送回来。
TcpClient.cc 客户端的代码
#include <iostream> #include <ctime> #include <unistd.h> #include "Protocol.hpp" #include "Socket.hpp" #include "Log.hpp" using namespace protocol_ns; using namespace socket_ns; using namespace log_ns; int main(int argc, char* args[]) { if (argc != 3) { LOG(ERROR, "please input the -> ./client、ip and port\n"); return 0; } // 获取 ip 和 port 以及 socket uint16_t server_port = std::stoi(args[2]); std::string server_ip = args[1]; // 创建sockfd,以及将其连接起来 ScokSPtr socket = std::make_shared<TcpSocket>(); if (!socket->BuildCilentSocket(server_port, server_ip)) { LOG(FATAL, "connect fail\n"); return 1; } srand(time(nullptr) ^ getpid()); std::string operators("+-*/!^&"); // 连接成功,现在开始通信 while (true) { // 1. 先创建出需要解决的Request int x = rand() % 10, y = rand() % 10; usleep(1000 * x * y); char oper = operators[y % operators.size()]; std::shared_ptr<Request> Req = Factory::BuildRequest(); Req->SetValue(x, y, oper); std::string jsonstr; // 1. 将数据序列化 jsonstr = Req->Serialize(); // 2. 为数据添加报头 jsonstr = Encode(jsonstr); // 3. 将数据发送出去 ssize_t n = socket->Send(jsonstr); std::cout << "----------------------------------------------" << std::endl; // 将发送出去的jsonstr打印出来 std::cout << "jsonstr\n" << jsonstr << std::endl; if (n == 0) { LOG(ERROR, "send message failed\n"); exit(1); } while (true) { // 4. 现在开始接收消息 std::shared_ptr<Responce> Resp = Factory::BuildResponce(); std::string packagestreamqueue; socket->Recv(&packagestreamqueue); // 5. 将收到的信息解码 std::string package = Decode(packagestreamqueue); if (package.empty()) continue; // 将解码后的数据打印出来 std::cout << "package\n" << package << std::endl; // 6. 反序列化 Resp->Deserialize(package); // 7. 将数据打印出来 Resp->PrintResult(); break; } sleep(1); } return 0; }
TcpServer.cc 服务端的代码
#include "TcpServer.hpp" // 通信管理,负责建立和断开通信 -> 会话层 #include "NetCal.hpp" // 针对特定的计算协议 -> 应用层 #include "IOService.hpp" // 负责数据个数转换 -> 表示层 #include <memory> // 自己建立端口号 int main(int argc, char* argv[]) { if (argc != 2) { LOG(ERROR, "please input: ./server port\n"); return 1; } uint16_t port = std::stoi(argv[1]); NetCal calculator; calculate_t cal = std::bind(&NetCal::Calculator, &calculator, std::placeholders::_1); IOService io_forword; tcp_task_t task = std::bind( &IOService::IOForword, &io_forword, cal, std::placeholders::_1, std::placeholders::_2 ); std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(task, port); tsvr->Init(); tsvr->Start(); return 0; }
Socket.hpp 使用模板方式方法封装的 socket 接口,因为创建接口和连接等操作都是一定格式的,所以将其设置为模板(参考:UDP/TCP --- Socket编程-CSDN博客)
#pragma once #include <iostream> #include <string> #include <functional> #include <memory> #include <cstring> #include <unistd.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <pthread.h> #include "Log.hpp" #include "InetAddr.hpp" namespace socket_ns { using namespace log_ns; enum { SOCKET_ERROR = 1, BIND_ERROR, LISTEN_ERROR }; const int gblcklog = 8; class TcpSocket; class Socket; using ScokSPtr = std::shared_ptr<Socket>; // 模板方法模式 class Socket { public: Socket() {} ~Socket() {} virtual int CreateSocketOrDie() = 0; virtual void CreateBindOrDie(uint16_t port) = 0; virtual void CreateListenOrDie(int blcklog = gblcklog) = 0; virtual int CreateAccepte(InetAdrr* addr) = 0; virtual bool CreateConnector(uint16_t server_port, std::string server_ip) = 0; virtual ssize_t Recv(std::string* out) = 0; virtual ssize_t Send(std::string& in) = 0; virtual int GetSockfd() = 0; public: void BuildListenSocket(uint16_t port, int blcklog = gblcklog) { // 分别是创建sockfd,然后将其绑定,然后listen CreateSocketOrDie(); CreateBindOrDie(port); CreateListenOrDie(blcklog); } bool BuildCilentSocket(uint16_t server_port, std::string server_ip) { // 分别是创建sockfd,然后绑定,然后connnect CreateSocketOrDie(); return CreateConnector(server_port, server_ip); } }; class TcpSocket : public Socket { public: TcpSocket() {} TcpSocket(int sockfd) : _sockfd(sockfd) {} // 创建 sockfd int CreateSocketOrDie() override { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { LOG(FATAL, "create sockfd fail\n"); exit(SOCKET_ERROR); } LOG(INFO, "get listensockfd success, sockfd: %d\n", _sockfd); return _sockfd; } // 绑定 void CreateBindOrDie(uint16_t port) override { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; int bind_n = bind(_sockfd, (struct sockaddr*)&local, sizeof(local)); if (bind_n < 0) { LOG(FATAL, "bind listensockfd fail, the reason: %s\n", strerror(errno)); exit(BIND_ERROR); } LOG(INFO, "bind success\n"); } // 绑定之后listen void CreateListenOrDie(int blcklog = gblcklog) override { int n = listen(_sockfd, blcklog); if (n < 0) { LOG(FATAL, "listen socket fail\n"); exit(LISTEN_ERROR); } LOG(INFO, "listen sucess\n"); } int CreateAccepte(InetAdrr* addr) override { struct sockaddr_in peer; socklen_t len = sizeof(peer); int sockfd = accept(_sockfd, (struct sockaddr*)&peer, &len); *addr = peer; return sockfd; } bool CreateConnector(uint16_t server_port, std::string server_ip) override { struct sockaddr_in server; memset(&server, 0, sizeof(server)); socklen_t len = sizeof(server); server.sin_family = AF_INET; server.sin_port = htons(server_port); // server.sin_addr.s_addr = inet_addr(server_ip.c_str()); inet_pton(AF_INET, server_ip.c_str(), &server.sin_addr); int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server)); if (n < 0) { return false; } return true; } ssize_t Recv(std::string* out) override { // 收消息,将收到的信息 char buff[4096]; ssize_t n = recv(_sockfd, buff, sizeof(buff), 0); if (n <= 0) return n; buff[n] = 0; *out += buff; return n; } ssize_t Send(std::string& in) override { ssize_t n = send(_sockfd, in.c_str(), in.size(), 0); return n; } int GetSockfd() override { return _sockfd; } ~TcpSocket() { if (_sockfd < 0) close(_sockfd); } private: int _sockfd; }; }
TcpServer.hpp 服务端的头文件,其中主要实现的是调用逻辑
#pragma once #include <iostream> #include <string> #include <functional> #include <cstring> #include <unistd.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <pthread.h> #include "Log.hpp" #include "InetAddr.hpp" #include "Socket.hpp" using namespace log_ns; using namespace socket_ns; enum { SOCKET_ERROR = 1, BIND_ERROR, LISTEN_ERROR }; const int glistensockfd = -1; const int gblcklog = 8; using tcp_task_t = std::function<void(ScokSPtr, InetAdrr&)>; class TcpServer { private: // 创建一个内部类 struct ThreadData { ScokSPtr _tcp_socket; InetAdrr _addr; TcpServer* _tcp_point; ThreadData(const ScokSPtr& tcpsocket, const InetAdrr& addr, TcpServer* tcp) : _tcp_socket(tcpsocket), _addr(addr), _tcp_point(tcp) {} }; static void* runServer(void* args) { // 将线程分离,就不用阻塞的join线程 pthread_detach(pthread_self()); ThreadData* td = static_cast<ThreadData*>(args); // LOG(INFO, "the sockfd: %d\n", td->_sockfd); td->_tcp_point->_task(td->_tcp_socket, td->_addr); // close(); // 将其转化为tcpsocket变量则不需要显示的close了,因为已经析构了 delete td; return nullptr; } public: // TcpServer(){} TcpServer(tcp_task_t task, uint16_t port) : _task(task), _port(port), _isrunning(false), _tcp_socket(std::make_shared<TcpSocket>()) {} void Init() { _tcp_socket->BuildListenSocket(_port); } void Start() { _isrunning = true; while (_isrunning) { InetAdrr addr; int sockfd = _tcp_socket->CreateAccepte(&addr); if (sockfd < 0) { LOG(ERROR, "%s get sockfd fail, the reason is %s\n", addr.AddrString().c_str(), strerror(errno)); continue; } LOG(INFO, "get sockfd success, sockfd: %d\n", sockfd); // 为accept建立一个tcpsocket变量 ScokSPtr tcp_accept = std::make_shared<TcpSocket>(sockfd); // 2. 多线程 pthread_t tid; ThreadData* data = new ThreadData(tcp_accept, addr, this); pthread_create(&tid, nullptr, runServer, (void*)data); } _isrunning = false; } ~TcpServer() { } private: uint16_t _port; // int _listensocked; bool _isrunning; tcp_task_t _task; ScokSPtr _tcp_socket; };
Thread.hpp 线程头文件,便于创建多线程执行流
#pragma once #include <iostream> #include <functional> #include <string> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cerrno> #include "Log.hpp" // using func_t = std::function<void(const std::string& name, pthread_mutex_t* lock)>; using func_t = std::function<void(const std::string& name)>; using namespace log_ns; // typedef void*(*func_t)(void*); const pthread_t ctid = -1; class Thread { private: void excute() { // std::cout << _name << " begin to run" << std::endl; // LOG(INFO, "%s begin to run\n", _name.c_str()); _isrunning = true; _func(_name); _isrunning = false; } static void* ThreadRoutine(void* args) { Thread* self = static_cast<Thread*>(args); self->excute(); return nullptr; } public: Thread(func_t func, const std::string& name) : _func(func), _isrunning(false), _tid(ctid), _name(name) {} ~Thread() {} void Start() { // 创建之后就开始运行了 int n = pthread_create(&_tid, nullptr, ThreadRoutine, (void*)this); if (n != 0) { std::cout << "thread create failed!!!" << std::endl; exit(1); } } void Stop() { // 将线程暂停,使用 if (_isrunning == false) return; // std::cout << _name << " stop " << std::endl; int n = ::pthread_cancel(_tid); if (n != 0) { std::cout << "thread stop failed" << std::endl; } _isrunning = false; } void Join() { // 线程等待, if (_isrunning) return; int n = pthread_join(_tid, nullptr); if (n != 0) { std::cout << "thread wait failed!!!" << strerror(errno) << std::endl; } // std::cout << _name << " join " << std::endl; } std::string Status() { if (_isrunning) return "running"; else return "sleep"; } private: pthread_t _tid; func_t _func; bool _isrunning; std::string _name; };
Protocol.hpp 我们设计的协议,其中主要包含两个结构体 Responce 和 Request,同时还在结构体中设计了将其序列化和反序列化的接口,使用的是 Json 库,需要在提前下载,源库中没有,使用如下命令:
ubuntu:sudo apt-get install libjsoncpp-dev centos: sudo yum install jsoncpp-devel
#pragma once #include <iostream> #include <string> #include <functional> #include <memory> #include <jsoncpp/json/json.h> #include "Log.hpp" namespace protocol_ns { using namespace log_ns; // 设计的协议的报头和报文的格式 // "len"\r\n"{json}"\r\n const std::string sep = "\r\n"; // 给我们的报文加上报头 std::string Encode(const std::string& jsonstring) { int len = jsonstring.size(); if (len == 0) return ""; return std::to_string(len) + sep + jsonstring + sep; } // 给我们的报文解密 std::string Decode(std::string& jsonstring) { auto pos = jsonstring.find(sep); if (pos == std::string::npos) return ""; // 现在开始截取json串 std::string lenstr = jsonstring.substr(0, pos); int len = std::stoi(lenstr); if (jsonstring.size() < 2 * sep.size() + len + lenstr.size()) return ""; size_t nextpos = jsonstring.find(sep, pos + sep.size()); if (nextpos == std::string::npos) return ""; std::string json = jsonstring.substr(pos + sep.size(), len); // 现在将jsonstring 给删除一部分 jsonstring.erase(0, 2 * sep.size() + len + lenstr.size()); return json; } class Request { public: Request() {} Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper) {} // 序列化、反序列化 std::string Serialize() { Json::Value root; root["x"] = _x; root["y"] = _y; root["oper"] = _oper; Json::FastWriter writer; std::string s = writer.write(root); return s; } void Deserialize(const std::string& info) { Json::Reader reader; Json::Value root; bool parsingSuccessful = reader.parse(info, root); if (!parsingSuccessful) { LOG(FATAL, "fail to parse json\n"); return; } _x = root["x"].asInt(); _y = root["y"].asInt(); _oper = root["oper"].asInt(); } int Get_X() { return _x; } int Get_Y() { return _y; } char Get_oper() { return _oper; } void SetValue(int x, int y, char oper) { _x = x; _y = y; _oper = oper; } ~Request() {} private: int _x; int _y; char _oper; }; class Responce { public: Responce() : _result(0), _code(0), _desc("success") {} std::string Serialize() { Json::Value root; root["result"] = _result; root["code"] = _code; root["desc"] = _desc; Json::FastWriter writer; std::string s = writer.write(root); return s; } void Deserialize(const std::string& info) { Json::Reader reader; Json::Value root; bool parsingSuccessful = reader.parse(info, root); if (!parsingSuccessful) { LOG(FATAL, "fail to parse json\n"); return; } _result = root["result"].asInt(); _code = root["code"].asInt(); _desc = root["desc"].asString(); } int& Get_result() { return _result; } int& Get_code() { return _code; } std::string& Get_desc() { return _desc; } void PrintResult() { std::cout << "result: " << _result << ", code: " << _code << ", desc:" << _desc << std::endl; } ~Responce() {} private: int _result; int _code; // 0:success 1:div zero 2.illegal std::string _desc; }; class Factory { public: static std::shared_ptr<Request> BuildRequest() { return std::make_shared<Request>(); } static std::shared_ptr<Responce> BuildResponce() { return std::make_shared<Responce>(); } }; }
NetCal.hpp 设计的网络计算器头文件,负责处理发送过来的数据,然后计算出来发送回去。
#pragma once #include <iostream> #include <memory> #include <functional> #include <string> #include "Protocol.hpp" using namespace protocol_ns; using calculate_t = std::function<std::shared_ptr<Responce>(const std::shared_ptr<Request>& req)>; // 需要一个Req和一个Resp class NetCal { public: NetCal() {} std::shared_ptr<Responce> Calculator(const std::shared_ptr<Request>& req) { std::shared_ptr<Responce> Resp = std::make_shared<Responce>(); int x = req->Get_X(), y = req->Get_Y(); char oper = req->Get_oper(); int& result = Resp->Get_result(); int& code = Resp->Get_code(); std::string& desc = Resp->Get_desc(); switch (oper) { case '+': result = x + y; code = 0; desc = "success"; break; case '-': result = x - y; code = 0; desc = "success"; break; case '*': result = x * y; code = 0; desc = "success"; break; case '/': if (y == 0) { code = 1; desc = "div zero"; } else { code = 0; desc = "success"; result = x / y; } break; case '%': if (y == 0) { code = 2; desc = "mod zero"; } else { code = 0; desc = "success"; result = x % y; } break; default: code = 2; desc = "illegal operation"; break; } return Resp; } ~NetCal() {} private: };
Log.hpp 日志文件,记录我们的信息(参考:日志Log程序(C++))
#pragma once #include <iostream> #include <string> #include <cstdarg> #include <cstring> #include <fstream> #include <sys/types.h> #include <pthread.h> #include <unistd.h> namespace log_ns { enum { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; // 定义日子真正需要记录的信息 struct LogMessage { std::string _level; int _id; std::string _filename; int _filenumber; std::string _curtime; std::string _log_message; }; #define SCREEN_TYPE 1 #define FILE_TYPE 2 const std::string defaultlogfile = "./log.txt"; pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER; class Log { private: std::string LevelToString(int level) { switch(level) { case DEBUG: return "DEBUG"; case INFO: return "INFO"; case WARNING: return "WARNING"; case ERROR: return "ERROR"; case FATAL: return "FATAL"; default: return "UNKNOWN"; } } std::string CurTime() { // 获取当前的时间戳 time_t curtime = time(nullptr); // 将当前时间戳转换成结构体 struct tm* now = localtime(&curtime); char buff[128]; snprintf(buff, sizeof(buff), "%d-%02d-%02d %02d:%02d:%02d", now->tm_year + 1900, now->tm_mon + 1, now->tm_mday, now->tm_hour, now->tm_min, now->tm_sec ); return buff; } void Flush(const LogMessage& lg) { // 打印日志的时候可能存在线程安全,使用锁lock住 pthread_mutex_lock(&log_lock); switch(_type) { case SCREEN_TYPE: FlushToScreen(lg); break; case FILE_TYPE: FlushToFile(lg); break; } pthread_mutex_unlock(&log_lock); } void FlushToFile(const LogMessage& lg) { std::ofstream out; out.open(_logfile, std::ios::app); // 文件的操作使用追加 if (!out.is_open()) return; char buff[2024]; snprintf(buff ,sizeof(buff), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curtime.c_str(), lg._log_message.c_str() ); out.write(buff, strlen(buff)); out.close(); } void FlushToScreen(const LogMessage& lg) { printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curtime.c_str(), lg._log_message.c_str() ); } public: Log(std::string logfile = defaultlogfile) : _type(SCREEN_TYPE), _logfile(logfile) {} void Enable(int type) { _type = type; } void LoadMessage(std::string filename, int filenumber, int level, const char* format, ...) { LogMessage lg; lg._level = LevelToString(level); lg._filename = filename; lg._filenumber = filenumber; // 获取当前时间 lg._curtime = CurTime(); // std::cout << lg._curtime << std::endl; lg._id = getpid(); // 获取可变参数 va_list ap; va_start(ap, format); char buff[2048]; vsnprintf(buff, sizeof(buff), format, ap); va_end(ap); lg._log_message = buff; // std::cout << lg._log_message; Flush(lg); } void ClearOurFile() { std::ofstream out; out.open(_logfile); out.close(); } ~Log() {} private: int _type; std::string _logfile; }; Log lg; // LOG 宏 #define LOG(level, format, ...) \ do \ { \ lg.LoadMessage(__FILE__, __LINE__, level, format, ##__VA_ARGS__); \ } while (0) #define EnableToScreen() \ do \ { \ lg.Enable(SCREEN_TYPE); \ } while (0) #define EnableToFile() \ do \ { \ lg.Enable(FILE_TYPE); \ } while (0) // 清理文件 #define ClearFile() \ do \ { \ lg.ClearOurFile(); \ } while (0) }
IOService.hpp 负责接收数据,然后将数据处理,然后转发回结果
#pragma once #include <iostream> #include <string> #include "InetAddr.hpp" #include "Log.hpp" #include "Socket.hpp" #include "Protocol.hpp" #include "NetCal.hpp" using namespace log_ns; using namespace socket_ns; using namespace protocol_ns; // 提供io服务,需要将对应的信息反序列化,然后执行,接着加上报头,然后继续序列化 // 然后将数据发送出去 class IOService { public: IOService() {} void IOForword(calculate_t cal, ScokSPtr socket, InetAdrr& who) { // 将读和写包装起来 while (true) { // 开始读和写 std::string packages_stream; // LOG(INFO, "the sockfd: %d\n", sockfd); int n = socket->Recv(&packages_stream); if (n <= 0) { LOG(WARNING, "client %s quit or read error\n", who.AddrString().c_str()); break; } else { // 在这里处理信息 // 1. 先将数据解码 std::string jsonstr = Decode(packages_stream); if (jsonstr.empty()) continue; std::cout << "----------------------------------" << std::endl; // 将解码的数据打印出来 std::cout << "jsonstr\n" << jsonstr << std::endl; // 2. 然后将数据反序列化 std::shared_ptr<Request> Req = Factory::BuildRequest(); Req->Deserialize(jsonstr); // 3. 开始运行处理数据 std::shared_ptr<Responce> Resp = cal(Req); // 4. 运行处理完数据之后将数据序列化 std::string anti_jsonstr = Resp->Serialize(); // 将序列化后的数据打印出来 std::cout << "anti_json\n" << anti_jsonstr << std::endl; // 5. 给序列化的数据加上报头 std::string package = Encode(anti_jsonstr); // 6. 将数据发送出去 socket->Send(package); } } } ~IOService() {} };
InetAddr.hpp 记录客户端或者服务端的 ip 和 port
#pragma once #include <iostream> #include <string> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/types.h> #include <sys/socket.h> class InetAdrr { void ToHost(const struct sockaddr_in& addr) { // inet_ntoa 函数不是线程安全的函数,推荐使用 inet_ntop 函数 // _ip = inet_ntoa(addr.sin_addr); char ip_buff[32]; // 该函数是网络序列转主机序列 :network to process inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)); // 若想要将主机序列转换成网络序列使用函数 : // inet_pton(AF_INET, _ip.c_str(), (void*)&addr.sin_addr.s_addr); _ip = ip_buff; _port = ntohs(addr.sin_port); } public: InetAdrr() {} InetAdrr(const struct sockaddr_in& addr) : _addr(addr) { ToHost(_addr); } InetAdrr& operator=(const struct sockaddr_in& addr) { _addr = addr; return *this; } std::string Ip() const { return _ip; } bool operator==(const InetAdrr& addr) { return (_port == addr._port && _ip == addr._ip); } struct sockaddr_in Addr() const { return _addr; } std::string AddrString() const { return _ip + ":" + std::to_string(_port); } uint16_t Port() const { return _port; } ~InetAdrr() {} private: uint16_t _port; std::string _ip; struct sockaddr_in _addr; };
makefile
.PHONY:all all:server client server:TcpServer.cc g++ -o $@ $^ -std=c++14 -lpthread -ljsoncpp client:TcpClient.cc g++ -o $@ $^ -std=c++11 -ljsoncpp .PHONY:clean clean: rm -f server client
测试结果:
对于以上代码主要实现的是 IOS 七层协议中的表示层、会话层、应用层,其中应用层实现的代码为:NetCal.hpp、表示层实现的代码为:IOService.hpp、会话层实现的代码为 TcpServer.hpp。