📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、服务端主题管理模块
- 1.1 核心功能
- 1.2 核心设计思路
- 1.3 主题 结构构造
- 1.4 订阅者 结构构造
- 1.5 主题 和 订阅者 统筹管理
- (1) onTopicRequest
- (2) onShutdown
- (3) 细节实现方法
- 1.8 框架代码
- 1.7 整体代码
- 🏳️🌈二、服务端整体实现
- 2.1 逻辑框架
- 2.1.1 注册中心(RegistryServer)
- 2.1.2 RPC 服务核心(RpcServer)
- 2.1.3 主题服务(TopicServer)
- 2.2 逻辑代码
- 2.3 整体代码
- 👥总结
📢前言
截至现在,在项目实现上,我们已经封装好了 零碎接口,对 各种消息 及 常用结构体 进行了封装
也实现了 dispatcher 路由转发的功能。
在 服务端 方面,完成了
- 业务函数回调总结 - rpc_route.hpp
- 服务的提供、发现、注册 - rpc_registry.hpp
在 客户端 方面,完成了
- 消息请求及其回调 - requestor.hpp
- 消息请求发送 - rpc_caller.hpp
服务端 未完成部分
- 服务端主题管理模块 - rpc_topic.hpp
- 服务端功能整合 - rpc_server.hpp
客户端 未完成部分
- 客户的提供、发现、注册 - rpc_registry.hpp
- 客户端主题管理模块 - rpc_topic.hpp
- 客户端功能整合 - rpc_client.hpp
这一篇文章,笔者就为服务端的 主题实现 和 整体封装 画上句号
🏳️🌈一、服务端主题管理模块
1.1 核心功能
这段代码实现了一个 服务端主题管理模块(TopicManager),支持 发布-订阅模式 的核心功能,包括:
- 主题的创建与删除
- 客户端的订阅与取消订阅
- 消息的发布与推送
- 连接断开时的自动清理
1.2 核心设计思路
线程安全:通过 std::mutex
保护共享数据(_topics 和 _subscribers)。
数据映射:
_topics
:维护主题名称到Topic
对象的映射。_subscribers
:维护客户端连接到Subscriber
对象的映射。
操作统一入口:通过 onTopicRequest
分发不同类型的主题操作请求。
1.3 主题 结构构造
作用:表示一个主题,管理其所有订阅者。
成员:
_subscribers
:存储所有订阅者的集合(Subscriber::ptr)。
方法:
appendSubscriber / removeSubscriber
:增删订阅者。pushMessage
:向所有订阅者发送消息。
// 主题名称 和 其订阅者连接 的映射
struct Topic {
using ptr = std::shared_ptr<Topic>;
std::mutex _mutex;
std::string _topic_name;
std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接
Topic(const std::string& topic_name) : _topic_name(topic_name) {}
// 增加订阅者
void appendSubscriber(const Subscriber::ptr& subscriber);
// 删除订阅者
void removeSubscriber(const Subscriber::ptr& subscriber);
// 推送消息
void pushMessage(const BaseMessage::ptr& msg);
};
1.4 订阅者 结构构造
作用:表示一个订阅者,记录其订阅的主题。
成员:
_conn
:订阅者的网络连接对象。_topics
:订阅者当前订阅的所有主题名称集合。
方法:
appendTopic / removeTopic
:增删订阅的主题。
// 定义一个订阅者对象,记录其订阅的 所有主题名称
struct Subscriber {
using ptr = std::shared_ptr<Subscriber>;
std::mutex _mutex;
BaseConnection::ptr _conn;
std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称
Subscriber(const BaseConnection::ptr& conn) : _conn(conn) {};
// 增加订阅的主题
void appendTopic(const std::string& topic);
// 删除订阅的主题
void removeTopic(const std::string& topic);
};
1.5 主题 和 订阅者 统筹管理
我们需要建立两个映射关系
- 主题名 -> 主题结构
- 订阅者连接 -> 订阅者结构
std::mutex _mutex;
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
核心接口解析
(1) onTopicRequest
功能:处理客户端发送的主题操作请求(总入口)。
操作类型:
TOPIC_CREATE
:调用 topicCreate 创建主题。TOPIC_REMOVE
:调用 topicRemove 删除主题。TOPIC_SUBSCRIBE
:调用 topicSubscribe 订阅主题。TOPIC_CANCEL
:调用 topicCancel 取消订阅。TOPIC_PUBLISH
:调用 topicPublish 发布消息。错误处理
:若操作失败,返回错误响应(errorResponse)。
(2) onShutdown
功能:处理客户端连接断开时的清理逻辑。
步骤:
- 从
_subscribers
中移除订阅者。 - 遍历订阅者的所有主题,从主题的订阅列表中移除该订阅者。
(3) 细节实现方法
变更类
// 构造一个主题对象,添加映射关系的管理
void topicCreate(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
// 删除一个主题对象,删除映射关系的管理
void topicRemove(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
// 将订阅者订阅到指定主题。
bool topicSubscribe(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
// 取消该主题的订阅者
void topicCancel(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
// 向各个该主题的 订阅者 发布主题消息
bool topicPublish(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
返回类
// 返回一个错误响应
void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);
// 返回一个主题响应
void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
1.8 框架代码
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace rpc
{
namespace server
{
class TopicManager
{
public:
using ptr = std::shared_ptr<TopicManager>;
// 统一处理有关主题的请求
void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
// 关闭连接
void onShutdown(const BaseConnection::ptr &conn);
private:
// 返回一个错误响应
void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);
// 返回一个主题响应
void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
private:
// 构造一个主题对象,添加映射关系的管理
void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
// 删除一个主题对象,删除映射关系的管理
void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
// 将订阅者订阅到指定主题。
bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
// 取消该主题的订阅者
void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
// 向各个该主题的 订阅者 发布主题消息
bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
private:
// 定义一个订阅者对象,记录其订阅的 所有主题名称
struct Subscriber
{
using ptr = std::shared_ptr<Subscriber>;
std::mutex _mutex;
BaseConnection::ptr _conn;
std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称
Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};
// 增加订阅的主题
void appendTopic(const std::string &topic);
// 删除订阅的主题
void removeTopic(const std::string &topic);
};
// 主题名称 和 其订阅者连接 的映射
struct Topic
{
using ptr = std::shared_ptr<Topic>;
std::mutex _mutex;
std::string _topic_name;
std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接
Topic(const std::string &topic_name) : _topic_name(topic_name) {}
// 增加订阅者
void appendSubscriber(const Subscriber::ptr &subscriber);
// 删除订阅者
void removeSubscriber(const Subscriber::ptr &subscriber);
// 推送消息
void pushMessage(const BaseMessage::ptr &msg);
};
private:
std::mutex _mutex;
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
};
}
}
1.7 整体代码
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace rpc
{
namespace server
{
class TopicManager
{
public:
using ptr = std::shared_ptr<TopicManager>;
// 统一处理有关主题的请求
void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
{
TopicOptype topic_optype = msg->optype();
bool ret = true;
switch(topic_optype){
case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break; // 主题创建
case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break; // 主题删除
case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break; // 主题订阅
case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break; // 主题取消订阅
case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break; // 主题发布
default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE); break;
}
if(!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);
return topicResponse(conn, msg);
}
// 关闭连接
void onShutdown(const BaseConnection::ptr &conn)
{
// 消息发布者断开连接,不需要任何操作
// 1. 判断断开连接的是否为订阅者,不是的话直接返回
// 2. 获取到订阅者退出,受影响的主题对象
// 3. 从主题对象中,移除订阅者
// 4. 从订阅者映射信息中,删除订阅者
std::vector<Topic::ptr> topics;
Subscriber::ptr subscriber;
{
std::unique_lock<std::mutex> lock(_mutex);
auto sub_it = _subscribers.find(conn);
if(sub_it == _subscribers.end()){
ELOG("该订阅者连接不存在");
return;
}
// 获取该订阅者锁定月的主题
subscriber = sub_it->second;
// 2. 获取到订阅者退出,受影响的主题对象
for(auto& topic_name : subscriber->_topics){
auto topic_it = _topics.find(topic_name);
if(topic_it == _topics.end())
continue;
topics.push_back(topic_it->second);
}
_subscribers.erase(sub_it);
}
}
private:
void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode){
auto msg_rsp = MessageFactory::create<TopicResponse>();
msg_rsp->setId(msg->rid());
msg_rsp->setMType(MType::RSP_TOPIC);
msg_rsp->setRcode(rcode);
return conn->send(msg_rsp);
}
void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){
auto msg_rsp = MessageFactory::create<TopicResponse>();
msg_rsp->setId(msg->rid());
msg_rsp->setMType(MType::RSP_TOPIC);
msg_rsp->setRcode(RCode::RCODE_OK);
return conn->send(msg_rsp);
}
private:
// 构造一个主题对象,添加映射关系的管理
void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// 获取主题名字
std::string topic_name = msg->topicKey();
// 构造一个主题对象
auto topic = std::make_shared<Topic>(topic_name);
// 增加订阅者
_topics.insert(std::make_pair(topic_name, topic));
std::cout << "创建主题" << topic_name << std::endl;
}
// 删除一个主题对象,删除映射关系的管理
void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
{
// 1. 查看当前主题,有哪些订阅者,然后从订阅者中将主题信息删掉
// 2. 删除主题的数据 -- 主题名称余出题对象的映射
std::string topic_name = msg->topicKey();
std::unordered_set<Subscriber::ptr> subscribers; // 记录 当前主题 的 所有订阅者连接
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _topics.find(msg->topicKey());
if (it == _topics.end())
{
ELOG("没有找到 %s 主题的订阅者", msg->topicKey().c_str());
return;
}
subscribers = it->second->_subscribers;
_topics.erase(it); // 删除主题对象
}
for (auto &subscriber : subscribers)
{
subscriber->removeTopic(topic_name);
}
}
// 将订阅者订阅到指定主题。
bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
{
Topic::ptr topic;
Subscriber::ptr subscriber;
{
std::unique_lock<std::mutex> lock(_mutex);
// 1. 查找或创建订阅者对象(Subscriber)
auto topic_it = _topics.find(msg->topicKey());
if (topic_it == _topics.end())
{
return false;
}
topic = topic_it->second;
auto sub_it = _subscribers.find(conn);
if (sub_it != _subscribers.end())
{
subscriber = sub_it->second;
}
else
{
subscriber = std::make_shared<Subscriber>(conn);
_subscribers.insert(std::make_pair(conn, subscriber));
}
}
// 2. 在主题对象中,新增一个订阅者对象关联的连接; 在订阅者对象中新增一个订阅的主题
topic->appendSubscriber(subscriber);
subscriber->appendTopic(msg->topicKey());
return true;
}
// 取消该主题的订阅者
void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){
// 1. 先找出主题对象,和订阅者对象
Topic::ptr topic;
Subscriber::ptr subscriber;
{
std::unique_lock<std::mutex> lock(_mutex);
auto topic_it = _topics.find(msg->topicKey());
if (topic_it != _topics.end())
{
topic = topic_it->second;
}
auto sub_it = _subscribers.find(conn);
if (sub_it != _subscribers.end())
{
subscriber = sub_it->second;
}
}
// 2. 从主对象中删除当前的订阅者连接
if(subscriber)
subscriber->removeTopic(msg->topicKey());
if(topic && subscriber)
topic->removeSubscriber(subscriber);
}
// 向各个该主题的 订阅者 发布主题消息
bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){
Topic::ptr topic;
{
std::unique_lock<std::mutex> lock(_mutex);
auto topic_it = _topics.find(msg->topicKey());
if (topic_it == _topics.end()){
ELOG("没有找到 %s 主题的订阅者", msg->topicKey().c_str());
return false;
}
topic = topic_it->second;
}
topic->pushMessage(msg);
return true;
}
private:
// 定义一个订阅者对象,记录其订阅的 所有主题名称
struct Subscriber
{
using ptr = std::shared_ptr<Subscriber>;
std::mutex _mutex;
BaseConnection::ptr _conn;
std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称
Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};
// 增加订阅的主题
void appendTopic(const std::string &topic)
{
std::unique_lock<std::mutex> lock(_mutex);
_topics.insert(topic);
}
void removeTopic(const std::string &topic)
{
std::unique_lock<std::mutex> lock(_mutex);
_topics.erase(topic);
}
};
// 主题名称 和 其订阅者连接 的映射
struct Topic
{
using ptr = std::shared_ptr<Topic>;
std::mutex _mutex;
std::string _topic_name;
std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接
Topic(const std::string &topic_name) : _topic_name(topic_name) {}
// 增加订阅者
void appendSubscriber(const Subscriber::ptr &subscriber)
{
std::unique_lock<std::mutex> lock(_mutex);
_subscribers.insert(subscriber);
}
// 删除订阅者
void removeSubscriber(const Subscriber::ptr &subscriber)
{
std::unique_lock<std::mutex> lock(_mutex);
_subscribers.erase(subscriber);
}
// 推送消息
void pushMessage(const BaseMessage::ptr &msg)
{
std::unique_lock<std::mutex> lock(_mutex);
for (auto &subscriber : _subscribers)
{
subscriber->_conn->send(msg);
}
}
};
private:
std::mutex _mutex;
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
};
}
}
🏳️🌈二、服务端整体实现
2.1 逻辑框架
一个 分布式
RPC 服务端系统
包含三个核心模块:服务注册中心、RPC 服务核心 和 主题服务。
以下是各模块的作用及整体架构:
2.1.1 注册中心(RegistryServer)
功能:
- 服务注册与发现:接收服务提供者(Provider)注册的服务信息,供消费者(Consumer)查询可用服务。
- 连接管理:在客户端断开时清理相关资源(如服务下线通知)。
关键成员:
_pd_manager
:管理服务注册与发现的业务逻辑(如维护服务列表)。_dispatcher
:分发客户端请求到对应的处理逻辑。_server
:底层网络服务器,监听端口并处理连接。
使用场景:
- 服务提供者启动时向注册中心注册自身服务。
- 消费者通过注册中心查询可用的服务地址和方法。
2.1.2 RPC 服务核心(RpcServer)
功能:
- RPC 服务管理:启动 RPC 服务,处理客户端调用请求。
- 服务注册(可选):将服务方法注册到注册中心(若启用)。
- 请求路由:将 RPC 请求路由到对应的业务处理逻辑。
关键成员:
_router
:路由请求到具体的服务方法(如根据方法名匹配处理函数)。_req_client
:与注册中心通信的客户端(用于服务注册或发现)。_dispatcher
:分发网络消息到业务逻辑。
配置选项:
_enableRegistry
:是否启用注册中心(决定是否自动注册服务)。
2.1.3 主题服务(TopicServer)
功能:
- 发布-订阅模式:管理主题的创建、订阅和消息推送。
- 消息广播:向订阅特定主题的客户端推送消息。
关键成员:
_topic_manager
:管理主题和订阅者(如 TopicManager 类)。_dispatcher
:处理客户端的订阅/发布请求。
2.2 逻辑代码
#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"
namespace rpc
{
namespace server
{
// 注册中心的服务端实现
// 启动服务:监听指定端口,接收客户端(服务提供者/发现者)连接。
// 消息路由:将服务注册/发现请求分发给业务处理器(PDManager)。
// 连接管理:在客户端断开时清理相关资源(如服务下线通知)。
class RegistryServer
{
// 注册中心服务端,只需要针对服务注册与发现请求进行处理即可
public:
using ptr = std::shared_ptr<RegistryServer>;
RegistryServer(int port);
void start();
private:
void onConnShutdown(const BaseConnection::ptr &conn);
private:
ProviderDiscovererManager::ptr _pd_manager;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
// RPC 服务端核心类,负责管理 RPC 服务的生命周期
// 启动 RPC 服务:监听指定端口,处理客户端 RPC 请求。
// 服务注册(可选):将服务方法注册到注册中心,供客户端发现。
// 请求路由:将接收到的 RPC 请求分发给对应的业务处理逻辑。
class RpcServer
{
public:
using ptr = std::shared_ptr<RpcServer>;
RpcServer(const Address &access_addr, bool enableRegistry = false, const Address ®istry_server_addr = Address());
// 注册服务到注册中心
void registerMethod(const ServiceDescribe::ptr &service);
void start();
private:
bool _enableRegistry;
Address _access_addr;
rpc::client::RegistryClient::ptr _req_client;
RpcRouter::ptr _router;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
class TopicServer
{
public:
using ptr = std::shared_ptr<TopicServer>;
TopicServer(int port);
void start();
private:
void onConnShutdown(const BaseConnection::ptr &conn);
private:
TopicManager::ptr _topic_manager;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
}
}
2.3 整体代码
#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"
namespace rpc{
namespace server{
// 注册中心的服务端实现
// 启动服务:监听指定端口,接收客户端(服务提供者/发现者)连接。
// 消息路由:将服务注册/发现请求分发给业务处理器(PDManager)。
// 连接管理:在客户端断开时清理相关资源(如服务下线通知)。
class RegistryServer{
// 注册中心服务端,只需要针对服务注册与发现请求进行处理即可
public:
using ptr = std::shared_ptr<RegistryServer>;
RegistryServer(int port)
: _pd_manager(std::make_shared<ProviderDiscovererManager>()),
_dispatcher(std::make_shared<Dispatcher>())
{
// 1. 注册服务请求处理器
// 将 PDManager::onServiceRequest 绑定到 MType::REQ_SERVICE 消息类型。当收到服务注册或发现请求时,调用此方法处理
auto service_cb = std::bind(&ProviderDiscovererManager::onServiceRequest, _pd_manager.get(), std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);
// 2. 创建底层服务器并设置回调
// 通过 ServerFactory 创建底层服务器,设置消息总入口为 Dispatcher::onMessage
_server = rpc::ServerFactory::create(port);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
_server->setMessageCallback(message_cb);
// 3. 设置连接关闭回调
// 当客户端断开连接时,调用 onConnShutdown 清理相关资源
auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);
_server->setCloseCallback(close_cb);
}
void start(){
_server->start(); // 启动服务器,开始监听端口
}
private:
void onConnShutdown(const BaseConnection::ptr& conn){
_pd_manager->onConnShutdown(conn); // 通知 PDManager 处理连接断开
}
private:
ProviderDiscovererManager::ptr _pd_manager;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
// RPC 服务端核心类,负责管理 RPC 服务的生命周期
// 启动 RPC 服务:监听指定端口,处理客户端 RPC 请求。
// 服务注册(可选):将服务方法注册到注册中心,供客户端发现。
// 请求路由:将接收到的 RPC 请求分发给对应的业务处理逻辑。
class RpcServer{
public:
using ptr = std::shared_ptr<RpcServer>;
RpcServer(const Address& access_addr, bool enableRegistry = false, const Address& registry_server_addr = Address())
: _enableRegistry(enableRegistry),
_access_addr(access_addr),
_router(std::make_shared<rpc::server::RpcRouter>()),
_dispatcher(std::make_shared<Dispatcher>())
{
// 1. 创建注册客户端(若启用注册)
if(enableRegistry){
_req_client = std::make_shared<client::RegistryClient>(
registry_server_addr.first, registry_server_addr.second);
}
// 2. 注册 RPC 请求处理回调
auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, rpc_cb);
// 3. 创建底层服务器并设置回调
_server = rpc::ServerFactory::create(access_addr.second);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
_server->setMessageCallback(message_cb);
}
// 注册服务到注册中心
void registerMethod(const ServiceDescribe::ptr& service){
if(_enableRegistry)
_req_client->registryMethod(service->method(), _access_addr);
_router->registerMethod(service);
}
void start(){
_server->start();
}
private:
bool _enableRegistry;
Address _access_addr;
rpc::client::RegistryClient::ptr _req_client;
RpcRouter::ptr _router;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
class TopicServer{
public:
using ptr = std::shared_ptr<TopicServer>;
TopicServer(int port)
: _topic_manager(std::make_shared<TopicManager>()),
_dispatcher(std::make_shared<Dispatcher>())
{
// 1. 注册主题请求处理器
auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);
// 2. 创建底层服务器并设置回调
_server = ServerFactory::create(port);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
_server->setMessageCallback(message_cb);
// 3. 设置连接关闭回调
auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);
_server->setCloseCallback(close_cb);
}
void start(){
_server->start();
}
private:
void onConnShutdown(const BaseConnection::ptr& conn){
_topic_manager->onShutdown(conn);
}
private:
TopicManager::ptr _topic_manager;
Dispatcher::ptr _dispatcher;
BaseServer::ptr _server;
};
}
}
👥总结
本篇博文对 从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~