什么是gRPC
RPC 即远程过程调用协议(Remote Procedure Call Protocol),可以让我们像调用本地对象一样发起
远程调用。RPC 凭借其强大的治理功能,成为解决分布式系统通信问题的一大利器。
gRPC 是一个现代的、高性能的、开源且与语言无关的通用 RPC 框架,基于 HTTP2 协议设计,序列化使用
PB(Protocol Buffer)。PB 是一种语言无关的高性能序列化框架,HTTP2 + PB 的组合保证了 gRPC 的高性能。
本文介绍如何使用C++利用gRPC来编写client和server的代码。
rpc范例的一般编写流程
- 编写proto文件
- 根据proto文件生成对应的.cc和.h文件
- server程序继承service
- client程序使用stub
- 编译server和client程序
proto文件编写
// Schema for the IM login service: one service (ImLogin) with two unary RPCs
// and the request/response messages they exchange.
syntax = "proto3";
package IM.Login; // Becomes the C++ namespace IM::Login in the generated .h file.
// Service definition.
service ImLogin {
// RPC methods exposed by the service.
rpc Regist (IMRegisterReq) returns (IMRegisterRes) {}
rpc Login (IMLoginReq) returns (IMLoginRes) {}
}
// Account registration request. protoc generates a C++ class from this message
// and automatically provides methods to modify and read its fields.
message IMRegisterReq {
string user_name = 1;
string password = 2;
}
// Response to Regist: carries a user_name, a user_id and a numeric result code.
message IMRegisterRes {
string user_name = 1;
uint32 user_id = 2;
uint32 result_code = 3;
}
// Login request: same fields as the registration request.
message IMLoginReq {
string user_name = 1;
string password = 2;
}
// Response to Login: carries a user_id and a numeric result code.
message IMLoginRes {
uint32 user_id = 1;
uint32 result_code = 2;
}
输入如下指令生成protobuf序列化后的代码 IM.Login.pb.cc和IM.Login.pb.h:
jyhlinux@ubuntu:~/grpc-v1.45.2/examples/cpp/im_login$ protoc --cpp_out=. IM.Login.proto
// $SRC_DIR: .proto 所在的源目录
// --cpp_out: 生成 c++ 代码
// $DST_DIR: 生成代码的目标目录
// xxx.proto: 要针对哪个 proto 文件生成接口代码
protoc -I=$SRC_DIR --cpp_out=$DST_DIR $SRC_DIR/xxx.proto
生成的.cc文件和.h文件的类型是什么样呢?举个例子,比如下方proto代码
message IMLoginReq {
string user_name = 1;
string password = 2;
}
生成类似于如下的C++代码:IM.Login.pb.h(不完全)
// Abridged excerpt of the class that protoc generates for `message IMLoginReq`
// (IM.Login.pb.h). Only the clear_<field>() / set_<field>() members are shown;
// every proto field gets its own pair, named after the field.
class IMLoginReq final {
 public:
  // string user_name = 1;
  void clear_user_name();
  template <typename ArgT0, typename... ArgT>
  void set_user_name(ArgT0&& arg0, ArgT... args);

  // string password = 2;
  void clear_password();
  // The setter is named after the field it belongs to (password), not
  // user_name.
  template <typename ArgT0, typename... ArgT>
  void set_password(ArgT0&& arg0, ArgT... args);
};
输入下方指令生成服务框架代码
jyhlinux@ubuntu:~/grpc-v1.45.2/examples/cpp/im_login$ protoc --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin IM.Login.proto
该指令会在当前目录下生成服务框架代码 IM.Login.grpc.pb.h 和 IM.Login.grpc.pb.cc(由于同时指定了 --cpp_out,也会一并生成 IM.Login.pb.h 和 IM.Login.pb.cc)。
grpc server端代码编写
注意要引入正确的命名空间和头文件
#include <iostream>
#include <string>
#include <memory>
//grpc头文件
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include "IM.Login.grpc.pb.h"
#include "IM.Login.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using IM::Login::ImLogin;
using IM::Login::IMRegisterReq;
using IM::Login::IMRegisterRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
class IMLoginServiceImpl : public ImLogin::Service{
virtual Status Regist(ServerContext* context, const IMRegisterReq* request, IMRegisterRes* response) override {
std::cout<<"Regist user_name:" << request->user_name() << std::endl;
response->set_user_name(request->user_name()); // 这也是插件自动生成的函数 set_属性名()
response->set_user_id(10);
response->set_result_code(0);
return Status::OK;
}
virtual Status Login(ServerContext* context, const IMLoginReq* request, IMLoginRes* response) override {
std::cout << "Login user_name: " << request->user_name() << std::endl;
response->set_user_id(10);
response->set_result_code(0);
return Status::OK;
}
};
// Builds and starts the synchronous ImLogin server on 0.0.0.0:50001 and then
// blocks in Server::Wait(). If the server cannot be started (for example the
// port is already in use) an error is printed and the function returns.
void RunServer() {
  const std::string server_addr("0.0.0.0:50001");
  // Service instance; it must outlive the server built below.
  IMLoginServiceImpl service;

  ServerBuilder builder;
  // Second argument: the credentials associated with the server
  // (insecure, i.e. no transport security).
  builder.AddListeningPort(server_addr, grpc::InsecureServerCredentials());
  // Keepalive tuning: ping interval and ping-ack timeout, in milliseconds.
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 5000);
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
  // Permit keepalive pings even when there are no outstanding streams.
  builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
  builder.RegisterService(&service);

  // Build and start. BuildAndStart() yields a null pointer on failure, so it
  // must be checked before the server is used.
  std::unique_ptr<Server> server(builder.BuildAndStart());
  if (!server) {
    std::cerr << "Failed to start server on " << server_addr << std::endl;
    return;
  }
  std::cout << "Server listening on " << server_addr << std::endl;
  server->Wait();
}
// Entry point of the synchronous server. Command-line arguments are unused.
int main(int /*argc*/, const char** /*argv*/) {
  RunServer();  // blocks while the server is running
  return 0;
}
grpc client端
#include <iostream>
#include <string>
#include <memory>
//grpc头文件
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include "IM.Login.grpc.pb.h"
#include "IM.Login.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using IM::Login::ImLogin;
using IM::Login::IMRegisterReq;
using IM::Login::IMRegisterRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
class ImLoginClient{
public:
ImLoginClient(std::shared_ptr<Channel> channel)
:stub_(ImLogin::NewStub(channel)){}
void Regist(const std::string &user_name, const std::string &password) {
IMRegisterReq request;
request.set_user_name(user_name);
request.set_password(password);
IMRegisterRes response;
ClientContext context;
std::cout << "-> Regist req" << std::endl;
Status status = stub_->Regist(&context, request, &response);
if(status.ok()) {
std::cout << "user_name: " << response.user_name() << ", user_id: " << response.user_id() << std::endl;
} else {
std::cout << "user_name: " << response.user_name() << ",Regist failed: " << response.result_code() << std::endl;
}
}
void Login(const std::string &user_name, const std::string &password) {
IMLoginReq request;
request.set_user_name(user_name);
request.set_password(password);
IMLoginRes response;
ClientContext context;
std::cout << "-> Login req" << std::endl;
Status status = stub_->Login(&context, request, &response);
if(status.ok()) {
std::cout << "user_id: " << response.user_id() << ", login ok" << std::endl;
} else {
std::cout << "user_name: " << request.user_name() << ",Login failed: " << response.result_code() << std::endl;
}
}
private:
std::unique_ptr<ImLogin::Stub> stub_;
};
int main() {
std::string server_addr = "localhost:50001";
ImLoginClient im_login_client(
grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials())
);
std::string user_name = "jyh";
std::string password = "123456";
im_login_client.Regist(user_name, password);
im_login_client.Login(user_name, password);
return 0;
}
异步gRPC server端代码编写
#include <iostream>
#include <string>
#include <thread>
#include <memory>
#include "IM.Login.grpc.pb.h"
#include "IM.Login.pb.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue; //
using grpc::ServerContext;
using grpc::Status;
using IM::Login::ImLogin;
using IM::Login::IMRegisterReq;
using IM::Login::IMRegisterRes;
using IM::Login::IMLoginReq;
using IM::Login::IMLoginRes;
// Asynchronous ImLogin server driven by one completion queue.
//
// Every in-flight RPC is represented by a heap-allocated CallData object whose
// address is used as the completion-queue tag. HandleRpcs() pulls tags off the
// queue and advances the corresponding object's state machine
// (CREATE -> PROCESS -> FINISH); the object deletes itself in FINISH.
class ServerImpl final {
 public:
  ~ServerImpl() {
    // Both pointers are still null if Run() was never called, and server_ is
    // null if the server failed to start, so guard before dereferencing.
    if (server_) {
      server_->Shutdown();
    }
    // The completion queue is shut down after the server.
    if (cq_) {
      cq_->Shutdown();
    }
  }

  // Starts listening on 0.0.0.0:50001 and enters the event loop. Returns only
  // if the server could not be started.
  void Run() {
    const std::string server_address("0.0.0.0:50001");

    // Member initialisation.
    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service_);
    cq_ = builder.AddCompletionQueue();
    server_ = builder.BuildAndStart();
    if (!server_) {
      // BuildAndStart() yields a null pointer on failure (e.g. port in use).
      std::cerr << "Failed to start server on " << server_address << std::endl;
      return;
    }
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
  }

 private:
  // Common state of one RPC plus the interface HandleRpcs() drives it through.
  class CallData {
   public:
    CallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), status_(CREATE) {
      std::cout << "CallData constructing, this:" << this << std::endl;
      // Proceed() is intentionally not called here: inside a base-class
      // constructor a virtual call cannot reach the derived override. Each
      // derived constructor starts its own state machine instead.
    }
    virtual ~CallData() {}

    // Advances the state machine by one step.
    virtual void Proceed() = 0;

   protected:
    enum CallStatus { CREATE, PROCESS, FINISH };

    // Means of communicating with the gRPC runtime for an asynchronous server.
    ImLogin::AsyncService* service_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    CallStatus status_;  // current serving state
  };

  // State machine for one Regist RPC.
  class RegistCallData final : public CallData {
   public:
    RegistCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq)
        : CallData(service, cq), responder_(&ctx_) {
      Proceed();
    }
    ~RegistCallData() {}

    void Proceed() override {
      std::cout << "this: " << this
                << " RegistCallData Proceed(), status : " << status_
                << std::endl;
      // The tag is stored as CallData* because HandleRpcs() casts it back to
      // exactly that type.
      void* const tag = static_cast<CallData*>(this);
      if (status_ == CREATE) {
        std::cout << "this: " << this
                  << " RegistCallData Proceed(), status : " << "CREATE"
                  << std::endl;
        status_ = PROCESS;
        // The tag uniquely identifies this request, so different instances can
        // serve different requests concurrently.
        service_->RequestRegist(&ctx_, &request_, &responder_, cq_, cq_, tag);
      } else if (status_ == PROCESS) {
        std::cout << "this: " << this
                  << " RegistCallData Proceed(), status : " << "PROCESS"
                  << std::endl;
        // Queue a fresh instance for the next Regist call; it registers itself
        // with the completion queue and later deletes itself.
        new RegistCallData(service_, cq_);

        // The actual processing.
        reply_.set_user_name(request_.user_name());
        reply_.set_user_id(10);
        reply_.set_result_code(0);

        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, tag);
      } else {
        std::cout << "this: " << this
                  << " RegistCallData Proceed(), status : " << "FINISH"
                  << std::endl;
        GPR_ASSERT(status_ == FINISH);
        delete this;
      }
    }

   private:
    IMRegisterReq request_;
    IMRegisterRes reply_;
    ServerAsyncResponseWriter<IMRegisterRes> responder_;
  };

  // State machine for one Login RPC.
  class LoginCallData final : public CallData {
   public:
    LoginCallData(ImLogin::AsyncService* service, ServerCompletionQueue* cq)
        : CallData(service, cq), responder_(&ctx_) {
      Proceed();
    }
    ~LoginCallData() {}

    void Proceed() override {
      std::cout << "this: " << this
                << " LoginCallData Proceed(), status : " << status_
                << std::endl;
      // The tag is stored as CallData* because HandleRpcs() casts it back to
      // exactly that type.
      void* const tag = static_cast<CallData*>(this);
      if (status_ == CREATE) {
        std::cout << "this: " << this
                  << " LoginCallData Proceed(), status : " << "CREATE"
                  << std::endl;
        status_ = PROCESS;
        // The tag uniquely identifies this request, so different instances can
        // serve different requests concurrently.
        service_->RequestLogin(&ctx_, &request_, &responder_, cq_, cq_, tag);
      } else if (status_ == PROCESS) {
        std::cout << "this: " << this
                  << " LoginCallData Proceed(), status : " << "PROCESS"
                  << std::endl;
        // Queue a fresh instance for the next Login call; it registers itself
        // with the completion queue and later deletes itself.
        new LoginCallData(service_, cq_);

        // The actual processing.
        reply_.set_user_id(10);
        reply_.set_result_code(0);

        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, tag);
      } else {
        std::cout << "this: " << this
                  << " LoginCallData Proceed(), status : " << "FINISH"
                  << std::endl;
        GPR_ASSERT(status_ == FINISH);
        delete this;
      }
    }

   private:
    IMLoginReq request_;
    IMLoginRes reply_;
    ServerAsyncResponseWriter<IMLoginRes> responder_;
  };

  // Event loop (the original author notes it can be run from multiple threads).
  void HandleRpcs() {
    // Seed one pending handler per RPC type.
    new RegistCallData(&service_, cq_.get());
    new LoginCallData(&service_, cq_.get());
    void* tag;
    bool ok;
    while (true) {
      std::cout << "before cq_->Next " << std::endl;
      // Read from the queue, blocking until an event is available or the queue
      // is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      std::cout << "after cq_->Next " << std::endl;
      GPR_ASSERT(ok);
      std::cout << "before static_cast" << std::endl;
      static_cast<CallData*>(tag)->Proceed();
      std::cout << "after static_cast" << std::endl;
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  // Means of communicating with the gRPC runtime for an asynchronous server.
  ImLogin::AsyncService service_;
  std::unique_ptr<Server> server_;
};
// Entry point of the asynchronous server: constructs the server object and
// hands control to its Run() method.
int main() {
  ServerImpl server{};
  server.Run();
  return 0;
}
CMakeLists文件编写
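这个 CMakeLists.txt 是从 /grpc-v1.45.2/examples/cpp/helloworld 中拷贝出来进行修改的。
注意:文件中的 include(../cmake/common.cmake)、./IM.Login.proto 以及各个 .cc 源文件都是相对于 CMakeLists.txt 所在目录解析的,而且后面编译时执行的是 cmake ..(即以 build 的上一级目录作为源目录),因此这个 CMakeLists.txt 应放在 examples/cpp/im_login/ 目录下(与 IM.Login.proto 和各 .cc 文件同级),而不是放在稍后创建的 build 目录(examples/cpp/im_login/build/)中。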
# Build script for the IM login example (client, server, async_server).
# Lines that differ from the helloworld example it was copied from are marked "changed".
cmake_minimum_required(VERSION 3.5.1)
project(IMLogin C CXX) #changed
# NOTE(review): common.cmake is not shown here; it presumably defines the
# _PROTOBUF_PROTOC, _GRPC_CPP_PLUGIN_EXECUTABLE, _REFLECTION, _GRPC_GRPCPP and
# _PROTOBUF_LIBPROTOBUF variables used below - confirm.
include(../cmake/common.cmake)
# Proto file: absolute path of the .proto and the directory containing it
get_filename_component(im_proto "./IM.Login.proto" ABSOLUTE) #changed
get_filename_component(im_proto_path "${im_proto}" PATH) #changed
# Generated sources (written into the build directory)
set(im_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/IM.Login.pb.cc") #changed
set(im_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/IM.Login.pb.h") #changed
set(im_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/IM.Login.grpc.pb.cc") #changed
set(im_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/IM.Login.grpc.pb.h") #changed
# Run protoc with the gRPC C++ plugin to produce the four files above;
# the command is re-run whenever the .proto file changes.
add_custom_command( #changed
OUTPUT "${im_proto_srcs}" "${im_proto_hdrs}" "${im_grpc_srcs}" "${im_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${im_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${im_proto}"
DEPENDS "${im_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# im_grpc_proto: library built from the generated protobuf/gRPC sources
#changed
add_library(im_grpc_proto
${im_grpc_srcs}
${im_grpc_hdrs}
${im_proto_srcs}
${im_proto_hdrs})
target_link_libraries(im_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
# Executable targets: each one is built from the source file <target>.cc
foreach(_target
client server async_server # names of the executables to generate
)
add_executable(${_target} "${_target}.cc")
target_link_libraries(${_target}
im_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()
编译测试
# Configure and build out-of-source in ./build
cd ~/grpc-v1.45.2/examples/cpp/im_login/
# -p: do not fail if the directory already exists
mkdir -p build
# cmake .. must be run from inside build/ so that ".." is the source directory
cd build
cmake ..
make
进行测试1
# Test 1: synchronous server.
# Run in terminal 1
./server
# Run in terminal 2
./client
进行测试2
# Test 2: asynchronous server.
# Run in terminal 1
./async_server
# Run in terminal 2
./client
测试结果:
服务端
jyhlinux@ubuntu:~/grpc-v1.45.2/examples/cpp/im_login/build$ ./async_server
Server listening on 0.0.0.0:50001
CallData constructing, this:0x55a0d9d11c50
this: 0x55a0d9d11c50 RegistCallData Proceed(), status : 0
this: 0x55a0d9d11c50 RegistCallData Proceed(), status : CREATE
CallData constructing, this:0x55a0d9d12420
this: 0x55a0d9d12420 LoginCallData Proceed(), status : 0
this: 0x55a0d9d12420 LoginCallData Proceed(), status : CREATE
before cq_->Next
after cq_->Next
before static_cast
this: 0x55a0d9d11c50 RegistCallData Proceed(), status : 1
this: 0x55a0d9d11c50 RegistCallData Proceed(), status : PROCESS
CallData constructing, this:0x55a0d9d28620
this: 0x55a0d9d28620 RegistCallData Proceed(), status : 0
this: 0x55a0d9d28620 RegistCallData Proceed(), status : CREATE
after static_cast
before cq_->Next
after cq_->Next
before static_cast
this: 0x55a0d9d11c50 RegistCallData Proceed(), status : 2
this: 0x55a0d9d11c50 RegistCallData Proceed(), status : FINISH
after static_cast
before cq_->Next
after cq_->Next
before static_cast
this: 0x55a0d9d12420 LoginCallData Proceed(), status : 1
this: 0x55a0d9d12420 LoginCallData Proceed(), status : PROCESS
CallData constructing, this:0x55a0d9d11c50
this: 0x55a0d9d11c50 LoginCallData Proceed(), status : 0
this: 0x55a0d9d11c50 LoginCallData Proceed(), status : CREATE
after static_cast
before cq_->Next
after cq_->Next
before static_cast
this: 0x55a0d9d12420 LoginCallData Proceed(), status : 2
this: 0x55a0d9d12420 LoginCallData Proceed(), status : FINISH
after static_cast
before cq_->Next
客户端
jyhlinux@ubuntu:~/grpc-v1.45.2/examples/cpp/im_login/build$ ./client
-> Regist req
user_name: jyh, user_id: 10
-> Login req
user_id: 10, login ok