上周提到我们要给llama.cpp增加一个grpc入口,这是最终成果仓库,等待进一步测试后提交合并。
今天讲讲GRPC CPP开发的麻烦事情。
参考文档
Quick start | C++ | gRPC,参考文档就是官方的这篇文档了,安装grpc可以参考我上一篇文章,GRPC C++ windows下的简易安装方法https://blog.csdn.net/baijiafan/article/details/130935982?spm=1001.2014.3001.5501
可以看的示例仓库
动手前其实没有想到grpc cpp开发和golang差这么多,下面是我主要参考的两个例子,都是官方库里的例子,但具体目录不同。
grpc/examples/cpp/helloworld at master · grpc/grpc · GitHubhttps://github.com/grpc/grpc/tree/master/examples/cpp/helloworld
example下的这个例子,是入门的例子,对应的proto是
grpc/helloworld.proto,这个例子作为第一次做CPP的GRPC编程入口可以,先可以用来了解基础的架构。如果只是用简单的调用模式,应该就足够了。
但是要做stream还不太够
stream要看对应的hellostreamingworld.proto,stream版本的示例是个双向stream,对应有两个版本的实现:
async版本
使用async api,这个api的详细介绍在下文,参考greeter_async_***的实现,这个async的实现,说实话,我没怎么看懂,似乎把很多GRPC底层的状态变化都交给了上层处理,过程中遇到了几次搞不清楚的状况后,被我抛弃了,我也不建议使用C++的同学参考。
Asynchronous-API tutorial | C++ | gRPC
callback版本
使用callback api,这是个2021年提交的proporal对应的实现方式,我个人觉得更为简单,逻辑更为清楚。
proposal/L67-cpp-callback-api.md at master · grpc/proposal · GitHub
这个版本的实现参考greeter_callback_*的实现即可,他把主要的流程隐藏了,对于实现方只需要实现对应的callback函数即可,整体逻辑性更强。
单向stream的实现方式
如果是双向stream,参考上面的示例应该差不多了,但是单向的还不行,上面的例子还不行。下面是官方库里另一个目录的示例,可以参考,里面的代码很详细。
grpc/test_service_impl.cc at master · grpc/grpc · GitHub
因为我们是单向stream,我下面详细讲讲单向的写法
单向stream写法
对应定义如下:
service LlamaGoService {
rpc Answer(Job) returns (stream Output){}
}
主要的代码部分包括:
启动服务
void RunServer(uint16_t port, LlamaServerContext *llama)
{
//监听地址
std::string server_address = absl::StrFormat("0.0.0.0:%d", port);
//对应的service的实现类
LlamaServiceImpl service(llama);
//这个可选
grpc::EnableDefaultHealthCheckService(true);
ServerBuilder builder;
//去掉了tls检查,生产版本请修改
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
//注册服务
builder.RegisterService(&service);
// 启动五福
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
//监听服务
server->Wait();
}
上述代码中,主要就是启动服务,并把具体的service逻辑指向了一个具体的实现类LLamaServiceImpl。
LLamaServiceImpl实现类
class LlamaServiceImpl final : public LlamaGoService::CallbackService
{
class Reactor : public grpc::ServerWriteReactor<Output>
{
public:
Reactor(CallbackServerContext *ctx, const Job *request)
: ctx_(ctx), request_(request),
{
if (...) //正常情况
{
NextWrite();
}
else
{
Finish(grpc::Status::OK);
}
}
void OnDone() override
{
delete this;
}
void OnWriteDone(bool /*ok*/) override
{
NextWrite();
}
void OnCancel() override
{
FinishOnce(Status::CANCELLED);
}
private:
CallbackServerContext *const ctx_;
const Job *const request_;
std::mutex finish_mu_;
bool finished_{false};
Output response;
void NextWrite()
{
if (not_finish)
{
std::lock_guard<std::mutex> l(finish_mu_);
//设置业务内容
response.set_output(result);
StartWrite(&response);
}
else
{
{
std::lock_guard<std::mutex>
l(finish_mu_);
if (!finished_)
{
response.set_status(llama::Status::FINISHED);
StartWriteLast(&response, grpc::WriteOptions());
}
}
FinishOnce(Status::OK);
}
}
void FinishOnce(const Status &s)
{
std::lock_guard<std::mutex> l(finish_mu_);
if (!finished_)
{
Finish(s);
finished_ = true;
}
}
};
public:
LlamaServiceImpl()
{
}
ServerWriteReactor<Output> *Answer(
CallbackServerContext *context, const Job *request)
{
Reactor *reactor = new Reactor(context, request);
return reactor;
}
private:
};
上面这个代码可以作为单向s2c stream的一个参考实现,他主要包含两个部分,一个是主类LlamaServiceImpl,主要的功能是在客户端调用对应的函数(这里是Answer的时候,初始化一个Reactor类),这个类必须是单向s2c stream要求的ServerWriteReactor<T>的派生类。
第二个类Reactor是单向s2c stream实现的主要功能类。这个类的功能逻辑实际是通过不断地调用StartWrite来实现异步消息发送:
初始化函数调用一次NextWrite之后就返回自身给grpc框架,grpc框架完成这次stream消息发送后,掉Reactor的OnWriteDone函数,告诉业务类,上次消息写完了,这时业务判断是否写完,如果没有,继续调NextWrite,继续写,然后继续回调,依此循环。
按我们踩坑的经验,还有个有意思的地方,可能是太久不写C++,对变量生命周期有误解,NextWrite里调用StartWrite的response不能是函数类声明变量,必须是类成员变量。指针或对象都可以。但是如果参数传的是函数内声明变量,就无法正常发送消息。