bprc二次封装

news2024/9/19 23:44:02

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 一、封装的思想
  • 二、封装单个服务的信道管理类
    • 1.成员变量
    • 2.成员函数
  • 三、封装总体的服务信道管理类
    • 1.成员变量
    • 2.成员函数
  • 四.etcd和brpc联合测试
    • 1.服务注册客户端
    • 2.服务发现客户端


一、封装的思想

brpc的二次封装:
brpc本质上来说–进行rpc调用的,但是向谁调用什么服务得管理起来 – 搭配etcd实现的注册中心管理
原因:通过注册中心,能够获知谁能提供什么服务,进而能够连接它发起这个服务调用。
封装思想:
主要是管理起来网络通信的信道----将不同服务节点主机的通信信道管理起来
封装的是服务节点信道的管理,而不是rpc调用的管理。
封装:
1.指定服务的信道管理类:
一个服务可能会有多个节点提供服务,每个节点都有自己的channe
建立服务与信道的映射关系,并且关系是一对多,采用RR轮转策略进行获取
2.总体的服务信道管理类
将多个服务的信道管理对象管理起来

二、封装单个服务的信道管理类

1.成员变量

1.需要一个string来表视当前服务的信道管理类的服务名称因为我们是把一个服务的信道管理起来,所以需要知道该服务的名称。
2.一个服务可能会有多个主机,也就是一个服务可能会有多个信道。所以我么需要一个数组来存储当前服务节点的信道。
3.当一个节点主机下线后,我们需要删除释放这个channel,所以我们需要一个哈希表来进行主机地址与channel的映射关系,方便我们进行删除。
4.需要一个rr轮转下标,当需要对该服务进行rpc调用时,可以使用rr轮转负载均衡式的返回channel.
5.需要一个互斥锁,保证容器的线程安全。

 std::mutex _mutex;
 int32_t _index; //rr轮转下标
 std::string _service_name;  //服务名称
 std::vector<ChannelPtr> _channels;  //当前服务对应的通信集合
 std::unordered_map<std::string,ChannelPtr> _hosts;  //主机地址与信道映射关系

2.成员函数

构造函数中,只需要初始化服务名称,然后轮转下标初始化为0.

ServiceChannel(const std::string& service_name)
        :_index(0),_service_name(service_name)
    {

    }

当服务上线一台主机时,需要创建一个channel进行管理,那么该函数中需要有一个参数,就是主机的地址。

void append(const std::string& host)
    {
        //创建一个channel,并进行连接
        ChannelPtr channel = std::make_shared<brpc::Channel>();
        brpc::ChannelOptions options;
        options.connect_timeout_ms = -1;
        options.timeout_ms = -1;
        options.max_retry = 3;
        options.protocol = "baidu_std";
        int ret = channel->Init(host.c_str(), &options);
        if (ret == -1) {
            LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);
            return;
        }

        std::unique_lock<std::mutex> lock(_mutex);
        _hosts.insert(std::make_pair(host, channel));
        _channels.push_back(channel);

    }

对应的,当主机下线时,需要将该主机的channel删除并释放

 void remove(const std::string& host)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        auto it = _hosts.find(host);
        if (it == _hosts.end()) {
            LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);
            return;
        }

        for (auto vit = _channels.begin(); vit != _channels.end(); ++vit) {
            if (*vit == it->second) {
                _channels.erase(vit);
                break;
            }
        }
        _hosts.erase(it);
    }

当需要对该服务发起rpc调用时,我们可以返回该服务的channel。通过rr轮转方式实现一个负载均衡。

ChannelPtr choose()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_channels.size() == 0) {
            LOG_ERROR("当前没有能够提供 {} 服务的节点!", _service_name);
            return ChannelPtr();
        }
        int32_t idx = _index++ % _channels.size();
        return _channels[idx];
    }

三、封装总体的服务信道管理类

我们的服务有多个,而每一个服务也会有多个主机节点。因此我们需要把这些服务总的管理起来。

1.成员变量

1.需要一个哈希表来实现服务名称和该服务的信道管理类的映射关系。
2.项目有多个服务,而我们并不是对所有的服务都要关心的。我们需要使用一个unordered_set来记录我们关心的服务。
3.一个互斥锁,保证容器的线程安全。

2.成员函数

添加一个关心的服务,只需要把服务名称传递进来,进行一个插入即可

 void declared(const std::string& service_name)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _follow_services.insert(service_name);
    }

当一个对应的服务上线一个主机节点时,为该服务节点主机创建一个channel,并管理起来。这里直接调用上面的服务的信道管理类就行。

