0. RPC Context
保存某些必要的上下文信息;
某端独有功能:Client 获取请求成功或失败
1. RPCBuffer
const
和 constexpr
变量的主要区别是:const
变量的初始化可以被推迟到运行期,constexpr
必须在编译期初始化;所有 constexpr
都是 const
buffer_t
指向实际的数组,代表一个缓存块;再通过链表将各缓存块连接起来;
(1)BUFFER_MODE_NOCOPY 类型的缓存块如何释放?
在代码中,删除或合并时没用对 is_nocopy
进行删除,是否会引发内存泄漏?
规定标记:旧 RPCBuffer 为 B1,新 RPCBuffer 为 B2
在 B1 调用 cut
生成 B2 时,如果当前位置位于某个缓存块的中间位置,那么该缓存块就会由这两个 RPCBuffer 共同管理,注意到在 B2 中管理该缓存块的后半部分,类型为 BUFFER_MODE_NOCOPY ,但在 B1 中管理该缓存块的前半部分,其类型没用改变,仍然为 BUFFER_MODE_GIFT_MALLOC;
显然这种情况要求,B2 先释放后,B1 才能释放,如何保证该要求的实现?
(2)acquire
中申请缓存块时,有极值限制,过大过小都会调整,避免频繁申请内存;
(3)read_back
函数中,offset 绝对值过大时 cur_.first
为 begin
, second
为 begin->buflen
, 如果此时调用 internal_fetch
会获取第二块缓存,但应该获取第一块,就会造成问题;
因此需要保证 offset 不会大于当前的 size_
,如何保证?
2. zero_copy_stream
内部都包装了一个 RPCBuffer 指针;
根据作者的知乎文章,下述两个类是用于序列化和反序列化过程;
2.1 RPCOutputStream
构造时可以指定需要的缓存大小,如果未指定,会按最小的来申请;
(1)Next
结合循环,即内部不断申请缓存块,以填充数据;
// Copy the contents of "infile" to "outfile", using plain read() for
// "infile" but a ZeroCopyOutputStream for "outfile".
int infd = open("infile", O_RDONLY);
int outfd = open("outfile", O_WRONLY);
ZeroCopyOutputStream* output = new FileOutputStream(outfd);
void* buffer;
int size;
while (output->Next(&buffer, &size)) {
int bytes = read(infd, buffer, size);
if (bytes < size) {
// Reached EOF.
output->BackUp(size - bytes);
break;
}
}
delete output;
close(infd);
close(outfd);
由 Protocol Buffers 官网示例可以看出,最后一块内存一般来说会给得多一些,因此只需回退最后一个缓存块,与 RPCBuffer 中 backup
设计吻合;
2.2 RPCInputStream
(1)Next
其中调用了 RPCBuffer 的 fetch
方法,结合 while
循环,即不断获取 RPCBuffer 中待读取的缓存块,直到所有缓存块都读取完毕;
// Read in a file and print its contents to stdout.
int fd = open("myfile", O_RDONLY);
ZeroCopyInputStream* input = new FileInputStream(fd);
const void* buffer;
int size;
while (input->Next(&buffer, &size)) {
cout.write(buffer, size);
}
delete input;
close(fd);
(2)BackUp
理论上,如果内存里有超过一个 message,那么当前 message 解析完整之后,需要回退 count 字节。
这里需要注意一下 RPCBuffer 中的(3)的问题;(即保证不用过度回退)
3. RPCMessage
3.1 SRPCMessage
代表一条消息,message 数据可能是分成数份传递过来的
(1)append
函数,从形参 buf 中接收数据,workflow 收到网络包后会调用该函数;
header 由固定大小的数组接收
meta_buf
由 new
分配空间,存放 meta 数据
message 数据则放入 RPCBuffer 中
(2)deserialize
使用接收 message 数据部分的 buf
初始化 RPCInputStream
直接调用 Protobuf 的 ParseFromZeroCopyStream
解析消息
(3)encode
(要求 iovec 数组的大小最少为 3);workflow 会在进行网络发送时会被调用
- 设置 header
- 将 header 数组和
meta_buf
分别填入 iovec 数据结构中(使用两个) - 调用
buf
的encode
方法,将 message 部分填入 iovec 数据结构中(最少使用一个)
可以合理猜测底层 workflow 会使用 writev
来发送数据
4. rpc_module
4.1 SnowFlake
(1)get_id
生成不同的分布式 ID,总位数为 64 位,由以下四部分组成:
timestamp | group_id | machine_id | sequence |
---|
timestamp 由 std::chrono::steady_clock
产生,单位为 ms,如果在同一 ms 内的申请,由 sequence 进行区分
(std::chrono::steady_clock
为单调时钟,两个 tick 之间的时间固定,与钟表上的时间不同(可以是从某个时间点开始的),适合测量间隔;)
sequence 和 last_timestamp 由原子变量维护;
5. 服务器端
5.1 RPCWorker
封装了 请求和相应 Message,及 RPCContext,这些基类指针指向派生类(是 proto
生成类型,基类为 google::protobuf::Message
)
5.2 RPCService
服务名 name_
内部维护一个哈希表 methods_
,key 为方法名,val 为回调函数
(1)add_method
就是将回调函数加入到这个 methods_
中;
(2)ServiceRPCCallImpl
申请 req 的空间
从 RPCWorker 中接收到的 Message 反序列化出 req( SRPCMessage 中 buf
转化为 EchoRequest 类型)
如果成功解析,调用用户 rpc 函数;
5.2.1 Service
位于服务端;
service Example {
rpc Echo(EchoRequest) returns (EchoResponse);
};
是由 srpc_generator protobuf
生成的 xxx.srpc.h 文件中,继承自 RPCService;
RPC 服务为 Service 中的纯虚函数,用户需要继承自该类,实现该函数;
(1)Service 的构造函数会设置 name_
为 Example,调用 RPCService::add_method
,从而注册回调函数(方法名为 Echo,绑定 this 指针结合虚函数动态绑定机制的调用用户实现的函数);
5.2.2 RPCServer
继承自 WFServer
内部维护一个哈希表 service_map
,key 为服务名,val 为 RPCService 指针;
(1)add_service
就是在 service_map
中添加 RPCService ;
(2)start
为 WFServer 中的方法,为开启 TCP 服务器;
推测有数据到来,会调用 server_process
函数
(3)server_process
-
ParseFromArray
解析 RPCMeta;(反序列化,从meta_buf
转换为 RPCMeta 类型) -
从 RPCMeta 中的解析出 RPCMetaKeyValue 存放到 RPCModuleData 中(
map
类型) -
module 部分处理(此步看不太懂)
-
message 数据部分解压缩;
-
调用 rpc;
-
设置响应状态码
5.2.3 RPCServerTask
(1)message_out
,用来告诉 Workflow 网络层面这次发出的请求内容时啥
-
将 EchoResponse 序列化为到 SRPCMessage类型的
buf
中 -
将 SRPCMessage类型的
buf
中的数据进行压缩 -
处理 module
-
序列化 meta(将 RPCMeta 类型转换为 SRPCMessage 中的字符数组
meta_buf
)
RPCMeta 类型也是由proto
定义生成的类型 -
设置状态码,即设置 meta 中的 status_code 属性
6. RPCCompressor
使用了单例模式,局部静态变量初始化
私有构造函数会添加其支持的各种压缩算法;
持有 CompressHandler 的固定大小的数组,每种代表一个压缩算法;
(1)parse_from_compressed
,根据给定的类型,调用相关压缩算法的函数
6.1 CompressHandler
封装了一些函数句柄;
7. 客户端部分
由于要放寒假了,之后有时间再看吧;
7.1 RPCClientTask
user_done_
就是用户设置的 rpc 完成回调函数;
如果使用 rpc 的异步接口,create_rpc_client_task
即调用 RPCClient::create_rpc_client_task
,在该函数中会创建 RPCClientTask。
重要参考
作者本人的知乎,对整个流程介绍还是非常清晰的,尤其是对像我这种不懂 workflow 的人
https://zhuanlan.zhihu.com/p/619721187