【从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装

news2025/4/9 14:56:15

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述

文章目录

  • 📢前言
  • 🏳️‍🌈一、服务端主题管理模块​
    • 1.1 核心功能
    • 1.2 核心设计思路
    • 1.3 主题 结构构造
    • 1.4 订阅者 结构构造
    • 1.5 主题 和 订阅者 统筹管理
      • (1) onTopicRequest
      • (2) onShutdown
      • (3) 细节实现方法
    • 1.8 框架代码
    • 1.7 整体代码
  • 🏳️‍🌈二、服务端整体实现
    • 2.1 逻辑框架
    • 2.1.1 注册中心(RegistryServer)​
    • 2.1.2 RPC 服务核心(RpcServer)​
      • 2.1.3 主题服务(TopicServer)​
    • 2.2 逻辑代码
    • 2.3 整体代码
  • 👥总结


📢前言

截至现在,在项目实现上,我们已经封装好了 零碎接口,对 各种消息 及 常用结构体 进行了封装

也实现了 dispatcher 路由转发的功能。

服务端 方面,完成了

  • 业务函数回调总结 - rpc_route.hpp
  • 服务的提供、发现、注册 - rpc_registry.hpp

客户端 方面,完成了

  • 消息请求及其回调 - requestor.hpp
  • 消息请求发送 - rpc_caller.hpp

服务端 未完成部分

  • 服务端主题管理模块 - rpc_topic.hpp
  • 服务端功能整合 - rpc_server.hpp

客户端 未完成部分

  • 客户的提供、发现、注册 - rpc_registry.hpp
  • 客户端主题管理模块 - rpc_topic.hpp
  • 客户端功能整合 - rpc_client.hpp

这一篇文章,笔者就为服务端的 主题实现整体封装 画上句号


🏳️‍🌈一、服务端主题管理模块​

1.1 核心功能

这段代码实现了一个 ​服务端主题管理模块​(TopicManager),支持 ​发布-订阅模式 的核心功能,包括:

  • 主题的创建与删除
  • 客户端的订阅与取消订阅
  • 消息的发布与推送
  • 连接断开时的自动清理

1.2 核心设计思路

​线程安全:通过 std::mutex 保护共享数据(_topics 和 _subscribers)。
​数据映射

  • _topics:维护主题名称到 Topic 对象的映射。
  • _subscribers:维护客户端连接到 Subscriber 对象的映射。

​操作统一入口:通过 onTopicRequest 分发不同类型的主题操作请求。

1.3 主题 结构构造

作用:表示一个主题,管理其所有订阅者。
​成员

  • _subscribers:存储所有订阅者的集合(Subscriber::ptr)。

​方法

  • appendSubscriber / removeSubscriber:增删订阅者。
  • pushMessage:向所有订阅者发送消息。
// 主题名称 和 其订阅者连接 的映射
struct Topic {
    using ptr = std::shared_ptr<Topic>;
    std::mutex _mutex;
    std::string _topic_name;
    std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接

    Topic(const std::string& topic_name) : _topic_name(topic_name) {}

    // 增加订阅者
    void appendSubscriber(const Subscriber::ptr& subscriber);

    // 删除订阅者
    void removeSubscriber(const Subscriber::ptr& subscriber);

    // 推送消息
    void pushMessage(const BaseMessage::ptr& msg);
};

1.4 订阅者 结构构造

作用:表示一个订阅者,记录其订阅的主题。
​成员

  • _conn:订阅者的网络连接对象。
  • _topics:订阅者当前订阅的所有主题名称集合。

​方法

  • appendTopic / removeTopic:增删订阅的主题。
// 定义一个订阅者对象,记录其订阅的 所有主题名称
struct Subscriber {
    using ptr = std::shared_ptr<Subscriber>;
    std::mutex _mutex;
    BaseConnection::ptr _conn;
    std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称

    Subscriber(const BaseConnection::ptr& conn) : _conn(conn) {};

    // 增加订阅的主题
    void appendTopic(const std::string& topic);

    // 删除订阅的主题
    void removeTopic(const std::string& topic);
};

1.5 主题 和 订阅者 统筹管理

我们需要建立两个映射关系

  • 主题名 -> 主题结构
  • 订阅者连接 -> 订阅者结构