void onServiceOnline(const std::string &service_instance,const std::string& host)
    {
        std::string service_name = getServiceName(service_instance);
        ServiceChannel::ptr service;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto fit = _follow_services.find(service_name);
                if (fit == _follow_services.end()) {
                    LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);
                    return;
                }
                //先获取管理对象,没有则创建,有则添加节点
                auto sit = _services.find(service_name);
                if (sit == _services.end()) {
                    service = std::make_shared<ServiceChannel>(service_name);
                    _services.insert(std::make_pair(service_name, service));
                }else {
                    service = sit->second;
                }
            }

            if (!service) {
                LOG_ERROR("新增 {} 服务管理节点失败!", service_name);
                return ;
            }
        service->append(host);
        LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);
    }

服务主机节点下线时,需要找到对应服务的信道管理类,来进行一个对应主机节点的channel的删除和释放。

void onServiceOffline(const std::string &service_instance,const std::string& host)
    {
        std::string service_name = getServiceName(service_instance);
        ServiceChannel::ptr service;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto fit = _follow_services.find(service_name);
                if (fit == _follow_services.end()) {
                    LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);
                    return;
                }
                //先获取管理对象,没有则创建,有则添加节点
                auto sit = _services.find(service_name);
                if (sit == _services.end()) {
                    LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);
                    return;
                }
                service = sit->second;
            }
        service->remove(host);
        LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);
    }

上面的两个接口,是当服务的主机节点上线和下线时调用的,形参中的两个参数就是服务名称和主机节点。
那么什么时候会调用这两个函数呢?我们怎么知道什么时候主机上线下线呢?在前面我们封装了一个etcd,在服务发现客户端中,我们是同watcher来监控一个服务。当服务的数据发送改变时,就会调用我们传入的两个回调函数,那么这里的两个函数就可以作为俩个回调函数传入。也就是当服务新增节点和删除节点时调用。

获取指定服务的节点信道,传入指定服务,返回一个服务的信道。

//获取指定服务的节点信道
    ServiceChannel::ChannelPtr choose(const std::string& service_name)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        auto sit = _services.find(service_name);
        if (sit == _services.end()) {
            LOG_ERROR("当前没有能够提供 {} 服务的节点!", service_name);
            return ServiceChannel::ChannelPtr();
        }
        
        return sit->second->choose();
    }

四.etcd和brpc联合测试

前面我们封装了etcd,有一个服务注册客户端和一个服务发现客户端。那么我们可以用刚刚封装的brpc和etcd进行一个连接测试。

我们是可以用brpc进行rpc调用的,但是向谁进行rpc调用呢?我们可以使用服务发现客户端对服务进行一个监控,当服务注册客户端注册一个服务后,服务发现客户端就会触发一个回调函数,在这个回调函数就会对触发的事件进行一个判断,如果是新增事件,则调用用户设置的put_cb,如果是删除事件,则调用用户的del_cb;那么我们是不是可以把我们刚刚封装的bprc中的onServiceOnline和onServiceOffline作为对应的回调函数设置进去呢?

大体思路:

  • 服务注册客户端这边,首先创建一个brpc服务器,然后新增EchoService服务,并启动服务器。最后向etcd中注册一个/service/echo/instance服务,并指明自己的主机地址,表明自己的主机可以提供echo服务。
  • 服务发现客户端这边,首先创建一个总的信道管理类对象,然后构造一个服务发现对象,对/service/echo服务进行监控。接着通过总的信道管理类对象获取到一个channel,通过这个channel,就可以创建stub进行服务调用。

1.服务注册客户端

首先创建一个服务器对象,因为我们要提供rpc服务。

    brpc::Server server;

接着新增服务。

//3.向服务器对象中,新增EchoService服务
    EchoServiceImpl echo_service;
    int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
    if (ret == -1) {
        std::cout << "添加Rpc服务失败!\n";
        return -1;
    }

启动服务器,监听8085端口。

    brpc::ServerOptions options;
    options.idle_timeout_sec = -1;
    options.num_threads = 1;
    ret = server.Start(8085,&options);
    if (ret == -1) {
        std::cout << "启动服务器失败!\n";
        return -1;
    }

向etcd注册中心注册一个/service/echo/instance服务,设置val为自己的主机地址。代表当前主机可以提供echo服务。
此时etcd注册中心就会有一个{key:/service/echo/instance ; val:127.0.0.1:8085};

//5.注册服务
    Registry::ptr rClient = std::make_shared<Registry>("127.0.0.1:2379");
    rClient->registry("/service/echo/instance","127.0.0.1:8085");

2.服务发现客户端

先构建以恶个总的信道管理对象,需要通过这个对象来"知道"哪个主机可以提供服务。但具体是怎么发现的呢?可以看到,我们调用了declared这个函数代表我们关心/service/echo这个服务。此时这个字符串就会被插入到一个unordered_set中。其次我们将这个对象中的两个成员函数获取到了。这两个函数后续我们要设置进服务发现客户端中。

 //1.先构建Rpc信道管理对象
    auto sm = std::make_shared<ServiceManager>();
    sm->declared("/service/echo");
    auto put_cb = std::bind(&ServiceManager::onServiceOnline,sm.get(),std::placeholders::_1,std::placeholders::_2);
    auto del_cb = std::bind(&ServiceManager::onServiceOffline,sm.get(),std::placeholders::_1,std::placeholders::_2);

