📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、基于 dispatcher 的 RPC 框架逻辑梳理
- 1.1 模块职责划分
- 1.2 核心流程解析
- 1.3 详细交互流程
- 🏳️🌈二、服务端 RpcRouter 实现
- 2.1 模块职责划分
- 2.2 VType:JSON 值类型枚举
- 2.3 ServiceDescribe:服务描述元数据
- 2.4 SDescribeFactory:建造者模式创建服务描述
- 2.5 ServiceManager:服务注册与管理
- 2.6 RpcRouter:RPC 请求路由处理
- 2.7 RpcRouter 整体代码
- 🏳️🌈三、客户端 Requestor 实现
- 3.1 模块职责划分
- 3.2 RequestDescribe:请求描述元数据
- 3.3 onResponse 方法
- 3.4 send 方法
- 3.4.1 同步发送(阻塞等待结果)
- 3.4.2 异步 Future 发送(非阻塞,返回 future)
- 3.4.3 异步回调发送(非阻塞,触发回调函数)
- 3.5 增查删方法
- 3.6 Requestor 整体代码
- 🏳️🌈四、客户端 RpcCaller 方法
- 2.1 模块职责划分
- 2.2 call 同步调用(阻塞等待结果)
- 2.3 call 异步 Future 调用(非阻塞,通过 future 获取结果)
- 2.4 call 异步回调调用(非阻塞,通过回调函数处理结果)
- 2.5 RpcCaller 整体代码
- 👥总结
📢前言
前几篇文章中,笔者介绍了rpc
的原理和目的,也介绍了需要使用的部分第三方库
和我们所需实现的功能
现在我们着手项目实现篇章
,目前零碎接口 、 项目消息字段类型 和 抽象层的封装 都已经完成了
截至上一篇文章,我们已经进行到 Dispatcher 路由的封装了
根据之前的讨论,dispatcher
负责消息的分发和处理。
再根据rpc框架的实现模式
graph TD
A[客户端] -->|1. 发起 RPC 调用| B(RpcCaller)
B -->|2. 发送 RpcRequest| C[Requestor]
C -->|3. 网络传输| D[服务端 Dispatcher]
D -->|4. 路由到 RpcRouter| E[RpcRouter]
E -->|5. 参数校验| F[ServiceManager]
F -->|6. 调用业务回调| G[ServiceDescribe]
G -->|7. 返回 RpcResponse| E
E -->|8. 返回响应| C
C -->|9. 处理结果| A
H[客户端] -->|发布主题消息| I[TopicRequest]
I --> J[服务端 Dispatcher]
J -->|广播到订阅者| K[订阅同一主题的客户端]
在实现了 dispatcher
的封装后,我们还需要对 业务层 的主要功能进行封装
- RPC 调用(核心请求-响应机制)
- 主题(消息发布-订阅机制)
- 服务管理(服务注册、发现、治理)
这一篇,我们将对RPC 调用(核心请求-响应机制)进行封装
🏳️🌈一、基于 dispatcher 的 RPC 框架逻辑梳理
1.1 模块职责划分
1.2 核心流程解析
(1) 服务端流程(RpcRouter
+ Dispatcher
)
- 注册服务
通过SDescribeFactory
创建服务描述(参数校验规则、回调函数等)。
调用RpcRouter::registerMethod
将服务注册到ServiceManager
。 - 处理请求
客户端发送RpcRequest
→ 服务端Dispatcher
根据消息类型 REQ_RPC 调用RpcRouter::onRpcRequest
。
RpcRouter
查询服务描述 → 参数校验 → 调用业务回调 → 组织RpcResponse
并发送。
// 伪代码:服务端处理流程
dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](conn, req) {
rpc_router->onRpcRequest(conn, req); // 由 Dispatcher 触发
});
(2) 客户端流程(Requestor
+ RpcCaller
+ Dispatcher
)
- 发送请求
用户通过RpcCaller
发送请求(同步/异步)。
RpcCaller
创建RpcRequest
→ 调用Requestor::send
发送。 - 接收响应
服务端返回RpcResponse
→ 客户端Dispatcher
根据消息类型 RSP_RPC 调用Requestor::onResponse
。
Requestor
根据请求 ID 找到对应RequestDescribe
→ 设置promise
或触发回调。
// 伪代码:客户端处理流程
dispatcher->registerHandler<RpcResponse>(MType::RSP_RPC, [&](conn, rsp) {
requestor->onResponse(conn, rsp); // 由 Dispatcher 触发
});
1.3 详细交互流程
同步调用示例
sequenceDiagram
participant Client as 客户端 (RpcCaller)
participant Requestor as Requestor
participant Dispatcher as Dispatcher
participant Server as 服务端 (RpcRouter)
Client->>Requestor: call(同步)
Requestor->>+Dispatcher: 发送 RpcRequest
Dispatcher->>Server: 路由到 RpcRouter::onRpcRequest
Server->>Server: 参数校验 → 业务处理
Server->>Dispatcher: 返回 RpcResponse
Dispatcher->>Requestor: 触发 Requestor::onResponse
Requestor->>Requestor: 设置 future 值
Requestor->>-Client: 返回结果
异步 Future 调用示例
sequenceDiagram
participant Client as 客户端 (RpcCaller)
participant Requestor as Requestor
participant Dispatcher as Dispatcher
participant Server as 服务端 (RpcRouter)
Client->>Requestor: call(异步 Future)
Requestor->>+Dispatcher: 发送 RpcRequest
Dispatcher->>Server: 路由到 RpcRouter::onRpcRequest
Server->>Server: 参数校验 → 业务处理
Server->>Dispatcher: 返回 RpcResponse
Dispatcher->>Requestor: 触发 Requestor::onResponse
Requestor->>Requestor: 设置 promise 值
Client->>Client: 通过 future.get() 获取结果
异步回调调用示例
sequenceDiagram
participant Client as 客户端 (RpcCaller)
participant Requestor as Requestor
participant Dispatcher as Dispatcher
participant Server as 服务端 (RpcRouter)
Client->>Requestor: call(异步回调)
Requestor->>+Dispatcher: 发送 RpcRequest
Dispatcher->>Server: 路由到 RpcRouter::onRpcRequest
Server->>Server: 参数校验 → 业务处理
Server->>Dispatcher: 返回 RpcResponse
Dispatcher->>Requestor: 触发 Requestor::onResponse
Requestor->>Client: 调用用户回调函数
通过 Dispatcher
实现消息分发中枢,RpcRouter
和 Requestor
分别处理 服务端 和 客户端 的 RPC 协议逻辑,代码结构清晰且扩展性强。
🏳️🌈二、服务端 RpcRouter 实现
2.1 模块职责划分
先来看看最后需要实现哪些部分
VType
:JSON 值类型枚举ServiceDescribe
:服务描述元数据SDescribeFactory
:建造者模式创建服务描述ServiceManager
:服务注册与管理RpcRouter
:RPC 请求路由处理
** 完整请求处理流程**
- 客户端发送
RpcRequest
,包含方法名method
、参数params
和唯一ID
。 Dispatcher
将RpcRequest
路由到RpcRouter::onRpcRequest
- 服务查找与校验
// 服务存在性检查
auto service = _service_manager->select("add");
// 参数合法性检查
if (!service->paramcheck({ {"num1", 10}, {"num2", 20} }))...
- 业务处理
// 回调用户定义的业务函数
_callback(params, result);
// params示例:{ "num1": 10, "num2": 20 }, result示例:30
- 组织
RpcResponse
并发送回 客户端
2.2 VType:JSON 值类型枚举
enum class VType{
BOOLEAN = 0,
INTEGRAL,
NUMERIC,
STRING,
ARRAY,
OBJECT
};
用途:定义 JSON 数据类型的枚举,用于参数校验。
关键点:每个类型对应 Json::Value 的检查方法(如 isBool())。
2.3 ServiceDescribe:服务描述元数据
主要目的
- 检查参数是否存在且类型匹配
- 调用业务回调接口进行业务处理
成员变量
关键方法
- paramcheck(const Json::Value& params) 检查参数是否存在且类型匹配
// 判断 json值 类型是否符合预期
bool check(VType vtype, const Json::Value& val) {
switch (vtype) {
case VType::BOOLEAN:
return val.isBool();
case VType::INTEGRAL:
return val.isInt();
case VType::NUMERIC:
return val.isNumeric();
case VType::STRING:
return val.isString();
case VType::ARRAY:
return val.isArray();
case VType::OBJECT:
return val.isObject();
default:
return false;
}
}
// 判断 json值 类型是否符合预期
bool rtypeCheck(const Json::Value& val) { return check(_return_type, val); }
// 进行参数校验
bool paramcheck(const Json::Value& params) {
// 对 params 进行参数校验
for (auto desc : _params_desc) {
// 判断参数是否存在
if (params.isMember(desc.first) == false) {
ELOG("参数字段完整性校验失败,缺少字段:%s", desc.first.c_str())
return false;
}
// 判断参数类型是否符合预期
if (check(desc.second, params[desc.first]) == false) {
ELOG("参数字段类型校验失败,字段:%s", desc.first.c_str())
return false;
}
}
DLOG("参数字段完整性校验成功");
return true;
}
- call(const Json::Value& params, Json::Value& result) 调用业务回调接口进行业务处理
// 调用业务回调接口进行业务处理
bool call(const Json::Value& params, Json::Value& result) {
// 构建回调方法
// 前者放入参数,后者得到结果
_callback(params, result);
if (rtypeCheck(result) == false) {
ELOG("回调处理函数中的相应信息校验失败!");
return false;
}
return true;
}
2.4 SDescribeFactory:建造者模式创建服务描述
其实这部分一开始是想就直接放在上面的 ServiceDescribe
部分的,但是考虑到整个部分的严谨性和独立性,还是决定单独封装一个接口用来创建相关的字段
相较于前面的工厂模式来看,这种方法称为建造者模式
- 工厂模式:当一个类不知道它所需要的对象的类,或是当一个类希望由子类来指定创建的对象时,可以使用工厂方法。当一系列相关对象需要被创建以一起工作,并且希望避免与具体类的紧密耦合时,可以使用抽象工厂模式。
- 建造者模式:当创建复杂对象的算法应该独立于该对象的组成部分以及它们的装配方式时,或者当构造过程必须允许被构造的对象有不同的表示时,适用于建造者模式。
这种模式构造出来的对象都是不可修改的,只能用来创建对象,不能修改对象内部的状态
class SDescribeFactory {
public:
void setMethodName(const std::string& name) { _method_name = name; }
void setReturnType(VType vtype) { _return_type = vtype; }
void setParamsDesc(const std::string& pname, VType vtype) {
_params_desc.push_back(ServiceDescribe::ParamsDescribe(pname, vtype));
}
void setCallback(const ServiceDescribe::ServiceCallback& cb) {
_callback = cb;
}
ServiceDescribe::ptr build() {
return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));
}
private:
std::string _method_name;
ServiceDescribe::ServiceCallback _callback; // 实际的业务回调函数
std::vector<ServiceDescribe::ParamsDescribe>
_params_desc; // 参数字段格式描述
VType _return_type; // 结果作为返回值类型的描述
};
功能:分步构建 ServiceDescribe
对象,提升代码可读性。
设计优势:避免 ServiceDescribe
构造函数参数过多。
2.5 ServiceManager:服务注册与管理
我们利用 mutex
的机制,确保在多线程环境下,能够安全地进行响应操作
// 线程安全地管理服务注册表,提供增删查功能
class ServiceManager {
public:
using ptr = std::shared_ptr<ServiceManager>;
// 增
void insert(const ServiceDescribe::ptr& desc) {
std::unique_lock<std::mutex> lock(_mutex);
_services.insert(std::make_pair(desc->method(), desc));
}
// 查
ServiceDescribe::ptr select(const std::string& method_name) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _services.find(method_name);
if (it == _services.end()) {
return ServiceDescribe::ptr();
}
return it->second;
}
// 删
void remove(const std::string& method_name) {
std::unique_lock<std::mutex> lock(_mutex);
_services.erase(method_name);
}
private:
std::mutex _mutex;
std::unordered_map<std::string, ServiceDescribe::ptr> _services;
};
成员变量:std::unordered_map<std::string
, ServiceDescribe::ptr>
。
职责:线程安全地管理服务注册表,提供增删查功能。
2.6 RpcRouter:RPC 请求路由处理
核心方法 onRpcRequest
// 这是注册到 dispatcher 模块针对 rpc 请求进行回调处理的业务函数
void onRpcRequest(const BaseConnection::ptr& conn, RpcRequest::ptr& req) {
// 1. 查询客户端请求的方法描述 -- 判断当前服务端能否提供对应的服务
auto service = _service_manager->select(req->method());
if (service.get() == nullptr) {
ELOG("客户端请求的方法描述不存在!", req->method().c_str())
return response(conn, req, Json::Value(),
fields::RCode::RCODE_NOT_FOUND_SERVICE);
}
// 2. 进行参数校验,确定能否提供服务
if (service->paramcheck(req->params() == false)) {
ELOG("客户端请求的参数校验失败!", req->method().c_str())
return response(conn, req, Json::Value(),
fields::RCode::RCODE_INVALID_PARAMS);
}
// 3. 调用业务回调接口进行业务处理
Json::Value result;
bool ret = service->call(req->params(), result);
if (ret == false) {
ELOG("客户端请求的业务处理失败!", req->method().c_str())
return response(conn, req, Json::Value(),
fields::RCode::RCODE_INTERNAL_ERROR);
}
// 4. 处理完毕得到结果,组织相应,向客户端发送
return response(conn, req, result, fields::RCode::RCODE_OK);
}
流程:服务查找 → 参数校验 → 业务处理 → 返回响应
registerMethod 方法
// 注册服务描述
void registerMethod(const ServiceDescribe::ptr& service) {
return _service_manager->insert(service);
}
response 方法
// 组织响应报文
void response(const BaseConnection::ptr& conn, RpcRequest::ptr& req,
const Json::Value& rsp, RCode rcode) {
auto msg = MessageFactory::create<RpcResponse>();
msg->setId(req->rid());
msg->setMType(fields::MType::RSP_RPC);
msg->setRcode(rcode);
msg->setResult(rsp);
conn->send(msg);
}
2.7 RpcRouter 整体代码
// 业务回调函数总结
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
namespace rpc{
namespace server{
enum class VType{
BOOLEAN = 0,
INTEGRAL,
NUMERIC,
STRING,
ARRAY,
OBJECT
};
class ServiceDescribe{
public:
using ptr = std::shared_ptr<ServiceDescribe>;
using ServiceCallback = std::function<void(const Json::Value&, Json::Value&)>;
using ParamsDescribe = std::pair<std::string, VType>;
ServiceDescribe(std::string &&mname, std::vector<ParamsDescribe> &&desc, VType vtype, ServiceCallback &&handler) :
_method_name(std::move(mname)),
_callback(std::move(handler)),
_params_desc(std::move(desc)),
_return_type(vtype)
{}
const std::string& method() { return _method_name; }
// 进行参数校验
bool paramcheck(const Json::Value& params){
// 对 params 进行参数校验
for(auto desc : _params_desc){
// 判断参数是否存在
if(params.isMember(desc.first) == false){
ELOG("参数字段完整性校验失败,缺少字段:%s", desc.first.c_str())
return false;
}
// 判断参数类型是否符合预期
if(check(desc.second, params[desc.first]) == false){
ELOG("参数字段类型校验失败,字段:%s", desc.first.c_str())
return false;
}
}
DLOG("参数字段完整性校验成功");
return true;
}
// 调用业务回调接口进行业务处理
bool call(const Json::Value& params, Json::Value& result){
// 构建回调方法
// 前者放入参数,后者得到结果
_callback(params, result);
if(rtypeCheck(result) == false){
ELOG("回调处理函数中的相应信息校验失败!");
return false;
}
return true;
}
private:
// 判断 json值 类型是否符合预期
bool check(VType vtype, const Json::Value& val){
switch(vtype){
case VType::BOOLEAN : return val.isBool();
case VType::INTEGRAL : return val.isInt();
case VType::NUMERIC : return val.isNumeric();
case VType::STRING : return val.isString();
case VType::ARRAY : return val.isArray();
case VType::OBJECT : return val.isObject();
default: return false;
}
}
// 判断 json值 类型是否符合预期
bool rtypeCheck(const Json::Value& val){
return check(_return_type, val);
}
private:
std::string _method_name; // 方法名
ServiceCallback _callback; // 实际的业务回调函数
std::vector<ParamsDescribe> _params_desc; // 参数字段格式描述
VType _return_type; // 结果作为返回值类型的描述
};
// 建造者模式
// 这种模式构造出来的对象都是不可修改的,只能用来创建对象,不能修改对象内部的状态
class SDescribeFactory{
public:
void setMethodName(const std::string& name){
_method_name = name;
}
void setReturnType(VType vtype){
_return_type = vtype;
}
void setParamsDesc(const std::string& pname, VType vtype){
_params_desc.push_back(ServiceDescribe::ParamsDescribe(pname, vtype));
}
void setCallback(const ServiceDescribe::ServiceCallback& cb){
_callback = cb;
}
ServiceDescribe::ptr build() {
return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));
}
private:
std::string _method_name;
ServiceDescribe::ServiceCallback _callback; // 实际的业务回调函数
std::vector<ServiceDescribe::ParamsDescribe> _params_desc; // 参数字段格式描述
VType _return_type; // 结果作为返回值类型的描述
};
// 线程安全地管理服务注册表,提供增删查功能
class ServiceManager{
public:
using ptr = std::shared_ptr<ServiceManager>;
// 增
void insert(const ServiceDescribe::ptr& desc){
std::unique_lock<std::mutex> lock(_mutex);
_services.insert(std::make_pair(desc->method(), desc));
}
// 查
ServiceDescribe::ptr select(const std::string& method_name){
std::unique_lock<std::mutex> lock(_mutex);
auto it = _services.find(method_name);
if(it == _services.end()){
return ServiceDescribe::ptr();
}
return it->second;
}
// 删
void remove(const std::string& method_name){
std::unique_lock<std::mutex> lock(_mutex);
_services.erase(method_name);
}
private:
std::mutex _mutex;
std::unordered_map<std::string, ServiceDescribe::ptr> _services;
};
class RpcRouter{
public:
using ptr = std::shared_ptr<RpcRouter>;
RpcRouter()
: _service_manager(std::make_shared<ServiceManager>())
{}
// 这是注册到 dispatcher 模块针对 rpc 请求进行回调处理的业务函数
void onRpcRequest(const BaseConnection::ptr& conn, RpcRequest::ptr& req){
// 1. 查询客户端请求的方法描述 -- 判断当前服务端能否提供对应的服务
auto service = _service_manager->select(req->method());
if(service.get() == nullptr){
ELOG("客户端请求的方法描述不存在!: %s", req->method().c_str())
return response(conn, req, Json::Value(), fields::RCode::RCODE_NOT_FOUND_SERVICE);
}
// 2. 进行参数校验,确定能否提供服务
if(service->paramcheck(req->params() == false)){
ELOG("客户端请求的参数校验失败!: %s", req->method().c_str())
return response(conn, req, Json::Value(), fields::RCode::RCODE_INVALID_PARAMS);
}
// 3. 调用业务回调接口进行业务处理
Json::Value result;
bool ret = service->call(req->params(), result);
if(ret == false){
ELOG("客户端请求的业务处理失败!: %s", req->method().c_str())
return response(conn, req, Json::Value(), fields::RCode::RCODE_INTERNAL_ERROR);
}
// 4. 处理完毕得到结果,组织相应,向客户端发送
return response(conn, req, result, fields::RCode::RCODE_OK);
}
// 注册服务描述
void registerMethod(const ServiceDescribe::ptr& service){
return _service_manager->insert(service);
}
private:
// 组织响应报文
void response(const BaseConnection::ptr& conn, RpcRequest::ptr& req, const Json::Value& rsp, RCode rcode){
auto msg = MessageFactory::create<RpcResponse>();
msg->setId(req->rid());
msg->setMType(fields::MType::RSP_RPC);
msg->setRcode(rcode);
msg->setResult(rsp);
conn->send(msg);
}
private:
ServiceManager::ptr _service_manager;
};
}
}
🏳️🌈三、客户端 Requestor 实现
3.1 模块职责划分
这段代码是 RPC 客户端的核心模块 Requestor
,负责管理请求的发送和响应的处理,支持 同步、异步 Future、异步回调 三种调用方式
方法总览
3.2 RequestDescribe:请求描述元数据
// 请求描述
struct RequestDescribe {
using ptr = std::shared_ptr<RequestDescribe>;
// 1. 请求消息
// 2. 请求类型
// 3. 接收响应
// 4. 响应回调方法
BaseMessage::ptr request;
RType rtype;
std::promise<BaseMessage::ptr> response;
RequestCallback callback;
// 1. 设置请求
// 2. 设置请求类型
// 3. 设置回调方法
// 4. 获取异步响应
void setRequest(const BaseMessage::ptr& req) { request = req; }
void setRType(RType rt) { rtype = rt; }
void setCallback(const RequestCallback& cb) { callback = cb; }
AsyncResponse asyncResponse() { return response.get_future(); }
};
用途:存储每个请求的元数据,包括请求消息、类型、结果容器(promise)或回调函数。
3.3 onResponse 方法
// 组织响应报文
void response(const BaseConnection::ptr& conn, RpcRequest::ptr& req,
const Json::Value& rsp, RCode rcode) {
auto msg = MessageFactory::create<RpcResponse>();
msg->setId(req->rid());
msg->setMType(fields::MType::RSP_RPC);
msg->setRcode(rcode);
msg->setResult(rsp);
conn->send(msg);
} // 对响应信息的处理回调
void onResponse(const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {
std::string rid = msg->rid();
RequestDescribe::ptr rdp = getDescribe(rid);
if (rdp.get() == nullptr) {
ELOG("收到响应,但未找到对应的请求描述:%s", rid.c_str());
return;
}
if (rdp->rtype == fields::RType::REQ_ASYNC) {
rdp->response.set_value(msg);
} else if (rdp->rtype == fields::RType::REQ_CALLBACK) {
// 如果设置了回调函数,就调用,来处理响应
if (rdp->callback)
rdp->callback(msg);
else
ELOG("收到响应,但未设置回调函数:%s", rid.c_str());
} else {
ELOG("收到响应,但未知的请求类型:%d", rdp->rtype);
}
delDescribe(rid);
}
关键步骤:
- 查找请求描述:通过响应消息中的请求
ID
(rid) 找到对应的RequestDescribe
。 - 处理响应:
- 异步 Future:设置promise
的值,唤醒阻塞的future.get()
。
- 异步回调:调用用户提供的回调函数。 - 清理资源:从
_request_desc
中删除已处理的请求描述。
3.4 send 方法
3.4.1 同步发送(阻塞等待结果)
// 同步发送请求,通过 rsp 获取结果
// 发起同步 RPC 请求,调用方通过 rsp 获取响应结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req,
BaseMessage::ptr& rsp) {
AsyncResponse rsp_future;
bool ret = send(conn, req, rsp_future);
if (ret == false) {
return false;
}
rsp = rsp_future.get();
return true;
}
流程:
- 调用异步
Future
接口发送请求。 - 通过
future.get()
阻塞当前线程,直到收到响应。
适用场景:需要立即获取结果的简单调用。
3.4.2 异步 Future 发送(非阻塞,返回 future)
// 同步发送请求,通过 rsp 获取结果
// 发起同步 RPC 请求,调用方通过 rsp 获取响应结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req,
BaseMessage::ptr& rsp) {
AsyncResponse
rsp_future; // 异步发送请求,通过 future 获取结果
// 发起异步 RPC 请求,调用方通过
// async_rsp(std::future)在后续通过 get() 获取响应结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req,
AsyncResponse& async_rsp) {
RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
if (rdp.get() == nullptr) {
ELOG("创建请求描述失败!");
return false;
}
async_rsp = rdp->response.get_future();
return true;
}
bool ret = send(conn, req, rsp_future);
if (ret == false) {
return false;
}
rsp = rsp_future.get();
return true;
}
流程:
- 创建
RequestDescribe
,设置请求类型为REQ_ASYNC
。 - 返回
future
对象,调用方后续通过future.get()
获取结果。
适用场景:需要并行处理多个请求,灵活控制结果获取时机。
3.4.3 异步回调发送(非阻塞,触发回调函数)
// 异步发送请求,通过回调处理结果
// 注册回调函数 cb,当响应到达时自动触发回调处理结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req,
const RequestCallback& cb) {
RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
if (rdp.get() == nullptr) {
ELOG("回调函数设置时,创建请求描述失败!");
return false;
}
conn->send(req);
return true;
}
流程:
- 创建
RequestDescribe
,设置请求类型为REQ_CALLBACK
并绑定用户回调。 - 发送请求,响应到达时自动触发回调。
适用场景:事件驱动模型,避免阻塞主线程。
3.5 增查删方法
// 创建新的请求描述,添加到 _request_desc 中
RequestDescribe::ptr
newDescribe(const BaseMessage::ptr& req, RType rtype,
const RequestCallback& cb = RequestCallback()) {
std::unique_lock<std::mutex> lock(_mutex);
RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
rd->request = req;
rd->rtype = rtype;
if (rtype == RType::REQ_CALLBACK && cb)
rd->callback = cb;
_request_desc.insert(std::make_pair(req->rid(), rd));
return rd;
}
// 从 _request_desc 获取请求描述
RequestDescribe::ptr getDescribe(const std::string& id) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _request_desc.find(id);
if (it == _request_desc.end()) {
ELOG("没有找到请求描述");
return RequestDescribe::ptr();
}
return it->second;
}
// 删除 _request_desc 中的请求描述
void delDescribe(const std::string& id) {
std::unique_lock<std::mutex> lock(_mutex);
_request_desc.erase(id);
}
3.6 Requestor 整体代码
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>
namespace rpc{
namespace client{
class Requestor{
public:
using ptr = std::shared_ptr<Requestor>;
using RequestCallback = std::function<void(const BaseMessage::ptr&)>;
using AsyncResponse = std::future<BaseMessage::ptr>;
// 请求描述
struct RequestDescribe{
using ptr = std::shared_ptr<RequestDescribe>;
// 1. 请求消息
// 2. 请求类型
// 3. 接收响应
// 4. 响应回调方法
BaseMessage::ptr request;
RType rtype;
std::promise<BaseMessage::ptr> response;
RequestCallback callback;
// 1. 设置请求
// 2. 设置请求类型
// 3. 设置回调方法
// 4. 获取异步响应
void setRequest(const BaseMessage::ptr& req) { request = req; }
void setRType(RType rt) { rtype = rt; }
void setCallback(const RequestCallback& cb) { callback = cb; }
AsyncResponse asyncResponse() { return response.get_future(); }
};
// 对响应信息的处理回调
void onResponse(const BaseConnection::ptr& conn, BaseMessage::ptr& msg){
std::string rid = msg->rid();
RequestDescribe::ptr rdp = getDescribe(rid);
if(rdp.get() == nullptr){
ELOG("收到响应,但未找到对应的请求描述:%s", rid.c_str());
return;
}
if(rdp->rtype == fields::RType::REQ_ASYNC){
rdp->response.set_value(msg);
}
else if(rdp->rtype == fields::RType::REQ_CALLBACK){
// 如果设置了回调函数,就调用,来处理响应
if(rdp->callback) rdp->callback(msg);
else ELOG("收到响应,但未设置回调函数:%s", rid.c_str());
}
else{
ELOG("收到响应,但未知的请求类型:%d", rdp->rtype);
}
delDescribe(rid);
}
// 异步发送请求,通过 future 获取结果
// 发起异步 RPC 请求,调用方通过 async_rsp(std::future)在后续通过 get() 获取响应结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req, AsyncResponse& async_rsp){
RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
if(rdp.get() == nullptr){
ELOG("创建请求描述失败!");
return false;
}
async_rsp = rdp->response.get_future();
return true;
}
// 同步发送请求,通过 rsp 获取结果
// 发起同步 RPC 请求,调用方通过 rsp 获取响应结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req, BaseMessage::ptr& rsp){
AsyncResponse rsp_future;
bool ret = send(conn, req, rsp_future);
if(ret == false){
return false;
}
rsp = rsp_future.get();
return true;
}
// 异步发送请求,通过回调处理结果
// 注册回调函数 cb,当响应到达时自动触发回调处理结果。
bool send(const BaseConnection::ptr& conn, const BaseMessage::ptr& req, const RequestCallback& cb){
RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
if(rdp.get() == nullptr){
ELOG("回调函数设置时,创建请求描述失败!");
return false;
}
conn->send(req);
return true;
}
private:
// 创建新的请求描述,添加到 _request_desc 中
RequestDescribe::ptr newDescribe(const BaseMessage::ptr& req, RType rtype, const RequestCallback& cb = RequestCallback()){
std::unique_lock<std::mutex> lock(_mutex);
RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
rd->request = req;
rd->rtype = rtype;
if(rtype == RType::REQ_CALLBACK && cb)
rd->callback = cb;
_request_desc.insert(std::make_pair(req->rid(), rd));
return rd;
}
// 从 _request_desc 获取请求描述
RequestDescribe::ptr getDescribe(const std::string& id){
std::unique_lock<std::mutex> lock(_mutex);
auto it = _request_desc.find(id);
if(it == _request_desc.end()){
ELOG("没有找到请求描述");
return RequestDescribe::ptr();
}
return it->second;
}
// 删除 _request_desc 中的请求描述
void delDescribe(const std::string& id){
std::unique_lock<std::mutex> lock(_mutex);
_request_desc.erase(id);
}
private:
std::mutex _mutex;
std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;
};
}
}
🏳️🌈四、客户端 RpcCaller 方法
RpcCaller
是 RPC 客户端的高层封装,负责 组织 RPC 请求、处理响应结果,并对外提供 同步、异步 Future、异步回调 三种调用方式
2.1 模块职责划分
2.2 call 同步调用(阻塞等待结果)
// 同步调用(阻塞等待结果)
// 发送 RPC 请求并 阻塞当前线程 直到收到响应,结果直接写入 result
// 参数。
bool call(const BaseConnection::ptr& conn, const std::string& method,
const Json::Value& params, Json::Value& result) {
// 1. 组织请求
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
BaseMessage::ptr rsp_msg;
// 2. 发送请求
// 同步请求发送
bool ret = _requestor->send(
conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);
if (ret == false) {
ELOG("同步发送请求失败");
return false;
}
// 3. 等待响应
RpcResponse::ptr rpc_rsp_msg =
std::dynamic_pointer_cast<RpcResponse>(rsp_msg);
if (!rpc_rsp_msg) {
ELOG("rpc响应,向下类型转换失败!");
return false;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
ELOG("rpc响应,错误码:%d", static_cast<int>(rpc_rsp_msg->rcode()));
return false;
}
result = rpc_rsp_msg->result();
return true;
}
流程:
- 创建请求:生成唯一请求 ID,封装方法名和参数到 RpcRequest。
- 发送请求:通过 Requestor::send 同步发送,阻塞直到响应返回。
- 解析响应:转换为 RpcResponse,检查错误码,提取 result。
适用场景:需要立即获取结果的简单调用。
2.3 call 异步 Future 调用(非阻塞,通过 future 获取结果)
// 同步调用(阻塞等待结果)
// 发送 RPC 请求并 阻塞当前线程
// 直到收到响应,结果直接写入 result 参数。
bool call(const BaseConnection::ptr& conn, const std::string& method,
const Json::Value& params, Json::Value& result) {
// 1. 组织请求
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
BaseMessage::ptr rsp_msg;
// 2. 发送请求
// 同步请求发送
bool ret = _requestor->send(
conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);
if (ret == false) {
ELOG("同步发送请求失败");
return false;
}
// 3. 等待响应
RpcResponse::ptr rpc_rsp_msg =
std::dynamic_pointer_cast<RpcResponse>(rsp_msg);
if (!rpc_rsp_msg) {
ELOG("rpc响应,向下类型转换失败!");
return false;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
ELOG("rpc响应,错误码:%d", static_cast<int>(rpc_rsp_msg->rcode()));
return false;
}
result = rpc // 异步 Future 调用(非阻塞,通过 future 获取结果)
// 发送 RPC 请求并立即返回,调用方通过 future
// 异步获取结果
bool
call(const BaseConnection::ptr& conn, const std::string& method,
const Json::Value& params, std::future<Json::Value>& result) {
// 向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个 promise
// 对象,在回调函数中去对 promise 设置数据
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
auto json_promise = std::make_shared<std::promise<Json::Value>>();
result = json_promise->get_future();
Requestor::RequestCallback cb =
std::bind(&RpcCaller::FutureCallback, this, json_promise,
std::placeholders::_1);
// 异步请求发送
bool ret = _requestor->send(
conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);
if (ret == false) {
ELOG("异步发送请求失败");
return false;
}
return true;
}
_rsp_msg->result();
return true;
}
// 处理异步 Future 调用的响应,将结果设置到 promise 中
void FutureCallback(std::shared_ptr<std::promise<Json::Value>> result,
const BaseMessage::ptr& msg) {
RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
if (!rpc_rsp_msg) {
ELOG("rpc响应,向下类型转换失败!");
return;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return;
}
result->set_value(rpc_rsp_msg->result());
}
流程:
- 创建请求:同同步调用。
- 绑定异步结果:通过 promise/future 传递结果。
- 注册回调:当响应到达时,FutureCallback 将结果设置到 promise。
- 返回 Future:用户通过 future.get() 异步获取结果。
适用场景:需要并行处理多个请求,灵活控制结果获取时机。
2.4 call 异步回调调用(非阻塞,通过回调函数处理结果)
// 异步回调调用(非阻塞,通过回调函数处理结果)
// 发送 RPC 请求并立即返回,响应到达时 触发回调函数 cb
// 处理结果
bool call(const BaseConnection::ptr& conn, const std::string& method,
const Json::Value& params, const JsonResponseCallback& cb) {
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
Requestor::RequestCallback req_cb =
std::bind(&RpcCaller::Callback, this, cb, std::placeholders::_1);
bool ret = _requestor->send(
conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);
if (ret == false) {
ELOG("回调Rpc请求失败!");
return false;
}
return true;
}
// 处理异步回调调用的响应,调用用户提供的回调函数 cb
void Callback(const JsonResponseCallback& cb, const BaseMessage::ptr& msg) {
RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
if (!rpc_rsp_msg) {
ELOG("rpc响应,向下类型转换失败!");
return;
}
if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return;
}
cb(rpc_rsp_msg->result());
}
流程:
- 创建请求:同同步调用。
- 注册用户回调:当响应到达时,Callback 解析结果并调用用户提供的回调函数。
适用场景:事件驱动模型,避免阻塞主线程。
2.5 RpcCaller 整体代码
// 请求 Rpc 请求接口
#pragma once
#include "requestor.hpp"
namespace rpc{
namespace client{
class RpcCaller{
public:
using ptr = std::shared_ptr<RpcCaller>;
using JsonAsyncResponse = std::future<Json::Value>;
using JsonResponseCallback = std::function<void(const Json::Value&)>;
RpcCaller(const Requestor::ptr& requestor)
: _requestor(requestor)
{}
// requestor 中的处理 send 里面的回调是针对 BaseMessage 进行处理的
// 用于在 rpccaller 中针对结果的处理是针对 RpcResponse 里边的 result 进行的
// 同步调用(阻塞等待结果)
// 发送 RPC 请求并 阻塞当前线程 直到收到响应,结果直接写入 result 参数。
bool call(const BaseConnection::ptr& conn, const std::string& method, const Json::Value& params, Json::Value& result){
// 1. 组织请求
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
BaseMessage::ptr rsp_msg;
// 2. 发送请求
// 同步请求发送
bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);
if(ret == false){
ELOG("同步发送请求失败");
return false;
}
// 3. 等待响应
RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);
if(!rpc_rsp_msg){
ELOG("rpc响应,向下类型转换失败!");
return false;
}
if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){
ELOG("rpc响应,错误码:%d", static_cast<int>(rpc_rsp_msg->rcode()));
return false;
}
result = rpc_rsp_msg->result();
return true;
}
// 异步 Future 调用(非阻塞,通过 future 获取结果)
// 发送 RPC 请求并立即返回,调用方通过 future 异步获取结果
bool call(const BaseConnection::ptr& conn, const std::string& method, const Json::Value& params, std::future<Json::Value>& result){
// 向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个 promise 对象,在回调函数中去对 promise 设置数据
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
auto json_promise = std::make_shared<std::promise<Json::Value>>();
result = json_promise->get_future();
Requestor::RequestCallback cb = std::bind(&RpcCaller::FutureCallback, this, json_promise, std::placeholders::_1);
// 异步请求发送
bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);
if(ret == false){
ELOG("异步发送请求失败");
return false;
}
return true;
}
// 异步回调调用(非阻塞,通过回调函数处理结果)
// 发送 RPC 请求并立即返回,响应到达时 触发回调函数 cb 处理结果
bool call(const BaseConnection::ptr& conn, const std::string& method, const Json::Value ¶ms, const JsonResponseCallback &cb){
auto req_msg = MessageFactory::create<RpcRequest>();
req_msg->setId(UUID::uuid());
req_msg->setMType(MType::REQ_RPC);
req_msg->setMethod(method);
req_msg->setParams(params);
Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback, this, cb, std::placeholders::_1);
bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);
if (ret == false){
ELOG("回调Rpc请求失败!");
return false;
}
return true;
}
private:
// 处理异步 Future 调用的响应,将结果设置到 promise 中
void FutureCallback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr& msg){
RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
if(!rpc_rsp_msg){
ELOG("rpc响应,向下类型转换失败!");
return;
}
if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){
ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return;
}
result->set_value(rpc_rsp_msg->result());
}
// 处理异步回调调用的响应,调用用户提供的回调函数 cb
void Callback(const JsonResponseCallback& cb, const BaseMessage::ptr& msg){
RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
if(!rpc_rsp_msg){
ELOG("rpc响应,向下类型转换失败!");
return;
}
if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){
ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));
return;
}
cb(rpc_rsp_msg->result());
}
private:
Requestor::ptr _requestor;
};
}
}
👥总结
本篇博文对 从零实现Json-Rpc框架】- 项目实现 - 基于Dispatcher模块的RPC框架 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~