std::mutex _mutex;
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;

核心接口解析

(1) onTopicRequest

​功能:处理客户端发送的主题操作请求(总入口)。
​操作类型

  • TOPIC_CREATE:调用 topicCreate 创建主题。
  • TOPIC_REMOVE:调用 topicRemove 删除主题。
  • TOPIC_SUBSCRIBE:调用 topicSubscribe 订阅主题。
  • TOPIC_CANCEL:调用 topicCancel 取消订阅。
  • TOPIC_PUBLISH:调用 topicPublish 发布消息。
  • 错误处理:若操作失败,返回错误响应(errorResponse)。

(2) onShutdown

​功能:处理客户端连接断开时的清理逻辑。
​步骤

  • _subscribers 中移除订阅者。
  • 遍历订阅者的所有主题,从主题的订阅列表中移除该订阅者。

(3) 细节实现方法

变更类

// 构造一个主题对象,添加映射关系的管理
void topicCreate(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

// 删除一个主题对象,删除映射关系的管理
void topicRemove(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

// 将订阅者订阅到指定主题。
bool topicSubscribe(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

// 取消该主题的订阅者
void topicCancel(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

// 向各个该主题的 订阅者 发布主题消息
bool topicPublish(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

返回类

//  返回一个错误响应
void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);

//  返回一个主题响应
void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

1.8 框架代码

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>

namespace rpc
{
    namespace server
    {
        class TopicManager
        {
        public:
            using ptr = std::shared_ptr<TopicManager>;

            // 统一处理有关主题的请求
            void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

            // 关闭连接
            void onShutdown(const BaseConnection::ptr &conn);

        private:
            //  返回一个错误响应
            void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);

            //  返回一个主题响应
            void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);
        
        private:
            // 构造一个主题对象,添加映射关系的管理
            void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

            // 删除一个主题对象,删除映射关系的管理
            void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

            // 将订阅者订阅到指定主题。
            bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

            // 取消该主题的订阅者 
            void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

            // 向各个该主题的 订阅者 发布主题消息
            bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

        private:
            // 定义一个订阅者对象,记录其订阅的 所有主题名称
            struct Subscriber
            {
                using ptr = std::shared_ptr<Subscriber>;
                std::mutex _mutex;
                BaseConnection::ptr _conn;
                std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称

                Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};

                // 增加订阅的主题
                void appendTopic(const std::string &topic);

                // 删除订阅的主题
                void removeTopic(const std::string &topic);
            };

            // 主题名称 和 其订阅者连接 的映射
            struct Topic
            {
                using ptr = std::shared_ptr<Topic>;
                std::mutex _mutex;
                std::string _topic_name;
                std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接

                Topic(const std::string &topic_name) : _topic_name(topic_name) {}

                // 增加订阅者
                void appendSubscriber(const Subscriber::ptr &subscriber);

                // 删除订阅者
                void removeSubscriber(const Subscriber::ptr &subscriber);

                // 推送消息
                void pushMessage(const BaseMessage::ptr &msg);
            };

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, Topic::ptr> _topics;
            std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
        };
    }
}

1.7 整体代码

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>

namespace rpc
{
    namespace server
    {
        class TopicManager
        {
        public:
            using ptr = std::shared_ptr<TopicManager>;

            // 统一处理有关主题的请求
            void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                TopicOptype topic_optype = msg->optype();
                bool ret = true;
                switch(topic_optype){
                    case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;              // 主题创建
                    case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;              // 主题删除
                    case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break;  // 主题订阅
                    case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break;              // 主题取消订阅
                    case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break;      // 主题发布
                    default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE); break;
                }
                if(!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);
                return topicResponse(conn, msg);
            }

            // 关闭连接
            void onShutdown(const BaseConnection::ptr &conn)
            {
                // 消息发布者断开连接,不需要任何操作
                // 1. 判断断开连接的是否为订阅者,不是的话直接返回
                // 2. 获取到订阅者退出,受影响的主题对象
                // 3. 从主题对象中,移除订阅者
                // 4. 从订阅者映射信息中,删除订阅者
                std::vector<Topic::ptr> topics;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto sub_it = _subscribers.find(conn);
                    if(sub_it == _subscribers.end()){
                        ELOG("该订阅者连接不存在");
                        return;
                    }
                    // 获取该订阅者锁定月的主题
                    subscriber = sub_it->second;
                    // 2. 获取到订阅者退出,受影响的主题对象
                    for(auto& topic_name : subscriber->_topics){
                        auto topic_it = _topics.find(topic_name);
                        if(topic_it == _topics.end())
                            continue;
                        topics.push_back(topic_it->second);
                    }
                    _subscribers.erase(sub_it);
                }
            }

        private:
            void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode){
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_TOPIC);
                msg_rsp->setRcode(rcode);
                return conn->send(msg_rsp);
            }

            void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_TOPIC);
                msg_rsp->setRcode(RCode::RCODE_OK);
                return conn->send(msg_rsp);
            }
        
        private:
            // 构造一个主题对象,添加映射关系的管理
            void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                // 获取主题名字
                std::string topic_name = msg->topicKey();
                // 构造一个主题对象
                auto topic = std::make_shared<Topic>(topic_name);
                // 增加订阅者
                _topics.insert(std::make_pair(topic_name, topic));
                std::cout << "创建主题" <<  topic_name << std::endl;
            }

            // 删除一个主题对象,删除映射关系的管理
            void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                // 1. 查看当前主题,有哪些订阅者,然后从订阅者中将主题信息删掉
                // 2. 删除主题的数据 -- 主题名称余出题对象的映射
                std::string topic_name = msg->topicKey();
                std::unordered_set<Subscriber::ptr> subscribers; // 记录 当前主题 的 所有订阅者连接
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _topics.find(msg->topicKey());
                    if (it == _topics.end())
                    {
                        ELOG("没有找到 %s 主题的订阅者", msg->topicKey().c_str());
                        return;
                    }
                    subscribers = it->second->_subscribers;
                    _topics.erase(it); // 删除主题对象
                }
                for (auto &subscriber : subscribers)
                {
                    subscriber->removeTopic(topic_name);
                }
            }

            // 将订阅者订阅到指定主题。
            bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);

                    // 1. 查找或创建订阅者对象(Subscriber)
                    auto topic_it = _topics.find(msg->topicKey());

                    if (topic_it == _topics.end())
                    {
                        return false;
                    }
                    topic = topic_it->second;
                    auto sub_it = _subscribers.find(conn);
                    if (sub_it != _subscribers.end())
                    {
                        subscriber = sub_it->second;
                    }
                    else
                    {
                        subscriber = std::make_shared<Subscriber>(conn);
                        _subscribers.insert(std::make_pair(conn, subscriber));
                    }
                }
                // 2. 在主题对象中,新增一个订阅者对象关联的连接;  在订阅者对象中新增一个订阅的主题
                topic->appendSubscriber(subscriber);
                subscriber->appendTopic(msg->topicKey());
                return true;
            }

            // 取消该主题的订阅者 
            void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){
                // 1. 先找出主题对象,和订阅者对象
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(msg->topicKey());

                    if (topic_it != _topics.end())
                    {
                        topic = topic_it->second;
                    }

                    auto sub_it = _subscribers.find(conn);
                    if (sub_it != _subscribers.end())
                    {
                        subscriber = sub_it->second;
                    }
                }

                // 2. 从主对象中删除当前的订阅者连接
                if(subscriber)
                    subscriber->removeTopic(msg->topicKey());
                if(topic && subscriber)
                    topic->removeSubscriber(subscriber);
            }

            // 向各个该主题的 订阅者 发布主题消息
            bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){
                Topic::ptr topic;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(msg->topicKey());
                    if (topic_it == _topics.end()){
                        ELOG("没有找到 %s 主题的订阅者", msg->topicKey().c_str());
                        return false;
                    }
                    topic = topic_it->second;
                }
                topic->pushMessage(msg);
                return true;
            }

        private:
            // 定义一个订阅者对象,记录其订阅的 所有主题名称
            struct Subscriber
            {
                using ptr = std::shared_ptr<Subscriber>;
                std::mutex _mutex;
                BaseConnection::ptr _conn;
                std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称

                Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};

                // 增加订阅的主题
                void appendTopic(const std::string &topic)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _topics.insert(topic);
                }

                void removeTopic(const std::string &topic)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _topics.erase(topic);
                }
            };

            // 主题名称 和 其订阅者连接 的映射
            struct Topic
            {
                using ptr = std::shared_ptr<Topic>;
                std::mutex _mutex;
                std::string _topic_name;
                std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接

                Topic(const std::string &topic_name) : _topic_name(topic_name) {}

                // 增加订阅者
                void appendSubscriber(const Subscriber::ptr &subscriber)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _subscribers.insert(subscriber);
                }

                // 删除订阅者
                void removeSubscriber(const Subscriber::ptr &subscriber)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _subscribers.erase(subscriber);
                }

                // 推送消息
                void pushMessage(const BaseMessage::ptr &msg)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    for (auto &subscriber : _subscribers)
                    {
                        subscriber->_conn->send(msg);
                    }
                }
            };

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, Topic::ptr> _topics;
            std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
        };
    }
}