构造服务发现对象,这里的构造函数我们传入了四个参数,第一个参数是etcd注册中心的地址,因为我们要和etcd通信.
第二个参数是我们要监控的服务。我们的服务发现对象中有一个成员watcher,他会对指定的服务进行监控,如服务数据有变化,就会进行处理。而这里的第二个参数就是watcher进行监控的服务。另外这个监控它是会进行递归的,/service/下的所有目录发现变化,他都可以感知到。第三个第四个函数就是我们设置进的回调函数,当watcher感知到服务发送变化,会获取到发送变化的服务的key和val,也就是服务名称和主机地址。这是就可以调用我们设置的两个回调函数,我们的回调函数刚好可以接受两个参数。

而这两个参数一个是/service/echo/instance,一个是127.0.0.1:8085。我们的服务信道管理类是管理 服务的信道的。我们服务发现客户端关心的是/service/echo服务。而这里的/service/echo/instance本质上是一个/service/echo服务。所以在onServiceOnline和onServiceOffline中我们对这个参数进行了一个劫取子串,只需要/service/echo这一部分。然后为这一部分创建一个服务信道管理类对象。

//2. 构造服务发现对象
    Discovery::ptr dclient = std::make_shared<Discovery>("127.0.0.1:2379","/service", put_cb, del_cb);

接着就是获取服务的channel,我们关心哪个服务就获取什么服务的channel.

 //3. 通过Rpc信道管理对象,获取提供Echo服务的信道
        ServiceChannel::ChannelPtr channel = sm->choose("/service/echo");
        if(channel == nullptr){
            return -1;
        }

通过channel来发起rpc调用

//4.发起rpc调用
        example::EchoService_Stub stub(channel.get());
        example::EchoRequest req;
        req.set_message("你好,Lkm");
        brpc::Controller *cntl = new brpc::Controller();
        example::EchoResponse *rsp = new example::EchoResponse();
        stub.Echo(cntl, &req, rsp, nullptr);
        if(cntl->Failed() == true){
            std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;
			return;
        }

        std::cout << "收到响应: " << rsp->message() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2147303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从工厂打螺丝到数据库专家(上)

可能是年纪大了&#xff0c;近期总是失眠&#xff01;不知为何&#xff0c;这段时间心情烦躁时&#xff0c;特别喜欢听老歌&#xff0c;难道这是中年人的通病&#xff1a;都喜欢怀旧&#xff1f; 在数据库恢复订阅伙伴群&#xff0c;大家经常讨论&#xff0c;总是在回味过去&a…

文心一言 VS 讯飞星火 VS chatgpt (350)-- 算法导论24.1 1题

一、在图 24-4上运行Bellman-Ford算法&#xff0c;使用结点 z z z作为源结点。在每一遍松弛过程中&#xff0c;以图中相同的次序对每条边进行松弛&#xff0c;给出每遍松弛操作后的 d d d值和 π π π值。然后&#xff0c;把边 ( z , x ) (z,x) (z,x)的权重改为 4 4 4&#xf…

面试官:什么是CAS?存在什么问题?

大家好&#xff0c;我是大明哥&#xff0c;一个专注「死磕 Java」系列创作的硬核程序员。 回答 CAS&#xff0c;Compare And Swap&#xff0c;即比较并交换&#xff0c;它一种无锁编程技术的核心机制。其工作方式分为两步&#xff1a; 比较&#xff1a;它首先会比较内存中的某…

汉字转拼音工具类

一&#xff0c;汉字转成拼音大写首字母 public static String chineseToPinyin(String chinese) {//创建一个 StringBuilder 对象用于存储转换后的拼音。StringBuilder pinyin new StringBuilder();//创建一个汉语拼音输出格式对象。HanyuPinyinOutputFormat format new Han…

Redis-01 入门和十大数据类型

Redis支持两种持久化方式&#xff1a;RDB持久化和AOF持久化。 1.RDB持久化是将Redis的数据以快照的形式保存在磁盘上&#xff0c;可以手动触发或通过配置文件设置定时触发。RDB保存的是Redis在某个时间点上的数据快照&#xff0c;可以通过恢复RDB文件来恢复数据。 2.AOF持久化…

MySQL 中的 EXPLAIN 命令:洞察查询性能的利器

《MySQL 中的 EXPLAIN 命令&#xff1a;洞察查询性能的利器》 在 MySQL 数据库的使用中&#xff0c;优化查询性能是至关重要的一项任务。而 EXPLAIN 命令就是我们用来深入了解查询执行计划的强大工具。今天&#xff0c;我们就来一起探讨如何在 MySQL 中使用 EXPLAIN 命令&…

