🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm=1010.2135.3001.5343
🔥 系列专栏:https://blog.csdn.net/qinjh_/category_12891150.html
目录
应用层
再谈 "协议"
网络版计算器
序列化 和 反序列化
重新理解 read、write、recv、send和tcp为什么支持全双工
Socket封装
Jsoncpp
安装
序列化
反序列化
Json::Value
构造函数
访问元素
类型检查
赋值和类型转换
数组和对象操作
自定义协议的网络版计算器
ClientMain.cc
InetAddr.hpp
LockGuard.hpp
Log.hpp
NetCal.hpp
Protocol.hpp
ServerMain.cc
Service.hpp
Socket.hpp
TcpServer.hpp
前言
💬 hello! 各位铁子们大家好哇。
今日更新了Linux网络应用层自定义协议与序列化的内容
🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝
应用层
我们程序员写的一个个解决我们实际问题, 满足我们日常需求的网络程序, 都是在应用层.
再谈 "协议"
协议是一种 "约定". socket api 的接口, 在读写数据时, 都是按 "字符串" 的方式来发送接收的. 如果我们要传输一些 "结构化的数据" 怎么办呢?
其实,协议就是双方约定好的结构化的数据
网络版计算器
例如, 我们需要实现一个服务器版的计算器. 我们需要客户端把要计算的两个数发过去, 然后由服务器进行计算, 最后再把结果返回给客户端.
约定方案一:
- 客户端发送一个形如"1+1"的字符串;
- 这个字符串中有两个操作数, 都是整形;
- 两个数字之间会有一个字符是运算符, 运算符只能是 + ;
- 数字和运算符之间没有空格;
约定方案二:
- 定义结构体来表示我们需要交互的信息;
- 发送数据时将这个结构体按照一个规则转换成字符串, 接收到数据的时候再按照相同的规则把字符串转化回结构体;
- 这个过程叫做 "序列化" 和 "反序列化"
方案二是推荐的做法。
序列化 和 反序列化
无论我们采用方案一, 还是方案二, 还是其他的方案, 只要保证, 一端发送时构造的数据, 在另一端能够正确的进行解析, 就是 ok 的. 这种约定, 就是 应用层协议
但是,为了让我们深刻理解协议,我们打算自定义实现一下协议的过程。
- 我们采用方案 2,我们也要体现协议定制的细节
- 我们要引入序列化和反序列化,只不过这里直接采用现成的方案-- jsoncpp库
- 我们要对 socket 进行字节流的读取处理
重新理解 read、write、recv、send和tcp为什么支持全双工
一个fd,代表一个连接,一个连接,有两个缓冲区。
数据传输时,都是把内容拷贝到缓冲区里,然后再通过网络发送。而不是直接通过网络发送。接收数据也是如此。
结论:
1.read,write,send,recv本质都是拷贝函数。
2.发数据的本质:是从发送方的发送缓冲区把数据通过协议栈和网络拷贝给接收方的接收缓冲区。
在任何一台主机上,TCP 连接既有发送缓冲区,又有接受缓冲区,所以,在内核中,可以在发消息的同时,也可以收消息,即全双工。
实际数据什么时候发,发多少,出错了怎么办,由 TCP 控制,所以TCP叫做传输控制协议。
Socket封装
#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include <memory>
#include "Log.hpp"
#include "InetAddr.hpp"
namespace socket_ns
{
using namespace log_ns;
class Socket;
using SockSPtr = std::shared_ptr<Socket>;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERR
};
const static int gblcklog = 8;
// 模板方法模式
class Socket
{
public:
virtual void CreateSocketOrDie() = 0;
virtual void CreateBindOrDie(uint16_t port) = 0;
virtual void CreateListentOrDie(int backlog=gblcklog) = 0;
virtual SockSPtr Accepter(InetAddr *cliaddr) = 0;
virtual bool Conntecor(const std::string& peerip,uint16_t peerport) = 0;
virtual int Sockfd()=0;
virtual void Close()=0;
virtual ssize_t Recv(std::string* out)=0;
virtual ssize_t Send(const std::string &in)=0;
public:
void BuildListenSocket(uint16_t port)
{
CreateSocketOrDie();
CreateBindOrDie(port);
CreateListentOrDie();
}
bool BuildClientSocket(const std::string& peerip,uint16_t peerport)
{
CreateSocketOrDie();
return Conntecor(peerip,peerport);
}
// void BuildUdpSocket()
// {}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd) : _sockfd(sockfd)
{
}
~TcpSocket()
{
}
void CreateSocketOrDie() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket create error\n");
exit(SOCKET_ERROR);
}
LOG(INFO, "socket create success,sockfd:%d\n", _sockfd); // 3
}
void CreateBindOrDie(uint16_t port) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
// 2.bind sockfd 和 socket addr
if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(INFO, "bind success,sockfd:%d\n", _sockfd);
}
void CreateListentOrDie(int backlog) override
{
// 3.因为tcp是面向连接的,tcp需要未来不断的能够做到获取连接
if (::listen(_sockfd, gblcklog) < 0)
{
LOG(FATAL, "listen error\n");
exit(BIND_ERROR);
}
LOG(INFO, "listen success\n");
}
SockSPtr Accepter(InetAddr *cliaddr) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取新连接
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return nullptr;
}
*cliaddr = InetAddr(client);
LOG(INFO, "get a new link,client info:%s,sockfd is :%d\n", cliaddr->AddrStr().c_str(), sockfd);
return std::make_shared<TcpSocket>(sockfd); // C++14
}
bool Conntecor(const std::string& peerip,uint16_t peerport) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(peerport);
::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);
int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
int Sockfd()
{
return _sockfd;
}
void Close()
{
if(_sockfd>0)
{
::close(_sockfd);
}
}
ssize_t Recv(std::string* out) override
{
char inbuffer[4096];
ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1,0);
if (n > 0)
{
inbuffer[n] = 0;
*out+=inbuffer;
}
return n;
}
ssize_t Send(const std::string &in) override
{
return ::send(_sockfd,in.c_str(),in.size(),0);
}
private:
int _sockfd;
};
// class UdpSocket:public Socket
// {};
}
Jsoncpp
Jsoncpp 是一个用于处理 JSON 数据的 C++ 库。它提供了将 JSON 数据序列化为字符串以及从字符串反序列化为 C++ 数据结构的功能。
安装
C++
ubuntu:sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel
序列化
序列化指的是将数据结构或对象转换为一种格式,以便在网络上传输或存储到文件中。Jsoncpp 提供了多种方式进行序列化:
- 使用 Json::Value 的 toStyledString 方法
- 使用 Json::StreamWriter
- 使用 Json::FastWriter
反序列化
反序列化指的是将序列化后的数据重新转换为原来的数据结构或对象。Jsoncpp 提供了以下方法进行反序列化:
- 使用 Json::Reader
- 使用 Json::CharReader 的派生类
Json::Value
Json::Value 是 Jsoncpp 库中的一个重要类,用于表示和操作JSON 数据结构。以下是一些常用的 Json::Value 操作列表:
构造函数
- Json::Value():默认构造函数,创建一个空的 Json::Value 对象。
- Json::Value(ValueType type, bool allocated = false):根据给定的ValueType(如 nullValue, intValue, stringValue 等)创建一个Json::Value对象
访问元素
- Json::Value& operator[](const char* key):通过键(字符串)访问对象中的元素。如果键不存在,则创建一个新的元素。
- Json::Value& operator[](const std::string& key):同上,但使用std::string 类型的键。
- Json::Value& operator[](ArrayIndex index):通过索引访问数组中的元素。如果索引超出范围,则创建一个新的元素。
- Json::Value& at(const char* key):通过键访问对象中的元素,如果键不存在则抛出异常。
- Json::Value& at(const std::string& key):同上,但使用std::string类型的键。
类型检查
- bool isNull():检查值是否为 null。
- bool isBool():检查值是否为布尔类型。
- bool isInt():检查值是否为整数类型。
- bool isInt64():检查值是否为 64 位整数类型。
- bool isUInt():检查值是否为无符号整数类型。
- bool isUInt64():检查值是否为 64 位无符号整数类型。
- bool isIntegral():检查值是否为整数或可转换为整数的浮点数。
- bool isDouble():检查值是否为双精度浮点数。
- bool isNumeric():检查值是否为数字(整数或浮点数)。
- bool isString():检查值是否为字符串。
- bool isArray():检查值是否为数组。
- bool isObject():检查值是否为对象(即键值对的集合)
赋值和类型转换
- Json::Value& operator=(bool value):将布尔值赋给Json::Value 对象。
- Json::Value& operator=(int value):将整数赋给 Json::Value 对象。
- Json::Value& operator=(unsigned int value):将无符号整数赋给Json::Value 对象。
- Json::Value& operator=(Int64 value):将 64 位整数赋给Json::Value对象。
- Json::Value& operator=(UInt64 value):将 64 位无符号整数赋给Json::Value 对象。
- Json::Value& operator=(double value):将双精度浮点数赋给Json::Value 对象。
- Json::Value& operator=(const char* value):将C 字符串赋给Json::Value 对象。
- Json::Value& operator=(const std::string& value):将std::string赋给 Json::Value 对象。
- bool asBool():将值转换为布尔类型(如果可能)。
- int asInt():将值转换为整数类型(如果可能)。
- Int64 asInt64():将值转换为 64 位整数类型(如果可能)。
- unsigned int asUInt():将值转换为无符号整数类型(如果可能)。
- UInt64 asUInt64():将值转换为 64 位无符号整数类型(如果可能)。
- double asDouble():将值转换为双精度浮点数类型(如果可能)。
- std::string asString():将值转换为字符串类型(如果可能)
数组和对象操作
- size_t size():返回数组或对象中的元素数量。
- bool empty():检查数组或对象是否为空。
- void resize(ArrayIndex newSize):调整数组的大小。
- void clear():删除数组或对象中的所有元素。
- void append(const Json::Value& value):在数组末尾添加一个新元素。
- Json::Value& operator[](const char* key, const Json::Value& defaultValue = Json::nullValue):在对象中插入或访问一个元素,如果键不存在则使用默认值。
- Json::Value& operator[](const std::string& key, const Json::Value& defaultValue = Json::nullValue):同上,但使用std::string类型的
自定义协议的网络版计算器
完整过程如下图:
ClientMain.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "Socket.hpp"
#include "Protocol.hpp"
using namespace socket_ns;
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cerr << "Usage: " << argv[0] << "server-ip server-port" << std::endl;
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
SockSPtr sock = std::make_shared<TcpSocket>();
if (!sock->BuildClientSocket(serverip, serverport))
{
std::cerr << "connect error" << std::endl;
exit(1);
}
srand(time(nullptr) ^ getpid());
const std::string opers = "+-*/&^!";
int cnt=3;
std::string packagestreamqueue;
while (true)
{
// 构建数据
int x = rand() % 10;
usleep(x * 1000);
int y = rand() % 10;
usleep(x * y * 100);
char oper = opers[y % opers.size()];
// 构建请求
auto req = Factory::BuildRequestDefault();
req->SetValue(x, y, oper);
// 1.序列化
std::string reqstr;
req->Serialize(&reqstr);
// 2.添加长度报头字段
reqstr = Encode(reqstr);
std::cout<<"#######################################"<<std::endl;
std::cout<<"request string:\n"<<reqstr<<std::endl;
// 3.发送数据
sock->Send(reqstr);
while (true)
{
// 4.读取应答,response
ssize_t n = sock->Recv(&packagestreamqueue);
if (n <= 0)
{
break;
}
// 我们不能保证我们读到的是一个完整的报文
// 5.报文解析,提取报头和有效载荷
std::string package = Decode(packagestreamqueue);
if (package.empty())
continue;
std::cout<<"package:\n"<<package<<std::endl;
//6.反序列化
auto resp=Factory::BuildResponseDefault();
resp->Deserialize(package);
//7.打印结果
resp->PrintResult();
break;
}
sleep(1);
// break;
}
sock->Close();
return 0;
}
InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
class InetAddr
{
private:
void ToHost(const struct sockaddr_in &addr) //主机转本地地址
{
_port=ntohs(addr.sin_port);
//_ip=inet_ntoa(addr.sin_addr);
char ip_buf[32];
::inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf));
_ip=ip_buf;
}
public:
InetAddr(const struct sockaddr_in &addr):_addr(addr)
{
ToHost(addr);
}
InetAddr()
{}
bool operator==(const InetAddr& addr)
{
return (this->_ip==addr._ip && this->_port==addr._port);
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
std::string AddrStr()
{
return _ip+":"+std::to_string(_port);
}
~InetAddr()
{}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
LockGuard.hpp
#pragma once
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
Log.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"
namespace log_ns
{
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
std::string GetCurrTime()
{
time_t now = time(nullptr);
struct tm *curr_time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
curr_time->tm_year + 1900,
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buffer;
}
class logmessage
{
public:
std::string _level;
pid_t _id;
std::string _filename;
int _filenumber;
std::string _curr_time;
std::string _message_info;
};
#define SCREEN_TYPE 1
#define FILE_TYPE 2
const std::string glogfile = "./log.txt";
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
class Log
{
public:
Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
{
}
void Enable(int type)
{
_type = type;
}
void FlushLogToScreen(const logmessage &lg)
{
printf("[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
}
void FlushLogToFile(const logmessage &lg)
{
std::ofstream out(_logfile, std::ios::app);
if (!out.is_open())
return;
char logtxt[2048];
snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
out.write(logtxt, strlen(logtxt));
out.close();
}
void FlushLog(const logmessage &lg)
{
// 加过滤逻辑 --- TODO
LockGuard lockguard(&glock);
switch (_type)
{
case SCREEN_TYPE:
FlushLogToScreen(lg);
break;
case FILE_TYPE:
FlushLogToFile(lg);
break;
}
}
void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
{
logmessage lg;
lg._level = LevelToString(level);
lg._id = getpid();
lg._filename = filename;
lg._filenumber = filenumber;
lg._curr_time = GetCurrTime();
va_list ap;
va_start(ap, format);
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, ap);
va_end(ap);
lg._message_info = log_info;
// 打印出来日志
FlushLog(lg);
}
~Log()
{
}
private:
int _type;
std::string _logfile;
};
Log lg;
#define LOG(Level, Format, ...) \
do \
{ \
lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
} while (0)
#define EnableScreen() \
do \
{ \
lg.Enable(SCREEN_TYPE); \
} while (0)
#define EnableFILE() \
do \
{ \
lg.Enable(FILE_TYPE); \
} while (0)
};
NetCal.hpp
#pragma once
#include "Protocol.hpp"
#include <memory>
class NetCal
{
public:
NetCal()
{
}
~NetCal()
{
}
std::shared_ptr<Response> Calculator(std::shared_ptr<Request> req)
{
auto resp = Factory::BuildResponseDefault();
switch (req->Oper())
{
case '+':
resp->_result=req->X()+req->Y();
break;
case '-':
resp->_result=req->X()-req->Y();
break;
case '*':
resp->_result=req->X()*req->Y();
break;
case '/':
{
if(req->Y()==0)
{
resp->_code=1;
resp->_desc="div zero";
}
else
{
resp->_result=req->X()/req->Y();
}
}
break;
case '%':
{
if(req->Y()==0)
{
resp->_code=2;
resp->_desc="mod zero";
}
else
{
resp->_result=req->X()%req->Y();
}
}
break;
default:
{
resp->_code=3;
resp->_desc="illegal operation";
}
break;
}
return resp;
}
private:
};
Protocol.hpp
#pragma once
#include<memory>
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
static const std::string sep="\r\n";
//设计一下协议的报头和报文的完整格式
// "len"\r\n"{json}"\r\n ---完整报文 len:有效载荷的长度
// 第一个\r\n:区分len和json串
// 第二个\r\n:暂时没用,打印方便,debug
//添加报头
std::string Encode(const std::string &jsonstr)
{
int len=jsonstr.size();
std::string lenstr=std::to_string(len);
return lenstr+sep+jsonstr+sep;
}
// "len
// "len"
// "len"\r\n"{json}"\r\n
std::string Decode(std::string &packagestream)//不能带const
{
//分析
auto pos=packagestream.find(sep);
if(pos==std::string::npos) return std::string();//如果找不到分隔符,说明不是完整报文
std::string lenstr=packagestream.substr(0,pos);
int len=std::stoi(lenstr);
//计算一个完整的报文有多长
int total=lenstr.size()+len+2*sep.size();
if(packagestream.size()<total) return std::string();
//提取
std::string jsonstr=packagestream.substr(pos+sep.size(),len);
packagestream.erase(0,total);
return jsonstr;
}
class Request
{
public:
Request()
{
}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
{
}
bool Serialize(std::string *out)
{
// 1. 使用现成的库, xml, json(jsoncpp), protobuf
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
// Json::StyledWriter writer;
std::string s = writer.write(root);
*out = s;
return true;
}
bool Deserialize(const std::string &in)
{
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
void Print()
{
std::cout << _x << std::endl;
std::cout << _y << std::endl;
std::cout << _oper << std::endl;
}
~Request()
{
}
int X()
{
return _x;
}
int Y()
{
return _y;
}
char Oper()
{
return _oper;
}
void SetValue(int x,int y,char oper)
{
_x=x;
_y=y;
_oper=oper;
}
private:
int _x;
int _y;
char _oper; // + - * / % // x oper y
};
// struct request resp={30,0};
class Response
{
public:
Response():_result(0),_code(0),_desc("success")
{
}
bool Serialize(std::string *out)
{
// 1. 使用现成的库, xml, json(jsoncpp), protobuf
Json::Value root;
root["result"] = _result;
root["code"] = _code;
root["desc"] = _desc;
Json::FastWriter writer;
// Json::StyledWriter writer;
std::string s = writer.write(root);
*out = s;
return true;
}
bool Deserialize(const std::string &in)
{
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if(!res) return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
_desc = root["desc"].asString();
return true;
}
void PrintResult()
{
std::cout<<"result:"<<_result<<",code:"<<_code<<",desc:"<<_desc<<std::endl;
}
~Response()
{
}
public:
int _result;
int _code; // 0:success 1:div zero 2.非法操作
std::string _desc;
};
class Factory
{
public:
static std::shared_ptr<Request> BuildRequestDefault()
{
return std::make_shared<Request>();
}
static std::shared_ptr<Response> BuildResponseDefault()
{
return std::make_shared<Response>();
}
};
ServerMain.cc
#include "TcpServer.hpp"
#include "Service.hpp"
#include"NetCal.hpp"
// ./tcpserver 8888
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
exit(0);
}
uint16_t port = std::stoi(argv[1]);
//软件代码,划分了三层
NetCal cal;
IOService service(std::bind(&NetCal::Calculator,&cal,std::placeholders::_1));
std::unique_ptr<TcpServer> tsvr=std::make_unique<TcpServer>(
std::bind(&IOService::IOExcute,&service,std::placeholders::_1,std::placeholders::_2),
port
);
tsvr->Loop();
return 0;
}
Service.hpp
#pragma once
#include <iostream>
#include<functional>
#include "InetAddr.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include"Protocol.hpp"
using namespace socket_ns;
using namespace log_ns;
using process_t =std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)>;
class IOService
{
public:
IOService(process_t process):_process(process)
{
}
void IOExcute(SockSPtr sock, InetAddr &addr)
{
std::string packagestreamqueue;
while (true)
{
//1.负责读取
ssize_t n = sock->Recv(&packagestreamqueue);
if (n <= 0)
{
LOG(INFO, "client %s quit or recv error\n", addr.AddrStr().c_str());
break;
}
std::cout<<"----------------------------------------------"<<std::endl;
std::cout<<"packagestreamqueue:\n"<<packagestreamqueue<<std::endl;
//我们不能保证我们读到的是一个完整的报文
//报文解析,提取报头和有效载荷
std::string package=Decode(packagestreamqueue);
if(package.empty()) continue;
//这里就可以保证我们读到的是一个完整的报文了
auto req=Factory::BuildRequestDefault();
std::cout<<"package:\n"<<package<<std::endl;
//3.反序列化
req->Deserialize(package);
//4.业务处理
auto resp=_process(req);//通过请求,得到应答
//5.序列化应答
std::string respjson;
resp->Serialize(&respjson);
std::cout<<"respjson:\n"<<respjson<<std::endl;
//6.添加len长度报头
respjson=Encode(respjson);
std::cout<<"respjson add header done:\n"<<respjson<<std::endl;
//7.发送回去
sock->Send(respjson);
}
}
~IOService()
{
}
private:
process_t _process;
};
Socket.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include <memory>
#include "Log.hpp"
#include "InetAddr.hpp"
namespace socket_ns
{
using namespace log_ns;
class Socket;
using SockSPtr = std::shared_ptr<Socket>;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERR
};
const static int gblcklog = 8;
// 模板方法模式
class Socket
{
public:
virtual void CreateSocketOrDie() = 0;
virtual void CreateBindOrDie(uint16_t port) = 0;
virtual void CreateListentOrDie(int backlog=gblcklog) = 0;
virtual SockSPtr Accepter(InetAddr *cliaddr) = 0;
virtual bool Conntecor(const std::string& peerip,uint16_t peerport) = 0;
virtual int Sockfd()=0;
virtual void Close()=0;
virtual ssize_t Recv(std::string* out)=0;
virtual ssize_t Send(const std::string &in)=0;
public:
void BuildListenSocket(uint16_t port)
{
CreateSocketOrDie();
CreateBindOrDie(port);
CreateListentOrDie();
}
bool BuildClientSocket(const std::string& peerip,uint16_t peerport)
{
CreateSocketOrDie();
return Conntecor(peerip,peerport);
}
// void BuildUdpSocket()
// {}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd) : _sockfd(sockfd)
{
}
~TcpSocket()
{
}
void CreateSocketOrDie() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket create error\n");
exit(SOCKET_ERROR);
}
LOG(INFO, "socket create success,sockfd:%d\n", _sockfd); // 3
}
void CreateBindOrDie(uint16_t port) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
// 2.bind sockfd 和 socket addr
if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
LOG(FATAL, "bind error\n");
exit(BIND_ERROR);
}
LOG(INFO, "bind success,sockfd:%d\n", _sockfd);
}
void CreateListentOrDie(int backlog) override
{
// 3.因为tcp是面向连接的,tcp需要未来不断的能够做到获取连接
if (::listen(_sockfd, gblcklog) < 0)
{
LOG(FATAL, "listen error\n");
exit(BIND_ERROR);
}
LOG(INFO, "listen success\n");
}
SockSPtr Accepter(InetAddr *cliaddr) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取新连接
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return nullptr;
}
*cliaddr = InetAddr(client);
LOG(INFO, "get a new link,client info:%s,sockfd is :%d\n", cliaddr->AddrStr().c_str(), sockfd);
return std::make_shared<TcpSocket>(sockfd); // C++14
}
bool Conntecor(const std::string& peerip,uint16_t peerport) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(peerport);
::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);
int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
int Sockfd()
{
return _sockfd;
}
void Close()
{
if(_sockfd>0)
{
::close(_sockfd);
}
}
ssize_t Recv(std::string* out) override
{
char inbuffer[4096];
ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1,0);
if (n > 0)
{
inbuffer[n] = 0;
*out+=inbuffer;
}
return n;
}
ssize_t Send(const std::string &in) override
{
return ::send(_sockfd,in.c_str(),in.size(),0);
}
private:
int _sockfd;
};
// class UdpSocket:public Socket
// {};
}
TcpServer.hpp
#pragma once
#include<functional>
#include"Socket.hpp"
#include"Log.hpp"
#include"InetAddr.hpp"
using namespace socket_ns;
static const int gport=8888;
using service_io_t =std::function<void(SockSPtr,InetAddr&)>;
class TcpServer
{
public:
TcpServer(service_io_t service,int port=gport)
:_port(port),
_listensock(std::make_shared<TcpSocket>()),
_isrunning(false),
_service(service)
{
_listensock->BuildListenSocket(_port);
}
class ThreadData
{
public:
SockSPtr _sockfd;
TcpServer* _self;
InetAddr _addr;
public:
ThreadData(SockSPtr sockfd,TcpServer* self,const InetAddr& addr):_sockfd(sockfd),_self(self),_addr(addr)
{}
};
void Loop()
{
// signal(SIGCHLD,SIG_IGN); //进行忽略,父进程就不需要等待子进程了
_isrunning=true;
while(_isrunning)
{
InetAddr client;
SockSPtr newsock=_listensock->Accepter(&client);
if(newsock==nullptr)
continue;
LOG(INFO,"get a new link,client info:%s,sockfd is :%d\n",client.AddrStr().c_str(),newsock->Sockfd());
//version 2 --多线程版本
pthread_t tid;
ThreadData* td=new ThreadData(newsock,this,client);
pthread_create(&tid,nullptr,Execute,td);//新线程进行分离
}
_isrunning=false;
}
static void* Execute(void* args)
{
pthread_detach(pthread_self());
ThreadData* td=static_cast<ThreadData*>(args);
td->_self->_service(td->_sockfd,td->_addr);
td->_sockfd->Close();
delete td;
return nullptr;
}
~TcpServer(){}
private:
uint16_t _port;
SockSPtr _listensock;
bool _isrunning;
service_io_t _service;
};