🏳️‍🌈二、服务端整体实现

2.1 逻辑框架

一个 ​分布式 RPC 服务端系统

包含三个核心模块:​服务注册中心RPC 服务核心 和 ​主题服务

以下是各模块的作用及整体架构:

2.1.1 注册中心(RegistryServer)​

​功能

  • ​服务注册与发现:接收服务提供者(Provider)注册的服务信息,供消费者(Consumer)查询可用服务。
  • 连接管理:在客户端断开时清理相关资源(如服务下线通知)。

​关键成员

  • _pd_manager:管理服务注册与发现的业务逻辑(如维护服务列表)。
  • _dispatcher:分发客户端请求到对应的处理逻辑。
  • _server:底层网络服务器,监听端口并处理连接。

​使用场景

  • 服务提供者启动时向注册中心注册自身服务。
  • 消费者通过注册中心查询可用的服务地址和方法。

2.1.2 RPC 服务核心(RpcServer)​

​功能

  • ​RPC 服务管理:启动 RPC 服务,处理客户端调用请求。
  • 服务注册(可选)​:将服务方法注册到注册中心(若启用)。
  • 请求路由:将 RPC 请求路由到对应的业务处理逻辑。

​关键成员

  • _router:路由请求到具体的服务方法(如根据方法名匹配处理函数)。
  • _req_client:与注册中心通信的客户端(用于服务注册或发现)。
  • _dispatcher:分发网络消息到业务逻辑。

    配置选项
    _enableRegistry:是否启用注册中心(决定是否自动注册服务)。

