目录
- gRPC原理
- 网络传输效率问题
- 基本概念概览
- Client
- Server
- 异步相关概念
- 异步 Client
- 异步 Server
gRPC原理
RPC 即远程过程调用协议(Remote Procedure Call Protocol),可以让我们像调用本地对象一样发起
远程调用。
网络传输效率问题
HTTP1.1核心问题在于:在同一个TCP连接中,没办法区分response是属于哪个请求,一旦多个请求返
回的文本内容混在一起,则没法区分数据归属于哪个请求,所以请求只能一个个串行排队发送。这直接
导致了TCP资源的闲置。
HTTP2为了解决这个问题,提出了 流 的概念,每一次请求对应一个流,有一个唯一ID,用来区分不同的
请求。基于流的概念,进一步提出了 帧 ,一个请求的数据会被分成多个帧,方便进行数据分割传输,每
个帧都唯一属于某一个流ID,将帧按照流ID进行分组,即可分离出不同的请求。这样同一个TCP连接中
就可以同时并发多个请求,不同请求的帧数据可穿插在一起,根据流ID分组即可。HTTP2.0基于这种二
进制协议的乱序模式 (Duplexing),直接解决了HTTP1.1的核心痛点,通过这种复用TCP连接的方式,不
用再同时建多个连接,提升了TCP的利用效率。
基本概念概览
Client
Client是对 Stub 封装;通过 Stub 可以真正的调用 RPC 请求。
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
std::string SayHello(const std::string& user) {
...
private:
std::unique_ptr<Greeter::Stub> stub_;
};
Channel 提供一个与特定 gRPC server 的主机和端口建立的连接。
Stub 就是在 Channel 的基础上创建而成的。
target_str = "localhost:50051";
auto channel =
grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials());
GreeterClient greeter(channel);
std::string user("world");
std::string reply = greeter.SayHello(user);
Server
Server 端需要实现对应的 RPC,所有的 RPC 组成了 Service。
class GreeterServiceImpl final : public Greeter::Service {
Status SayHello(ServerContext* context, const HelloRequest* request,
HelloReply* reply) override {
std::string prefix("Hello ");
reply->set_message(prefix + request->name());
return Status::OK;
}
};
Server 的创建需要一个 Builder,添加上监听的地址和端口,注册上该端口上绑定的服务,最后构建出
Server 并启动
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
不管是哪种类型 RPC,都是由 Client 发起请求。
异步相关概念
不管是 Client 还是 Server,异步 gRPC 都是利用 CompletionQueue API 进行异步操作。基本的流程:
1、绑定一个 CompletionQueue 到一个 RPC 调用
2、利用唯一的 void* Tag 进行读写
3、调用 CompletionQueue::Next() 等待操作完成,完成后通过唯一的 Tag 来判断对应什么请求/返回进行后续操作
异步 Client
greeter_async_client.cc 中是异步 Client 的 Demo,其中只有一次请求,逻辑简单。
1、创建 CompletionQueue
2、创建 RPC (既 ClientAsyncResponseReader ),这里有两种方式:
-----stub_->PrepareAsyncSayHello() + rpc->StartCall()
-----stub_->AsyncSayHello()
3、调用 rpc->Finish() 设置请求消息 reply 和唯一的 tag 关联,将请求发送出去
4、使用 cq.Next() 等待 Completion Queue 返回响应消息体,通过 tag 关联对应的请求
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
std::string SayHello(const std::string& user) {
HelloRequest request;
request.set_name(user);
HelloReply reply;
ClientContext context;
Status status;
//创建 CompletionQueue
CompletionQueue cq;
//创建 RPC (既 ClientAsyncResponseReader<HelloReply> ):stub_->PrepareAsyncSayHello() + rpc->StartCall()
stub_->PrepareAsyncSayHello(&context, request, &cq));
rpc->StartCall();
//调用 rpc->Finish() 设置请求消息 reply 和唯一的 tag =1关联,将请求发送出去
rpc->Finish(&reply, &status, (void*)1);
void* got_tag;
bool ok = false;
//使用 cq.Next() 等待 Completion Queue 返回响应消息体,通过 tag 关联对应的请求
GPR_ASSERT(cq.Next(&got_tag, &ok));
// Act upon the status of the actual RPC.
if (status.ok()) {
return reply.message();
} else {
return "RPC failed";
}
}
private:
std::unique_ptr<Greeter::Stub> stub_;
};
异步 Server
1、创建一个 CallData,初始构造列表中将状态设置为 CREATE
2、构造函数中,调用 Process()成员函数,调用 service_->RequestSayHello() 后,状态变更为PROCESS:
传入 ServerContext ctx_
传入 HelloRequest request_
传入 ServerAsyncResponseWriter responder_
传入 ServerCompletionQueue* cq_
将对象自身的地址作为 tag 传入
该动作,能将事件加入事件循环,可以在 CompletionQueue 中等待
3、收到请求, cq->Next() 的阻塞结束并返回,得到 tag,既上次传入的 CallData 对象地址
4、用 tag 对应 CallData 对象的 Proceed() ,此时状态为 Process
创建新的 CallData 对象以接收新请求
处理消息体并设置 reply_
将状态设置为 FINISH
调用 responder_.Finish() 将返回发送给客户端
该动作,能将事件加入到事件循环,可以在 CompletionQueue 中等待
5、发送完毕, cq->Next() 的阻塞结束并返回,得到 tag。现实中,如果发送有异常应当有其他相关的处理
6、调用 tag 对应 CallData 对象的 Proceed() ,此时状态为 FINISH, delete this 清理自己,一条消息处理完成。
class ServerImpl final {
private:
class CallData {
public:
// 创建一个 CallData,初始构造列表中将状态设置为 CREATE
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
//构造函数中,调用 Process()成员函数,调用 service_->RequestSayHello() 后,状态变更为PROCESS
Proceed();}
void Proceed() {
if (status_ == CREATE) {
//构造函数中,调用 Process()成员函数,调用 service_->RequestSayHello() 后,状态变更为PROCESS
status_ = PROCESS;
// 传入 ServerContext ctx_
// 传入 HelloRequest request_
// 传入 ServerAsyncResponseWriter<HelloReply> responder_
// 传入 ServerCompletionQueue* cq_
// 将对象自身的地址作为 tag 传入
//该动作,能将事件加入事件循环,可以在 CompletionQueue 中等待
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,this);
}
//收到请求, cq->Next() 的阻塞结束并返回,得到 tag,既上次传入的 CallData 对象地址
//调用 tag 对应 CallData 对象的 Proceed() ,此时状态为 Process
else if (status_ == PROCESS) {
//创建新的 CallData 对象以接收新请求
new CallData(service_, cq_);
//处理消息体并设置 reply_
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// 将状态设置为 FINISH
status_ = FINISH;
//调用 responder_.Finish() 将返回发送给客户端
//该动作,能将事件加入到事件循环,可以在 CompletionQueue 中等待
responder_.Finish(reply_, Status::OK, this);
}//发送完毕, cq->Next() 的阻塞结束并返回,得到 tag。现实中,如果发送有异常应当有其他相关的处理
else {
GPR_ASSERT(status_ == FINISH);
// 调用 tag 对应 CallData 对象的 Proceed() ,此时状态为 FINISH, delete this 清理自己,一条消息处理完成
delete this;
}
}
private:
Greeter::AsyncService* service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
HelloRequest request_;
HelloReply reply_;
ServerAsyncResponseWriter<HelloReply> responder_;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
};
private:
std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
关系图:
右侧 RPC 为创建的对象中的内存容,左侧使用相同颜色的小块进行代替。
以下 CallData 并非 gRPC 中的概念,而是异步 Server 在实现过程中为了方便进行的封装,其中的
Status 也是在异步调用过程中自定义的、用于转移的状态。