数据结构-3.2.栈的顺序存储实现

一.顺序栈的定义&#xff1a;top指针指向栈顶元素 1.图解&#xff1a; 2.代码&#xff1a; #include<stdio.h> #define MaxSize 10 //定义栈最多存入的元素个数 ​ typedef struct {int data[MaxSize]; //静态数组存放栈中元素int top; //栈顶指针 } SqStack; ​ int…

python mysql pymysql 数据库操作,常用脚本,个人小工具

起因&#xff0c; 目的: 整理 mysql 工具 启动数据库 检查服务器是否启动了: Get-Service -Name ‘mysql*’ 如果没启动的话&#xff0c;那么就启动: net start MySQL80 (最好是开启管理员权限) 1, 日常最常用的&#xff0c;创建连接 --> 查看所有数据库 —> 查看所有…

预处理、makefile、静动态库编写、nfs挂载、快捷命令

c查看预处理后的文件 查看执行后的汇编代码 预处理过程 静态库和动态库 静态库编写 实践 a 动态库生成 查找文件命令 动态库升级 链接的库找不到 命名要为linfun.so 执行时-lfun才能找到 系统会将lfun补充成libfun查找&#xff08;系统默认路径/user/lib/...&#xff09; 链…

C++:string 类详解

目录 简介 使用 初始化(构造函数、拷贝构造函数) 析构函数 赋值运算符重载(operator) 成员常量(npos) 运算符重载[ ](operator[ ]) size() 和 length() 迭代器( begin() 和 end() ) 范围 for 迭代器和范围 for 的比较 反向迭代器( rbegin() 和 rend() ) const 迭…

每日刷题(算法)

我们N个真是太厉害了 思路&#xff1a; 我们先给数组排序&#xff0c;如果最小的元素不为1&#xff0c;那么肯定是吹牛的&#xff0c;我们拿一个变量记录前缀和&#xff0c;如果当前元素大于它前面所有元素的和1&#xff0c;那么sum1是不能到达的值。 代码&#xff1a; #def…

elasticsearch实战应用

Elasticsearch(ES)是一种基于分布式存储的搜索和分析引擎&#xff0c;目前在许多场景得到了广泛使用&#xff0c;比如维基百科和github的检索&#xff0c;使用的就是ES。本文总结了一些使用心得体会&#xff0c;希望对大家有所帮助。 一、技术选型 说到全文搜索大家肯定会想到…

软件测试 BUG 篇

目录 一、软件测试的生命周期 二、BUG 1. bug的概念 2. 描述bug的要素 3. bug的级别 4. bug的生命周期 5. 与开发产生争执怎么办&#xff1f;&#xff08;面试高频考题&#xff09; 5.1 先检查自身&#xff0c;是否bug描述不清楚 5.2 站在用户角度考虑并抛出问题 5.3 …

[vue2+axios]下载文件+文件下载为乱码

export function downloadKnowledage(parameter) {return axios({url: /knowledage/download,method: GET,params: parameter,responseType: blob}) }添加 responseType: blob’解决以下乱码现象 使用触发a标签下载文件 downloadKnowledage(data).then((res) > {let link …

PHP及Java等其他语言转Go时选择GoFly快速快速开发框架指南

概要 经过一年多的发展GoFly快速开发框架已被一千多家科技企业或开发者用于项目开发&#xff0c;他的简单易学得到其他语言转Go首选框架。且企业版的发展为GoFly社区提供资金&#xff0c;这使得GoFly快速框架得到良好的发展&#xff0c;GoFly技术团队加大投入反哺科技企业和开…

模版进阶(template)

1.非类型模版参数 模版参数分类类型形参与非类型形参。 ① 类型形参&#xff1a;出现在在模板参数列表中&#xff0c;跟在class或者typename之类的参数类型名称。 ② 非类型形参&#xff0c;就是用一个常量作为类(函数)模板的一个参数&#xff0c;在类(函数)模板中可将该参数当…

Java键盘输入语句

编程输入语句 1.介绍:在编程中&#xff0c;需要接受用户输入的数据&#xff0c;就可以使用键盘输入语句来获取。 2.步骤&#xff1a; 1&#xff09;导入该类的所在包&#xff0c;java.util.* 2)创建该类对象&#xff08;声明变量&#xff09; 3&#xff09;调用里面的功能 3…

[2025]医院健康陪诊系统(源码+定制+服务)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

计算机毕业设计 奖学金评定管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

MySQL ------- 索引(B树B+树)

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯&#xff0c;希望本文内容能帮到你&#xff01;你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 目录 一&#xff1a;索引的特点 二&#xff1a;索引适用的场景 三&#xff1a;MySQL中索引操作 1&#xff1a;…