2.1.3 主题服务(TopicServer)​

​功能

  • ​发布-订阅模式:管理主题的创建、订阅和消息推送。
  • ​消息广播:向订阅特定主题的客户端推送消息。

​关键成员

  • _topic_manager:管理主题和订阅者(如 TopicManager 类)。
  • _dispatcher:处理客户端的订阅/发布请求。

2.2 逻辑代码

#pragma once

#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"

#include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"

namespace rpc
{
    namespace server
    {

        // 注册中心的服务端实现
        // 启动服务:监听指定端口,接收客户端(服务提供者/发现者)连接。
        // ​消息路由:将服务注册/发现请求分发给业务处理器(PDManager)。
        // ​连接管理:在客户端断开时清理相关资源(如服务下线通知)。
        class RegistryServer
        {
            // 注册中心服务端,只需要针对服务注册与发现请求进行处理即可
        public:
            using ptr = std::shared_ptr<RegistryServer>;
            RegistryServer(int port);

            void start();

        private:
            void onConnShutdown(const BaseConnection::ptr &conn);

        private:
            ProviderDiscovererManager::ptr _pd_manager;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };

        // ​RPC 服务端核心类,负责管理 RPC 服务的生命周期
        // 启动 RPC 服务:监听指定端口,处理客户端 RPC 请求。
        // ​服务注册​(可选):将服务方法注册到注册中心,供客户端发现。
        // ​请求路由:将接收到的 RPC 请求分发给对应的业务处理逻辑。
        class RpcServer
        {
        public:
            using ptr = std::shared_ptr<RpcServer>;
            RpcServer(const Address &access_addr, bool enableRegistry = false, const Address &registry_server_addr = Address());

            // 注册服务到注册中心
            void registerMethod(const ServiceDescribe::ptr &service);

            void start();

