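Testing the gRPC Examples
- Writing the proto file
- Implementing the server code
- Implementing the client code
Once gRPC has been built and installed, you can try the examples that ship with it in the examples folder of the source tree.
Here I use VS2022 to open the examples/cpp/helloworld directory of the repository.
Writing the proto file
Below is my modified version of examples/protos/helloworld.proto, together with the corresponding greeter_client.cc and greeter_server.cc files. The proto file defines a service named Greeter and tries out four kinds of methods.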
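- Simple RPC: the client sends a request to the server through the stub and waits for the response, just like an ordinary function call.
- Server-side streaming RPC: the client sends a request to the server and gets back a stream from which it reads a sequence of messages. The client keeps reading the stream until no messages are left. As the example shows, placing the stream keyword before the response type declares a server-side streaming method.
- Client-side streaming RPC: the client writes a sequence of messages and sends them to the server, again over a stream. Once the client has finished writing, it waits for the server to read them all and return its response. Placing the stream keyword before the request type declares a client-side streaming method.
- Bidirectional streaming RPC: both sides send a sequence of messages over a read-write stream. The two streams operate independently, so the client and server can read and write in any order they like. For example, the server can wait until it has received all the client messages before writing its responses, or it can alternate between reading and writing, or use some other combination of reads and writes. The order of messages within each stream is preserved. Placing the stream keyword before both the request and the response type declares this kind of method.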
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// A simple RPC
rpc SayHello (HelloRequest) returns (HelloReply) {}
// A server-side streaming RPC
rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
// A client-side streaming RPC
rpc StreamHelloReply (stream HelloRequest) returns (HelloReply) {}
// A bidirectional streaming RPC
rpc StreamHelloStreamReply (stream HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
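The four call shapes declared in the service can be pictured without any networking at all. The sketch below is only a rough in-memory model, not gRPC code: Request, Reply, and the four function names are hypothetical and exist purely to show how many messages flow in each direction.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for the messages; these are not gRPC types.
using Request = std::string;
using Reply = std::string;

// Simple RPC: one request in, one reply out.
Reply Unary(const Request& name) { return "Hello " + name; }

// Server-side streaming: one request in, a sequence of replies out.
std::vector<Reply> ServerStreaming(const Request& name, int count) {
  std::vector<Reply> replies;
  for (int i = 0; i < count; ++i) {
    replies.push_back("Hello " + name + " #" + std::to_string(i));
  }
  return replies;
}

// Client-side streaming: a sequence of requests in, a single reply
// produced only after the last request has been read.
Reply ClientStreaming(const std::vector<Request>& names) {
  return "Received " + std::to_string(names.size()) + " messages";
}

// Bidirectional streaming: a reply may be emitted as each request is
// read; the order within each direction is preserved.
void Bidirectional(const std::vector<Request>& names,
                   const std::function<void(const Reply&)>& on_reply) {
  for (const auto& name : names) {
    on_reply("Hello " + name);
  }
}
```

In this model a vector stands for a stream and a callback stands for writing to the peer. In real gRPC code the reader and writer objects passed to each method play those roles.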
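After editing helloworld.proto, you can simply choose Build > Rebuild All in VS2022. VS then invokes the protocol buffer compiler and the gRPC C++ plugin to generate the four client and server files. All generated files are placed in the ${projectDir}\out\build\${name} folder.
Below is a batch script I wrote to generate these four files manually.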
@echo off
rem Arguments: -f:<proto file> selects the input, -t:<python|csharp> selects the target (default: C++).
set fileName=temp
set currentPath=%~dp0
set target=--cpp_out
set protocPath="Your protoc path"
set pluginPath="Your c++ plugin path"
rem Split each argument on - / : into a key (%%J) and a value (%%K).
for %%I in (%*) do (
    for /f "tokens=1,2 delims=-/:" %%J in ("%%I") do (
        if %%J==f (set fileName=%%K)
        if %%J==t (set target=%%K)
    )
)
if not exist "%currentPath%%fileName%" (
    echo file not exist
    goto:error
)
if %target%==python (
    set pluginPath="Your python plugin path"
    set target=--python_out
)
if %target%==csharp (
    set pluginPath="Your C# plugin path"
    set target=--csharp_out
)
rem First call generates the service code through the gRPC plugin, second call generates the message code.
%protocPath% --grpc_out=%currentPath% --plugin=protoc-gen-grpc=%pluginPath% --proto_path=%currentPath% %fileName%
%protocPath% %target%=%currentPath% --proto_path=%currentPath% %fileName%
:error
pause
exit
The lines that actually generate the four files are:
%protocPath% --grpc_out=%currentPath% --plugin=protoc-gen-grpc=%pluginPath% --proto_path=%currentPath% %fileName%
%protocPath% %target%=%currentPath% --proto_path=%currentPath% %fileName%
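At this point the following four files have been generated in the ${projectDir}\out\build\${name} folder:
- helloworld.pb.h: header for the message classes
- helloworld.pb.cc: implementation of the message classes
- helloworld.grpc.pb.h: header for the service classes
- helloworld.grpc.pb.cc: implementation of the service classes
These files also contain all the protocol buffer code for populating, serializing, and retrieving the request and response message types, plus a class named Greeter that contains:
- a stub that makes it convenient for clients to call the service
- a virtual interface that the server must implement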
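To make the split between the stub and the virtual interface concrete, here is a minimal sketch of the pattern in plain C++. It is not the generated code: Request, Reply, GreeterService, and GreeterStub are hypothetical stand-ins, and the stub forwards the call in-process, whereas a real stub serializes the request and sends it over a channel.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for the generated message classes.
struct Request { std::string name; };
struct Reply { std::string message; };

// Plays the role of the virtual interface the server must implement.
class GreeterService {
 public:
  virtual ~GreeterService() = default;
  virtual bool SayHello(const Request& request, Reply* reply) = 0;
};

// Plays the role of the client stub: it exposes the same method, so the
// caller sees what looks like a local function call.
class GreeterStub {
 public:
  explicit GreeterStub(std::shared_ptr<GreeterService> service)
      : service_(std::move(service)) {}

  bool SayHello(const Request& request, Reply* reply) {
    // Assumption for this sketch: forward directly instead of using a network.
    return service_->SayHello(request, reply);
  }

 private:
  std::shared_ptr<GreeterService> service_;
};

// Server-side logic, written by overriding the virtual method.
class GreeterServiceImpl final : public GreeterService {
 public:
  bool SayHello(const Request& request, Reply* reply) override {
    reply->message = "Hello : " + request.name;
    return true;
  }
};
```

Constructing a GreeterStub around a GreeterServiceImpl and calling SayHello with the name "User" fills the reply with "Hello : User". That is the same division of labor the generated Greeter class provides, minus the transport.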
Implementing the server code
The following is greeter_server.cc:
#include <iostream>
#include <memory>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_format.h"
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerWriter;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
using std::string;
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Greeter::Service
{
Status SayHello(ServerContext* context, const HelloRequest* request, HelloReply* reply) override
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
string strName = request->name();
reply->set_message("Hello : " + strName);
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
return Status::OK;
}
Status SayHelloStreamReply(ServerContext* context, const HelloRequest* request, ServerWriter<HelloReply>* writer) override
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
string strName = request->name();
HelloReply reply;
reply.set_message("Hello " + strName);
writer->Write(reply);
for (unsigned int i = 0; i < 10; i++)
{
reply.clear_message();
reply.set_message("This is Server Reply : " + std::to_string(i));
writer->Write(reply);
}
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
return Status::OK;
}
Status StreamHelloReply(ServerContext* context, ServerReader<HelloRequest>* reader, HelloReply* response) override
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
HelloRequest req;
while (reader->Read(&req))
{
std::cout << "Server got what you said " << req.name() << std::endl;
}
response->set_message("Server got what you said " + req.name());
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
return Status::OK;
}
Status StreamHelloStreamReply(ServerContext* context, ServerReaderWriter<HelloReply, HelloRequest>* stream) override
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
HelloRequest req;
HelloReply reply;
unsigned long uiCount = 0ul;
while (stream->Read(&req))
{
std::cout << "Server got what you said " << req.name() << std::endl;
reply.clear_message();
reply.set_message("This is Server count " + std::to_string(uiCount++));
stream->Write(reply);
}
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
return Status::OK;
}
};
void RunServer(uint16_t port) {
std::string server_address = absl::StrFormat("0.0.0.0:%d", port);
GreeterServiceImpl service;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to a *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
RunServer(absl::GetFlag(FLAGS_port));
return 0;
}
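As you can see, the code has two parts. One part implements the actual logic behind the service interface, and it lives entirely in the GreeterServiceImpl class. The other part starts a server and makes it listen on a fixed port, and it lives entirely in the RunServer function.
Implementing the client code
The following is greeter_client.cc: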
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include <thread>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
using std::thread;
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// Assembles the client's payload, sends it and presents the response back
// from the server.
void SayHello(const std::string& user)
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The actual RPC.
Status status = stub_->SayHello(&context, request, &reply);
// Act upon its status.
if (status.ok()) {
std::cout << "Server said " << reply.message() << std::endl;
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
}
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
}
void SayHelloStreamReply(const std::string& user)
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
HelloReply reply;
// The actual RPC.
auto Reader = stub_->SayHelloStreamReply(&context, request);
while (Reader->Read(&reply))
{
std::cout << reply.message() << std::endl;
}
// Call Finish() once and reuse the returned Status.
Status status = Reader->Finish();
if (!status.ok())
{
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed" << std::endl;
}
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
}
void StreamHelloReply(const std::string& user)
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
HelloReply reply;
ClientContext context;
auto Writer = stub_->StreamHelloReply(&context, &reply);
for (unsigned int i = 0; i < 10; i++)
{
// Data we are sending to the server.
HelloRequest request;
std::string strName = user + std::to_string(i);
request.set_name(strName);
Writer->Write(request);
}
Writer->WritesDone();
// Call Finish() once and reuse the returned Status.
Status status = Writer->Finish();
if (!status.ok())
{
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed" << std::endl;
}
else
{
std::cout << reply.message() << std::endl;
}
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
}
void StreamHelloStreamReply(const std::string& user)
{
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
HelloReply reply;
ClientContext context;
std::shared_ptr< ::grpc::ClientReaderWriter< ::helloworld::HelloRequest, ::helloworld::HelloReply>> Stream = stub_->StreamHelloStreamReply(&context);
thread t1([Stream, user]() {
for (unsigned int i = 0; i < 20; i++)
{
// Data we are sending to the server.
HelloRequest request;
std::string strName = user + std::to_string(i);
request.set_name(strName);
Stream->Write(request);
}
Stream->WritesDone();
});
while (Stream->Read(&reply))
{
std::cout << reply.message() << std::endl;
}
t1.join();
// Call Finish() once and reuse the returned Status.
Status status = Stream->Finish();
if (!status.ok())
{
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed" << std::endl;
}
std::cout << "--------" << __FUNCTION__ << "--------" << std::endl;
}
private:
std::unique_ptr<Greeter::Stub> stub_;
};
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint specified by
// the argument "--target=" which is the only expected argument.
std::string target_str = absl::GetFlag(FLAGS_target);
// We indicate that the channel isn't authenticated (use of
// InsecureChannelCredentials()).
GreeterClient greeter(grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));
std::string strName = "User";
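// Pause between calls with the standard library instead of the
// Windows-only Sleep(); <chrono> and <thread> are already included.
greeter.SayHello(strName);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
greeter.SayHelloStreamReply(strName);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
greeter.StreamHelloReply(strName);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
greeter.StreamHelloStreamReply(strName);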
return 0;
}
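Client code must go through a stub to call the service, which is why the class holds a std::unique_ptr<Greeter::Stub> stub_ member. With it, calling the service from the client looks just like calling a local method.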
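The bidirectional client method writes its requests on a separate thread while the calling thread keeps reading until the stream ends. The sketch below models only that threading pattern with the standard library. MessageStream and WriteAndDrain are hypothetical names, the queue stands in for the network, and Close() plays the role that WritesDone() plays in the real client.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical in-memory stream; not a gRPC class.
class MessageStream {
 public:
  void Write(std::string msg) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push(std::move(msg));
    }
    cv_.notify_one();
  }

  // Signals that no more messages will be written.
  void Close() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until a message is available; returns false once the stream
  // is closed and fully drained.
  bool Read(std::string* out) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::string> queue_;
  bool closed_ = false;
};

// The writer runs on its own thread; the caller drains the stream and
// joins the writer only after reading has finished.
std::vector<std::string> WriteAndDrain(const std::string& user, int count) {
  MessageStream stream;
  std::thread writer([&stream, &user, count] {
    for (int i = 0; i < count; ++i) {
      stream.Write(user + std::to_string(i));
    }
    stream.Close();
  });

  std::vector<std::string> received;
  std::string msg;
  while (stream.Read(&msg)) {
    received.push_back(msg);
  }
  writer.join();
  return received;
}
```

Because the read loop ends only after Close() has been called and the queue is empty, every written message is received in the order it was written, which mirrors the per-stream ordering described earlier.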