文章目录
- 一、protobuf的好处
- 二、如何创建proto
- 三、编译生成的C++类
- UserServiceRpc
- UserServiceRpc_Stub
- 四、序列化和反序列化
- 序列化
- 反序列化
- 粘包问题解决
- 调用者组包
- 提供者解包
一、protobuf的好处
1、protobuf是二进制存储的,xml和json都是文本存储的。故protobuf占用带宽较低
2、protobuf不需要存储额外的信息。
json如何存储数据?键值对。例:Name:”zhang san”, pwd: “12345”。
protobuf存储数据的方式:“zhang san” “123456”(无额外信息)
二、如何创建proto
1、定义版本和声明,第三个为生成service所需要的声明(service服务类和rpc方法描述默认不生成)
syntax = "proto3";
package fixbug;
option cc_generic_services = true;
2、定义远端调用函数的input参数和return参数
message ResultCode
{
int32 errcode = 1;
bytes errmsg = 2;
}
message LoginRequest
{
bytes name = 1;
bytes pwd = 2;
}
message LoginResponse
{
ResultCode result = 1;
bool sucess = 2;
}
注意,建议将string类型替换为bytes类型,因为bytes直接存二进制文件,效率更高一点,如果用string,最后还要将其转换为字节数据,而bytes则不需要。最后结果和上面相同
3、生成rpc方法的类型
在protobuf里面定义描述rpc方法的类型 – service
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
}
三、编译生成的C++类
编译生成cpp和h文件
protoc test.proto --cpp_out=./
1、message具体生成的c++类如下
2、rpc方法的类型生成的类
test.proto代码
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
}
生成类如下:
可以看出,一共生成两个类
UserServiceRpc
一个供callee–>rpc服务提供者使用。继承goole::protobuf::Service得到
class UserServiceRpc : public google::protobuf::Service
UserServiceRpc_Stub
一个供caller–>rpc服务的调用者使用。继承UserServiceRpc得到
class UserServiceRpc_Stub : public UserServiceRpc
成员函数很干净,一切的源头只需要一个RpcChannel类
RpcChannel类中只需要重写一个CallMethod方法,如下
四、序列化和反序列化
序列化:对象转为字节序列称为对象的序列化
反序列化:字节序列转为对象称为对象的反序列化
protobuf跨平台语言支持,序列化和反序列化效率高速度快,且序列化后体积比XML和JSON都小很多,适合网络传输。
注意:序列化和反序列化可能对系统的消耗较大,因此原则是:远程调用函数传入参数和返回值对象要尽量简单,具体来说应避免:
远程调用函数传入参数和返回值对象体积较大,如传入参数是List或Map,序列化后字节长度较长,对网络负担较大
远程调用函数传入参数和返回值对象有复杂关系,传入参数和返回值对象有复杂的嵌套、包含、聚合关系等,性能开销大
远程调用函数传入参数和返回值对象继承关系复杂,性能开销大
序列化
1、定义生成的头文件
#include "test.pb.h"
2、函数调用方打包数据
LoginRequest reqA;
req.set_name("zhang san");
req.set_pwd("123456");
3、将打包好的LoginRequest reqA;数据交给protobuf进行序列化
std::string send_str;
// 进行序列化,框架干的事情
if (req.SerializeToString(&send_str))
{
// 序列化成功后 再发送
std::cout << send_str.c_str() << std::endl;
}
反序列化
此时数据被发送到被调用方,被调用方反序列化刚刚发送过来的send_str
LoginRequest reqB;
// 从send_str反序列化一个login请求对象
if (reqB.ParseFromString(send_str))
{
// 以下代码不属于框架内的代码
std::cout << reqB.name() << std::endl;
std::cout << reqB.pwd() << std::endl;
}
需要注意,所有不涉及抽象层,设计具体的业务的代码,都不属于RPC分布式网络通信框架的代码。
粘包问题解决
rpc服务调用者和rpc服务提供者在发送或解析函数的输入数据时,需要共同参照一个proto数据包格式RpcHeader,如下所示:
syntax = "proto3";
package mprpc;
message RpcHeader
{
bytes service_name = 1;
bytes method_name = 2;
uint32 args_size = 3;
}
rpc服务调用者和rpc服务提供者都需要遵循该格式组装数据或是解析数据。
调用者组包
首先调取服务名和方法名,之后序列化调用函数的输入,得到序列化后输入的长度。
将服务名,方法名,输入的长度按照预设定的protobuf message再次序列化得到rpc_header_str;
最后将rpc_header_str头部插入固定4字节的rpc_header_str.size()
,尾部插入序列化后的调用函数输入,得到send_rpc_str。
const google::protobuf::ServiceDescriptor* sd = method->service();
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name
// 获取参数的序列化字符串长度 args_size
uint32_t args_size = 0;
std::string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str))
{
header_size = rpc_header_str.size();
}
else
{
controller->SetFailed("serialize rpc header error!");
return;
}
// 组织待发送的rpc请求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str; // rpcheader
send_rpc_str += args_str; // args
// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;
提供者解包
根据头的长度,得到rpc_header_str的长度,使用substr将rpc_header_str从网络数据包中宅出来,并将数据头反序列化。
之后根据输入的长度反序列化输入args_str,成功拿到输入。
std::string recv_buf = buffer->retrieveAllAsString();
// 从字符流中读取前4个字节的内容
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);
// 根据 header_size 读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// 数据头反序列化失败
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}
// 获取rpc方法参数的字符流数据
std::string args_str = recv_buf.substr(4 + header_size, args_size);
// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;