        private:
            bool _enableRegistry;
            Address _access_addr;
            rpc::client::RegistryClient::ptr _req_client;
            RpcRouter::ptr _router;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };

        
        class TopicServer
        {
        public:
            using ptr = std::shared_ptr<TopicServer>;
            TopicServer(int port);

            void start();

        private:
            void onConnShutdown(const BaseConnection::ptr &conn);

        private:
            TopicManager::ptr _topic_manager;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };
    }
}

2.3 整体代码

#pragma once

#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp" 

#include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"

namespace rpc{
    namespace server{

        // 注册中心的服务端实现
            // 启动服务:监听指定端口,接收客户端(服务提供者/发现者)连接。
            // ​消息路由:将服务注册/发现请求分发给业务处理器(PDManager)。
            // ​连接管理:在客户端断开时清理相关资源(如服务下线通知)。
        class RegistryServer{
            // 注册中心服务端,只需要针对服务注册与发现请求进行处理即可
            public:
                using ptr = std::shared_ptr<RegistryServer>;
                RegistryServer(int port)
                    : _pd_manager(std::make_shared<ProviderDiscovererManager>()),
                      _dispatcher(std::make_shared<Dispatcher>())
                { 
                    // 1. 注册服务请求处理器
                        // 将 PDManager::onServiceRequest 绑定到 MType::REQ_SERVICE 消息类型。当收到服务注册或发现请求时,调用此方法处理
                    auto service_cb = std::bind(&ProviderDiscovererManager::onServiceRequest, _pd_manager.get(), std::placeholders::_1, std::placeholders::_2);
                    _dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);

                    // 2. 创建底层服务器并设置回调
                        // 通过 ServerFactory 创建底层服务器,设置消息总入口为 Dispatcher::onMessage
                    _server = rpc::ServerFactory::create(port);
                    auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                    _server->setMessageCallback(message_cb);
                
                    // 3. 设置连接关闭回调
                        // 当客户端断开连接时,调用 onConnShutdown 清理相关资源
                    auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);
                    _server->setCloseCallback(close_cb);
                }

                void start(){
                    _server->start();   // 启动服务器,开始监听端口
                }

            private:
                void onConnShutdown(const BaseConnection::ptr& conn){
                    _pd_manager->onConnShutdown(conn);  // 通知 PDManager 处理连接断开
                }
            private:
                ProviderDiscovererManager::ptr _pd_manager;
                Dispatcher::ptr _dispatcher;
                BaseServer::ptr _server;

        };

        // ​RPC 服务端核心类,负责管理 RPC 服务的生命周期
            // 启动 RPC 服务:监听指定端口,处理客户端 RPC 请求。
            // ​服务注册​(可选):将服务方法注册到注册中心,供客户端发现。
            // ​请求路由:将接收到的 RPC 请求分发给对应的业务处理逻辑。
        class RpcServer{
            public:
                using ptr = std::shared_ptr<RpcServer>;
                RpcServer(const Address& access_addr, bool enableRegistry = false, const Address& registry_server_addr = Address())
                    : _enableRegistry(enableRegistry),
                      _access_addr(access_addr),
                      _router(std::make_shared<rpc::server::RpcRouter>()),
                      _dispatcher(std::make_shared<Dispatcher>())
                {
                    // 1. ​创建注册客户端​(若启用注册)
                    if(enableRegistry){
                        _req_client = std::make_shared<client::RegistryClient>(
                            registry_server_addr.first, registry_server_addr.second);
                    }

                    // 2. 注册 RPC 请求处理回调
                    auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);
                    _dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, rpc_cb);

                    // 3. 创建底层服务器并设置回调
                    _server = rpc::ServerFactory::create(access_addr.second);
                    auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                    _server->setMessageCallback(message_cb);
                }

                // 注册服务到注册中心
                void registerMethod(const ServiceDescribe::ptr& service){
                    if(_enableRegistry)
                        _req_client->registryMethod(service->method(), _access_addr);
                    _router->registerMethod(service);
                }
 
                void start(){
                    _server->start();
                }

            private:
                bool _enableRegistry;
                Address _access_addr;
                rpc::client::RegistryClient::ptr _req_client;
                RpcRouter::ptr _router;
                Dispatcher::ptr _dispatcher;
                BaseServer::ptr _server;
        };

        class TopicServer{
            public:
                using ptr = std::shared_ptr<TopicServer>;
                TopicServer(int port)
                    : _topic_manager(std::make_shared<TopicManager>()),
                      _dispatcher(std::make_shared<Dispatcher>())
                {
                    // 1. 注册主题请求处理器
                    auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);
                    _dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);

                    // 2. 创建底层服务器并设置回调
                    _server = ServerFactory::create(port);
                    auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                    _server->setMessageCallback(message_cb);

                    // 3. 设置连接关闭回调
                    auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);
                    _server->setCloseCallback(close_cb);
                }

                void start(){
                    _server->start();
                }
            
            private:
                void onConnShutdown(const BaseConnection::ptr& conn){
                    _topic_manager->onShutdown(conn);
                }
            private:
                TopicManager::ptr _topic_manager;
                Dispatcher::ptr _dispatcher;
                BaseServer::ptr _server;
        };
    }
}

