文章目录
- 介绍
- 为什么使用protobuf
- protobuf的service rpc
- 框架发布方的上层使用逻辑
- 框架的提供方逻辑
- Rpc调用方框架实现
- 客户端上层框架使用
- 重新梳理
- 简单聊一下RpcController
- 引入缓冲区队列
- zookeeper
- 总结
介绍
以下博客覆盖内容:
集群和分布式概念原理;
RPC远程调用过程和实现;
为什么采用protobuf;
分布式节点机器如何得知哪台机器部署哪个具体的服务?服务注册中心。ZooKeeper 分布式一致性协调服务应用以及编程
muduo网络库编程;
在进入正文之前,我们需要了解为什么需要RPC框架,什么时候使用RPC框架:
1.为什么要使用分布式的架构?
单机架构受限于硬件资源,单机的服务器不能承受太高的用户并发量。
模块化的设计,当某个模块的内容需要修改的时候,需要编译整个项目。
系统中,有些模块属于CPU密集型的,有些是IO密集型的。所以需要分布式框架来将各个模块部署在不同的硬件资源上面。
当然上面的单机架构受限硬件资源,其实还能采用集群式的服务器来解决,通过一个中间控件来实现负载均衡即可。但是在编译一套代码的时候需要多份部署。
集群比分布式:差别就是在于集群比较相对容易时间,在一些时间紧迫的场景下,采用集群也是可以的。
分布式需要做的就是关于模块划分。并且如果是分布式节点需要在并发场景下面使用,可以对每一个分布式节点上面使用集群。
也有一些不需要高并发,比如后台管理模块就可以不需要集群,那么节点使用单机也是可以的。
使用分布式后,模块之间的编译就可以缩小到分布式节点的集群内部。即编译的范围小了。
当然,自然而然能想到一些问题:
1.各个模块的设计需要合理,否则会出现大量的重复代码?
2.机器1上的一个模块进程1怎么调用机器1的模块进程2里面的一个业务代码呢?服务发现中心,或者将提供服务的主机的ip端口写入配置,由程序读取配置,明显服务发现中心会比较合理。
RPC(Remote Procedure Call Protocol)远程过程调用协议。
红色部分:应用层的调用远端方法。
黄色部分:设计rpc方法参数的打包和解析,也就是数据的序列化和反序列化,使用Protobuf。
绿色部分:网络部分,包括寻找rpc服务主机,发起rpc调用请求和响应rpc调用结果,使用muduo网络
库和zookeeper服务配置中心(专门做服务发现,必须有的)。
mprpc框架主要包含以上两个部分的内容。
为什么使用protobuf
负责结构化数据的序列化和反序列化。效率高等优点
syntax = "proto3"; // 声明了protobuf的版本
package fixbug; // 声明了代码所在的包(对于C++来说是namespace)
// 定义下面的选项,表示生成service服务类和rpc方法描述,默认不生成
option cc_generic_services = true;
// 数据 列表 映射表
// 定义登录请求消息类型 name pwd
message LoginRequest
{
bytes name = 1;
bytes pwd = 2;
}
// 定义登录响应消息类型
message LoginResponse
{
ResultCode result = 1;
bool success = 2;
}
以上面的LoginRequest 为例子,下面是protobuf自动帮我们实现的cpp版本。
message就是对应生成类的名称,定义的字段对应对应的成员方法。
class LoginRequest :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:fixbug.LoginRequest) */ {
public:
LoginRequest();
virtual ~LoginRequest();
LoginRequest(const LoginRequest& from);
LoginRequest(LoginRequest&& from) noexcept
: LoginRequest() {
*this = ::std::move(from);
}
inline LoginRequest& operator=(const LoginRequest& from) {
CopyFrom(from);
return *this;
}
inline LoginRequest& operator=(LoginRequest&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const LoginRequest& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const LoginRequest* internal_default_instance() {
return reinterpret_cast<const LoginRequest*>(
&_LoginRequest_default_instance_);
}
static constexpr int kIndexInFileMessages =
1;
friend void swap(LoginRequest& a, LoginRequest& b) {
a.Swap(&b);
}
inline void Swap(LoginRequest* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline LoginRequest* New() const final {
return CreateMaybeMessage<LoginRequest>(nullptr);
}
LoginRequest* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<LoginRequest>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const LoginRequest& from);
void MergeFrom(const LoginRequest& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
::PROTOBUF_NAMESPACE_ID::uint8* _InternalSerialize(
::PROTOBUF_NAMESPACE_ID::uint8* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(LoginRequest* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "fixbug.LoginRequest";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_test_2eproto);
return ::descriptor_table_test_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kNameFieldNumber = 1,
kPwdFieldNumber = 2,
};
// bytes name = 1;
void clear_name();
const std::string& name() const;
void set_name(const std::string& value);
void set_name(std::string&& value);
void set_name(const char* value);
void set_name(const void* value, size_t size);
std::string* mutable_name();
std::string* release_name();
void set_allocated_name(std::string* name);
private:
const std::string& _internal_name() const;
void _internal_set_name(const std::string& value);
std::string* _internal_mutable_name();
public:
// bytes pwd = 2;
void clear_pwd();
const std::string& pwd() const;
void set_pwd(const std::string& value);
void set_pwd(std::string&& value);
void set_pwd(const char* value);
void set_pwd(const void* value, size_t size);
std::string* mutable_pwd();
std::string* release_pwd();
void set_allocated_pwd(std::string* pwd);
private:
const std::string& _internal_pwd() const;
void _internal_set_pwd(const std::string& value);
std::string* _internal_mutable_pwd();
public:
// @@protoc_insertion_point(class_scope:fixbug.LoginRequest)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr name_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr pwd_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_test_2eproto;
};
// -------------------------------------------------------------------
后续main.cc就可以通过生成的.h文件定义对应的对象,填充里面的字段,使用SerializeToString等方法生成对应的格式。
int main()
{
// 封装了login请求对象的数据
LoginRequest req;
req.set_name("zhang san");
req.set_pwd("123456");
// 对象数据序列化 =》 char*
std::string send_str;
if (req.SerializeToString(&send_str))
{
std::cout << send_str.c_str() << std::endl;// zhang san123456
}
// 从send_str反序列化一个login请求对象
LoginRequest reqB;
if (reqB.ParseFromString(send_str))
{
std::cout << reqB.name() << std::endl;
std::cout << reqB.pwd() << std::endl;
}
return 0;
}
结果能发现紧凑的数据内容。
protobuf 有数据,列表,映射表等,后续专门我会出一期博客仔细讲讲protobuf的使用。
映射表就是map<int32,string>
列表就是repeated xxx
// 列表
message GetFriendListsResponse
{
ResultCode result = 1;
repeated User friend_list = 2; // 定义了一个列表类型
}
处理protobuf的列表类型,如果成员变量是另一个类,则使用mutable_result返回对象指针,通过指针来修改。
add_friend_list则是将新增加数据,返回的指针进行内容修改。
int main()
{
// LoginResponse rsp;
// ResultCode *rc = rsp.mutable_result();
// rc->set_errcode(1);
// rc->set_errmsg("登录处理失败了");
GetFriendListsResponse rsp;
ResultCode *rc = rsp.mutable_result();
rc->set_errcode(0);
User *user1 = rsp.add_friend_list();
user1->set_name("zhang san");
user1->set_age(20);
user1->set_sex(User::MAN);
User *user2 = rsp.add_friend_list();
user2->set_name("li si");
user2->set_age(22);
user2->set_sex(User::MAN);
std::cout << rsp.friend_list_size() << std::endl;
return 0;
}
protobuf的service rpc
rpc远程调用的时候,目标主机,方法名称,方法的参数都有发送。
protobuf会提供rpc方法的一个类型。 service
注意需要开启:
// 定义下面的选项,表示生成service服务类和rpc方法描述,默认不生成 option cc_generic_services = true;
否则是不会开启的。
// 在protobuf里面怎么定义描述rpc方法的类型 - service
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
rpc GetFriendLists(GetFriendListsRequest) returns(GetFriendListsResponse);
}
为什么有UserServiceRpc和UserServiceRpc_Stub类。
message LoginRequest 继承自 google::protobuf::Message,Message 继承自 MessageLite 。序列化和反序列方法都是从MessageLite 继承而来。
bool SerializeToString(std::string* output) const;
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
rpc Register(RegisterRequest) returns(RegisterResponse);
}
以上就是proto文件的编写,我们可以在生成的.h里面寻找,可以看到UserServiceRpc和UserServiceRpc_Stub。
UserServiceRpc是rpc服务的提供者使用,提供对象需要继承这个类型, UserServiceRpc_Stub 是rpc服务的消费者使用。::PROTOBUF_NAMESPACE_ID::RpcChannel* channel 则是我们的框架内部封装好的。class MprpcChannel : public google::protobuf::RpcChannel
UserServiceRpc_Stub 只有带参数的构造函数,而UserServiceRpc有默认的构造函数。
UserServiceRpc_Stub 和 UserServiceRpc的Login和GetFriendLists是都是虚函数。
UserServiceRpc_Stub 里面有成员变量:PROTOBUF_NAMESPACE_ID::RpcChannel* channel_;
桩类实际上最终调用的继承自RpcChannel的基类,最终UserServiceRpc_Stub 的构造参数传参传入的是RpcChannel的派生类。
UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
即重写以下类型
// Abstract interface for an RPC channel. An RpcChannel represents a
// communication line to a Service which can be used to call that Service's
// methods. The Service may be running on another machine. Normally, you
// should not call an RpcChannel directly, but instead construct a stub Service
// wrapping it. Example:
// RpcChannel* channel = new MyRpcChannel("remotehost.example.com:1234");
// MyService* service = new MyService::Stub(channel);
// service->MyMethod(request, &response, callback);
class PROTOBUF_EXPORT RpcChannel {
public:
inline RpcChannel() {}
virtual ~RpcChannel();
// Call the given method of the remote service. The signature of this
// procedure looks the same as Service::CallMethod(), but the requirements
// are less strict in one important way: the request and response objects
// need not be of any specific class as long as their descriptors are
// method->input_type() and method->output_type().
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller, const Message* request,
Message* response, Closure* done) = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};
} // namespace protobuf
} // namespace google
站在我们的发布方,我们本来有一个本地的函数,比如Login,后续我们希望将这个方法提供给其他的节点进行调用。
一开始的cpp方法如下,简化了逻辑,用日志进行替换。
class UserService
{
public:
bool Login(std::string name, std::string pwd)
{
std::cout << "doing local service: Login" << std::endl;
std::cout << "name:" << name << " pwd:" << pwd << std::endl;
return false;
}
- 提供对应的proto文件,描述Login的参数返回值函数名。
syntax = "proto3";
package fixbug;
option cc_generic_services = true;
message ResultCode
{
int32 errcode = 1;
bytes errmsg = 2;
}
message LoginRequest
{
bytes name = 1;
bytes pwd = 2;
}
message LoginResponse
{
ResultCode result = 1;
bool sucess = 2;
}
service UserServiceRpc
{
// 这里实际上不需要和提供的本地的函数的名称相同,但是最好一样
rpc Login(LoginRequest) returns(LoginResponse);
}
2.其次修改userservice.cc,需要调用到user.proto对应的,user.proto是需要调用方和提供方都相同的。
UserService 继承对应的UserServiceRpc。
并且重写基类的Login方法。
Login 的方法的解释:request就是调用方填充的参数,并且是const类型表示不可以修改。
response就可以调用本地的Login(std::string name,std::sting pwd)的返回值。
Closure对象就是一个执行完后的回调函数,并且这个参数必定不是基类,而是派生类。执行响应对象的序列化和网络发送(框架完成)。
以下就是业务代码:其实就是制作proto文件,改变业务代码。
controller在下面做解释。
框架发布方的上层使用逻辑
class UserService : public fixbug::UserServiceRpc // 使用在rpc服务发布端(rpc服务提供者)
{
public:
bool Login(std::string name, std::string pwd)
{
std::cout << "doing local service: Login" << std::endl;
std::cout << "name:" << name << " pwd:" << pwd << std::endl;
return false;
}
/*
重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的
1. caller ===> Login(LoginRequest) => muduo => callee
2. callee ===> Login(LoginRequest) => 交到下面重写的这个Login方法上了
*/
void Login(::google::protobuf::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done)
{
// 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务
std::string name = request->name();
std::string pwd = request->pwd();
// 做本地业务
bool login_result = Login(name, pwd);
// 把响应写入 包括错误码、错误消息、返回值
fixbug::ResultCode *code = response->mutable_result();
code->set_errcode(0);
code->set_errmsg("");
response->set_sucess(login_result);
// 执行回调操作 执行响应对象数据的序列化和网络发送(都是由框架来完成的)
done->Run();
}
框架的提供方需要进行框架的初始化。
int main(int argc, char **argv)
{
// 调用框架的初始化操作
MprpcApplication::Init(argc, argv);
// provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上
RpcProvider provider;
provider.NotifyService(new UserService());
// 启动一个rpc服务发布节点 Run以后,进程进入阻塞状态,等待远程的rpc调用请求
provider.Run();
return 0;
}
客户端要想要运行,可以有两种方法,比如把提供服务的ip端口写入配置,或者用服务发现中心。
框架的提供方逻辑
启动一个rpc服务发布节点,Run函数。
// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run()
{
// 读取配置文件rpcserver的信息
std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
muduo::net::InetAddress address(ip, port);
// 创建TcpServer对象
muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");
// 绑定连接回调和消息读写回调方法 分离了网络代码和业务代码
server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
// 设置muduo库的线程数量
server.setThreadNum(4);
// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
// session timeout 30s zkclient 网络I/O线程 1/3 * timeout 时间发送ping消息
ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点 method_name为临时性节点
for (auto &sp : m_serviceMap)
{
// /service_name /UserServiceRpc
std::string service_path = "/" + sp.first;
zkCli.Create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.m_methodMap)
{
// /service_name/method_name /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
std::string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
// ZOO_EPHEMERAL表示znode是一个临时性节点
zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}
// rpc服务端准备启动,打印信息
std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;
// 启动网络服务
server.start();
m_eventLoop.loop();
}
protobuf只提供了数据的序列化和反序列方法。rpc提供方 RpcProvider::Run()包括了服务的发布功能。
有了上面的铺垫,实际上进行序列化反序列化,底层的网络通信其实都已经实现了,但是还缺少了一样东西,当远端请求我们的服务时,我如何知道请求的是应用程序的哪一个服务对象里面的哪一个服务方法?
即需要维护 Service 类对象 -> Method 类对象的方法(即发布的对象需要将已有的发布方法进行保存,方便后续框架提供者OnMessage函数CallMethod方法进行使用)。并且还需要进行协议制定。
// 存储注册成功的服务对象和其服务方法的所有信息
std::unordered_map<std::string, ServiceInfo> m_serviceMap;
// service服务类型信息
struct ServiceInfo
{
google::protobuf::Service *m_service; // 保存服务对象
std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap; // 保存服务方法
};
到远程有rpc调用请求,底层的muduo库就会调用到OnMessage这个回调函数,在这个回调函数内部,我们需要对发送过来的数据进行反序列化(利用protobuf),然后再获取到对应的调用方法,服务对象,以及对应的参数。由框架内部进行调用。并且框架能够调用到应用层的方法是因为框架提供了对应用层的服务对象进行注册。
由于我们用到的是muduo库的tcpServer编程,所以上层协议得由我们来完成,否则有粘包问题。
自定义协议(辨别调用方需要的是哪一个服务对象的哪一个服务方法,以及传的参数):
header_size(4个字节) + header_str + args_str
新增一个proto文件RpcHeader,这样我们只需要读取四字节知道header_str的长度,然后读取header_str反序列后拿到args_size的长度,接着把args_str读取下来,这样就解决了粘包问题。
message RpcHeader
{
bytes service_name = 1;
bytes method_name = 2;
uint32 args_size = 3;
}
/*
在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型
service_name method_name args 定义proto的message类型,进行数据头的序列化和反序列化
service_name method_name args_size
16UserServiceLoginzhang san123456
header_size(4个字节) + header_str + args_str
10 "10"
10000 "1000000"
std::string insert和copy方法
*/
// 已建立连接用户的读写事件回调 如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buffer,
muduo::Timestamp)
{
// 网络上接收的远程rpc调用请求的字符流 Login args
std::string recv_buf = buffer->retrieveAllAsString();
// 从字符流中读取前4个字节的内容
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);
// 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// 数据头反序列化失败
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}
// 获取rpc方法参数的字符流数据
std::string args_str = recv_buf.substr(4 + header_size, args_size);
// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;
// 获取service对象和method对象
auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
std::cout << service_name << " is not exist!" << std::endl;
return;
}
auto mit = it->second.m_methodMap.find(method_name);
if (mit == it->second.m_methodMap.end())
{
std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
return;
}
google::protobuf::Service *service = it->second.m_service; // 获取service对象 new UserService
const google::protobuf::MethodDescriptor *method = mit->second; // 获取method对象 Login
// 生成rpc方法调用的请求request和响应response参数
google::protobuf::Message *request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
std::cout << "request parse error, content:" << args_str << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New();
// 给下面的method方法的调用,绑定一个Closure的回调函数
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider, const muduo::net::TcpConnectionPtr&,google::protobuf::Message*> (this,&RpcProvider::SendRpcResponse, conn, response);
// 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
// new UserService().Login(controller, request, response, done)
service->CallMethod(method, nullptr, request, response, done);
}
发布方调用函数service->CallMethod调用的方法如下。
void UserServiceRpc::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::PROTOBUF_NAMESPACE_ID::Message* request,
::PROTOBUF_NAMESPACE_ID::Message* response,
::google::protobuf::Closure* done) {
GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_user_2eproto[0]);
switch(method->index()) {
case 0:
Login(controller,
::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::fixbug::LoginRequest*>(
request),
::PROTOBUF_NAMESPACE_ID::internal::DownCast<::fixbug::LoginResponse*>(
response),
done);
break;
case 1:
Register(controller,
::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::fixbug::RegisterRequest*>(
request),
::PROTOBUF_NAMESPACE_ID::internal::DownCast<::fixbug::RegisterResponse*>(
response),
done);
break;
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
break;
}
}
紧接着就是回调函数的设置。回调函数的内容就是将response的内容进行序列化并且发送,这很简单,用protobuf封装好的接口即可。
// Closure的回调操作,用于序列化rpc的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message *response)
{
std::string response_str;
if (response->SerializeToString(&response_str)) // response进行序列化
{
// 序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方
conn->send(response_str);
}
else
{
std::cout << "serialize response_str error!" << std::endl;
}
conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接
}
Rpc调用方框架实现
上面说到了调用方需要利用到的是Stub类。Stub类需要提供一个带参数的构造函数,我们需要重写这个实参RpcChannel。
提供方调用函数的方法:MprpcChannel::CallMethod,调用方的框架逻辑就是将访问的对象,函数,参数序列化,socket连接到zookeeper,获取对应的response。
比较简单,就一个函数重写。
class MprpcChannel : public google::protobuf::RpcChannel
class MprpcChannel : public google::protobuf::RpcChannel
{
public:
// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据数据序列化和网络发送
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf:: Closure* done);
};
/*
header_size + service_name method_name args_size + args
*/
// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf:: Closure* done){
// 序列化发送数据
// 发送序列化数据,此时不涉及高并发,普通的socket编程即可。
const google::protobuf::ServiceDescriptor* sd = method->service();
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name
// 获取参数的序列化字符串长度 args_size
uint32_t args_size = 0;
std::string args_str;
// 这个request是客户端传给服务器的参数
if (request->SerializeToString(&args_str))
{
// 记录的args_str 序列化后的长度。 由于除了args_str,还有service_name,method_name,辅助size,所以弄了一个proto来专门序列化他们
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str))
{
header_size = rpc_header_str.size();
}
else
{
controller->SetFailed("serialize rpc header error!");
return;
}
// 组织待发送的rpc请求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str; // rpcheader
send_rpc_str += args_str; // args
// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;
// 使用tcp编程,完成rpc方法的远程调用
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd)
{
char errtxt[512] = {0};
sprintf(errtxt, "create socket error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 读取配置文件rpcserver的信息
// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
// uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
// rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
// /UserServiceRpc/Login
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());
if (host_data == "")
{
controller->SetFailed(method_path + " is not exist!");
return;
}
int idx = host_data.find(":");
if (idx == -1)
{
controller->SetFailed(method_path + " address is invalid!");
return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx+1, host_data.size()-idx).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
// 连接rpc服务节点
if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "connect error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 发送rpc请求
if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "send error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 接收rpc请求的响应值
char recv_buf[1024] = {0};
int recv_size = 0;
if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "recv error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// 反序列化rpc调用的响应数据
// std::string response_str(recv_buf, 0, recv_size); // bug出现问题,recv_buf中遇到\0后面的数据就存不下来了,导致反序列化失败
// if (!response->ParseFromString(response_str))
if (!response->ParseFromArray(recv_buf, recv_size))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "parse error! response_str:%s", recv_buf);
controller->SetFailed(errtxt);
return;
}
close(clientfd);
}
客户端上层框架使用
客户端的主函数只需要创建桩类 fixbug::UserServiceRpc_Stub stub(new MprpcChannel());,其次封装请求参数,和一个返回的对象,最终我们可以通过返回对象的errcode判断是否失败,调用过程属于同步的过程。
int main(int argc, char **argv)
{
// 整个程序启动以后,想使用mprpc框架来享受rpc服务调用,一定需要先调用框架的初始化函数(只初始化一次)
MprpcApplication::Init(argc, argv);
// 演示调用远程发布的rpc方法Login
fixbug::UserServiceRpc_Stub stub(new MprpcChannel());
// rpc方法的请求参数
fixbug::LoginRequest request;
request.set_name("zhang san");
request.set_pwd("123456");
// rpc方法的响应
fixbug::LoginResponse response;
// 发起rpc方法的调用 同步的rpc调用过程 MprpcChannel::callmethod
stub.Login(nullptr, &request, &response, nullptr); // RpcChannel->RpcChannel::callMethod 集中来做所有rpc方法调用的参数序列化和网络发送
// 一次rpc调用完成,读调用的结果
if (0 == response.result().errcode())
{
std::cout << "rpc login response success:" << response.sucess() << std::endl;
}
else
{
std::cout << "rpc login response error : " << response.result().errmsg() << std::endl;
}
// 演示调用远程发布的rpc方法Register
fixbug::RegisterRequest req;
req.set_id(2000);
req.set_name("mprpc");
req.set_pwd("666666");
fixbug::RegisterResponse rsp;
// 以同步的方式发起rpc调用请求,等待返回结果
stub.Register(nullptr, &req, &rsp, nullptr);
// 一次rpc调用完成,读调用的结果
if (0 == rsp.result().errcode())
{
std::cout << "rpc register response success:" << rsp.sucess() << std::endl;
}
else
{
std::cout << "rpc register response error : " << rsp.result().errmsg() << std::endl;
}
return 0;
}
重新梳理
由发布方添加proto文件,重写一套cpp函数,进行业务逻辑处理。
调用方调用Stub类对象即可。可以看到框架完成后,实际上对于客户端是比较友好的。服务端需要新增服务则需要新增proto文件以及重写对应的Service方法。
其实大体来看都已经实现了,还剩下就是关于zookeeper网络服务配置中心的讲解,在上面的例子中,代码实际已经给出,我们是将提供服务的ip端口直接写入配置文件,但是后面我们配置了zookeeper后只需要去zookeeper里面找需要的网络服务即可。
简单聊一下RpcController
以及RpcController 的讲解。
RpcController 的作用就是防止例如我的客户端底层网络序列化就出错了,假如客户端把request进行序列化,或者其他操作,没有把请求发送出去,此时出现了问题,实际上是可以通过记录在controller了,后续才能知道response这个结构体是否可以正常使用。也就是只要是任何一个环节网络发送之前出错,那就可能会造成问题。
RpcController实际上就是一个抽象类,它里面有一些抽象方法,表示在rpc过程当中出现的状态。通常我们需要的是错误。
在mpcpchannel当中会遇到错误的地方都用controller记录所有的错误。
这样用户调用后只需要检查controller是否有携带错误数据即可。
引入缓冲区队列
由于后续新增日志,涉及大量IO(若是同步IO,由于直接进行磁盘访问的等待,效率会很低),所以采用异步式的IO方式,添加一个缓冲区来实现。并且由于RpcProvider是一个epoll+多线程的模型(muduo库的底层实现),所以队列还得需要保证多线程并发场景下使用。
核心结构如下:
#pragma once
#include <queue>
#include <thread>
#include <mutex> // pthread_mutex_t
#include <condition_variable> // pthread_condition_t
// 异步写日志的日志队列
template<typename T>
class LockQueue
{
public:
// 多个worker线程都会写日志queue
void Push(const T &data)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(data);
m_condvariable.notify_one();
}
// 一个线程读日志queue,写日志文件
T Pop()
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.empty())
{
// 日志队列为空,线程进入wait状态
m_condvariable.wait(lock);
}
T data = m_queue.front();
m_queue.pop();
return data;
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_condvariable;
};
zookeeper
zookeeper 除了作为服务注册中心,还有分布式锁,全局分布式锁等等。
znode 的节点都是保存到我们的磁盘的。 我们可以更改zookeeper里面的配置文件,dataDir,更改为我们的其他路径,因为他默认的是/tmp,每次重启后数据就会更新。
运行./zkServer.sh start 即可运行在后台。通过ps -ef |grep zookper 就可以看到他在哪里运行。zookeeper需要在服务配置中心配置了之后,服务端之后需要定时向服务配置中心刷新,不然就会认为这个服务挂掉。 zk节点分为临时性和永久性,临时性的会在rpc节点超时未发送心跳信息后,zk节点会自动删除临时性节点。
zk的watcher机制:通过api提供监听节点的变化。zk会通知客户端节点发生变化了。
zk有单线程和多线程版本,zookeeper_mt 和 zookeeper_st 版本。
zk原生api也有缺陷,客户端设置watcher只会通知一次。只能存储简单的byte字节数组,如果需要存储对象,那么需要自己将对象转化为字节数组。
zk会在1/3的Timeout自动发送心跳检测(若没有心跳检测则会导致zk删除所有的临时性节点)假设30s为超时,那么10s会发送一次心跳检测。
zookeeperutil.h
#pragma once
#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>
// 封装的zk客户端类
class ZkClient
{
public:
ZkClient();
~ZkClient();
// zkclient启动连接zkserver
void Start();
// 在zkserver上根据指定的path创建znode节点
void Create(const char *path, const char *data, int datalen, int state=0);
// 根据参数指定的znode节点路径,或者znode节点的值
std::string GetData(const char *path);
private:
// zk的客户端句柄
zhandle_t *m_zhandle;
};
zookeeperutil.cc
#include "zookeeperutil.h"
#include "mprpcapplication.h"
#include <semaphore.h>
#include <iostream>
// 全局的watcher观察器 zkserver给zkclient的通知
void global_watcher(zhandle_t *zh, int type,
int state, const char *path, void *watcherCtx)
{
if (type == ZOO_SESSION_EVENT) // 回调的消息类型是和会话相关的消息类型
{
if (state == ZOO_CONNECTED_STATE) // zkclient和zkserver连接成功
{
sem_t *sem = (sem_t*)zoo_get_context(zh);
sem_post(sem);
}
}
}
ZkClient::ZkClient() : m_zhandle(nullptr)
{
}
ZkClient::~ZkClient()
{
if (m_zhandle != nullptr)
{
zookeeper_close(m_zhandle); // 关闭句柄,释放资源 MySQL_Conn
}
}
// 连接zkserver
void ZkClient::Start()
{
std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
std::string connstr = host + ":" + port;
/*
zookeeper_mt:多线程版本
zookeeper的API客户端程序提供了三个线程
API调用线程
网络I/O线程 pthread_create poll
watcher回调线程 pthread_create
*/
m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
if (nullptr == m_zhandle)
{
std::cout << "zookeeper_init error!" << std::endl;
exit(EXIT_FAILURE);
}
sem_t sem;
sem_init(&sem, 0, 0);
zoo_set_context(m_zhandle, &sem);
sem_wait(&sem);
std::cout << "zookeeper_init success!" << std::endl;
}
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{
char path_buffer[128];
int bufferlen = sizeof(path_buffer);
int flag;
// 先判断path表示的znode节点是否存在,如果存在,就不再重复创建了
flag = zoo_exists(m_zhandle, path, 0, nullptr);
if (ZNONODE == flag) // 表示path的znode节点不存在
{
// 创建指定path的znode节点了
flag = zoo_create(m_zhandle, path, data, datalen,
&ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
if (flag == ZOK)
{
std::cout << "znode create success... path:" << path << std::endl;
}
else
{
std::cout << "flag:" << flag << std::endl;
std::cout << "znode create error... path:" << path << std::endl;
exit(EXIT_FAILURE);
}
}
}
// 根据指定的path,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{
char buffer[64];
int bufferlen = sizeof(buffer);
int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
if (flag != ZOK)
{
std::cout << "get znode error... path:" << path << std::endl;
return "";
}
else
{
return buffer;
}
}
service name为永久节点,method name 为临时性节点。
总结
RPC框架到此为止~
- 喜欢就收藏
- 认同就点赞
- 支持就关注
- 疑问就评论