👥总结

本篇博文对 从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2328004.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开源的 LLM 应用开发平台Dify的安装和使用

文章目录 前提环境应用安装deocker desktop镜像源配置Dify简介Dify本地docker安装Dify安装ollama插件Dify安装硅基流动插件简单应用练习进阶应用练习数据库图像检索与展示助手echart助手可视化 前提环境 Windows环境 docker desktop魔法环境&#xff1a;访问Dify项目ollama电脑…

从零构建大语言模型全栈开发指南:第五部分:行业应用与前沿探索-5.1.2行业落地挑战:算力成本与数据隐私解决方案

👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 从零构建大语言模型全栈开发指南-第五部分:行业应用与前沿探索5.1.2 行业落地挑战:算力成本与数据隐私解决方案1. 算力成本挑战与优化策略1.1 算力成本的核心问题1.2 算力优化技术方案2. 数据隐私挑战…

NodeJS--NPM介绍使用

1、使用npm install命令安装模块 1.1、本地安装 npm install express 1.2、全局安装 npm install express -g 1.3、本地安装和全局安装的区别

DeepSeek与ChatGPT的优势对比:选择合适的工具来提升工作效率

选DeepSeek还是ChatGPT&#xff1f;这就像问火锅和披萨哪个香&#xff01; "到底该用DeepSeek还是ChatGPT?” 这个问题最近在互联网圈吵翻天!其实这就跟选手机系统-样&#xff0c;安卓党iOS党都能说出一万条理由&#xff0c;但真正重要的是你拿它来干啥&#xff01;&am…

25大唐杯赛道一本科B组知识点大纲(下)

5G/6G网络技术知识点&#xff08;10%&#xff09; 工程概论及通信工程项目实践&#xff08;20%&#xff09; 5G垂直行业应用知识点&#xff08;20%&#xff09; ⭐⭐⭐为重点知识&#xff0c;尽量要过一遍哦 大唐杯赛道一国一备赛思路 大唐杯国一省赛回忆录--有付出就会有收…

Python+Playwright自动化测试-1-环境准备与搭建

1、Playwright 是什么&#xff1f; 微软在 2020 年初开源的新一代自动化测试工具&#xff0c;它的功能类似于 Selenium、Pyppeteer 等&#xff0c;都可以驱动浏览器进行各种自动化操作。它的功能也非常强大&#xff0c;对市面上的主流浏览器都提供了支持&#xff0c;API 功能简…

生产管理系统如何破解汽车零部件行业追溯难痛点

在汽车零部件制造行业中&#xff0c;生产追溯一直是企业面临的核心挑战之一。随着市场竞争的加剧和客户需求的日益复杂&#xff0c;如何确保产品质量、快速定位问题源头、减少批次性返工&#xff0c;成为了每个企业亟待解决的问题。而生产管理系统&#xff0c;作为智能制造的重…

【XTerminal】【树莓派】Linux系统下的函数调用编程

目录 一、XTerminal下的Linux系统调用编程 1.1理解进程和线程的概念并在Linux系统下完成相应操作 (1) 进程 (2)线程 (3) 进程 vs 线程 (4)Linux 下的实践操作 1.2Linux的“虚拟内存管理”和stm32正式物理内存&#xff08;内存映射&#xff09;的区别 (1)Linux虚拟内存管…

umi框架开发移动端h5

1、官网&#xff1a;https://umijs.org/ 2、创建出来的项目 yarn create umi yarn start3、推荐目录结构 . ├── config │ └── config.ts ├── public//静态资源 ├── dist ├── mock │ └── app.ts&#xff5c;tsx ├── src │ ├── .umi │ ├── .um…

3.9/Q2,Charls最新文章解读

文章题目&#xff1a;Association between remnant cholesterol and depression in middle-aged and older Chinese adults: a population-based cohort study DOI&#xff1a;10.3389/fendo.2025.1456370 中文标题&#xff1a;中国中老年人残留胆固醇与抑郁症的关系&#xff1…

Java Lambda 表达式提升效率

lambda 表达式的应用场景 Stream 的应用场景 Lambda/Stream 的进一步封装 自定义函数式接口&#xff08;用 jdk 自带的函数式接口也可以&#xff09; https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html import java.io.Serializable;/*** 可序…

人工智能混合编程实践:C++ ONNX进行图像超分重建

人工智能混合编程实践:C++ ONNX进行图像超分重建 前言相关介绍C++简介ONNX简介ONNX Runtime 简介**核心特点**图像超分辨率重建简介应用场景前提条件实验环境项目结构使用C++ ONNX进行图像超分重建sr_main.cpp参考文献前言 由于本人水平有限,难免出现错漏,敬请批评改正。更多…

K8S学习之基础七十四:部署在线书店bookinfo

部署在线书店bookinfo 在线书店-bookinfo 该应用由四个单独的微服务构成&#xff0c;这个应用模仿在线书店的一个分类&#xff0c;显示一本书的信息&#xff0c;页面上会显示一本书的描述&#xff0c;书籍的细节&#xff08;ISBN、页数等&#xff09;&#xff0c;以及关于这本…

Python不可变数据类型全解析:原理、优势与实战指南

目录 引言&#xff1a;为什么Python要区分可变与不可变&#xff1f; 一、不可变数据类型的核心特性 二、五大不可变数据类型深度解析 三、不可变数据类型的三大核心优势 四、不可变数据类型的典型应用场景 五、不可变 vs 可变&#xff1a;如何选择&#xff1f; 六、实战技…

Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座

在全球 290 位开发者的协作下&#xff0c;Apache Doris 在 2024 年完成了 7000 次代码提交&#xff0c;并发布了 22 个版本&#xff0c;实现在实时分析、湖仓一体和半结构化数据分析等核心场景的技术突破及创新。 2025 年&#xff0c;Apache Doris 社区将秉承“以场景驱动创新…

二极管正负极区分

二极管正负极区分 二极管是一种具有单向导电性的半导体器件&#xff0c;正确区分正负极对于其使用非常重要。以下是几种常见的二极管正负极区分方法&#xff1a; 1. 外观标识 有标记的二极管 色环或色点&#xff1a;许多二极管在表面会有一个色环或色点&#xff0c;这个标记…

【c++深入系列】:类与对象详解(中)

&#x1f525; 本文专栏&#xff1a;c &#x1f338;作者主页&#xff1a;努力努力再努力wz &#x1f4aa; 今日博客励志语录&#xff1a; 不是因为看到希望才坚持&#xff0c;而是坚持了才能看到希望 那么上一篇博客我讲解了什么是类和对象以及类和对象是怎么定义的&#xff0…

汽车 HMI 设计的发展趋势与设计要点

一、汽车HMI设计的发展历程与现状 汽车人机交互界面&#xff08;HMI&#xff09;设计经历了从简单到复杂、从单一到多元的演变过程。2012年以前&#xff0c;汽车HMI主要依赖物理按键进行操作&#xff0c;交互方式较为单一。随着特斯拉Model S的推出&#xff0c;触控屏逐渐成为…

《AI大模型应知应会100篇》第56篇:LangChain快速入门与应用示例

第56篇&#xff1a;LangChain快速入门与应用示例 前言 最近最火的肯定非Manus和OpenManus莫属&#xff0c;因为与传统AI工具仅提供信息不同&#xff0c;Manus能完成端到端的任务闭环。例如用户发送“筛选本月抖音爆款视频”&#xff0c;它会自动完成&#xff1a; 爬取平台数据…

Java 大视界 -- Java 大数据在智能农业无人机植保作业路径规划与药效评估中的应用(165)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…