提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 一、封装的思想
- 二、封装单个服务的信道管理类
- 1.成员变量
- 2.成员函数
- 三、封装总体的服务信道管理类
- 1.成员变量
- 2.成员函数
- 四.etcd和brpc联合测试
- 1.服务注册客户端
- 2.服务发现客户端
一、封装的思想
brpc的二次封装:
brpc本质上来说–进行rpc调用的,但是向谁调用什么服务得管理起来 – 搭配etcd实现的注册中心管理
原因:通过注册中心,能够获知谁能提供什么服务,进而能够连接它发起这个服务调用。
封装思想:
主要是管理起来网络通信的信道----将不同服务节点主机的通信信道管理起来
封装的是服务节点信道的管理,而不是rpc调用的管理。
封装:
1.指定服务的信道管理类:
一个服务可能会有多个节点提供服务,每个节点都有自己的channe
建立服务与信道的映射关系,并且关系是一对多,采用RR轮转策略进行获取
2.总体的服务信道管理类
将多个服务的信道管理对象管理起来
二、封装单个服务的信道管理类
1.成员变量
1.需要一个string来表视当前服务的信道管理类的服务名称因为我们是把一个服务的信道管理起来,所以需要知道该服务的名称。
2.一个服务可能会有多个主机,也就是一个服务可能会有多个信道。所以我么需要一个数组来存储当前服务节点的信道。
3.当一个节点主机下线后,我们需要删除释放这个channel,所以我们需要一个哈希表来进行主机地址与channel的映射关系,方便我们进行删除。
4.需要一个rr轮转下标,当需要对该服务进行rpc调用时,可以使用rr轮转负载均衡式的返回channel.
5.需要一个互斥锁,保证容器的线程安全。
std::mutex _mutex;
int32_t _index; //rr轮转下标
std::string _service_name; //服务名称
std::vector<ChannelPtr> _channels; //当前服务对应的通信集合
std::unordered_map<std::string,ChannelPtr> _hosts; //主机地址与信道映射关系
2.成员函数
构造函数中,只需要初始化服务名称,然后轮转下标初始化为0.
ServiceChannel(const std::string& service_name)
:_index(0),_service_name(service_name)
{
}
当服务上线一台主机时,需要创建一个channel进行管理,那么该函数中需要有一个参数,就是主机的地址。
void append(const std::string& host)
{
//创建一个channel,并进行连接
ChannelPtr channel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;
options.timeout_ms = -1;
options.max_retry = 3;
options.protocol = "baidu_std";
int ret = channel->Init(host.c_str(), &options);
if (ret == -1) {
LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);
return;
}
std::unique_lock<std::mutex> lock(_mutex);
_hosts.insert(std::make_pair(host, channel));
_channels.push_back(channel);
}
对应的,当主机下线时,需要将该主机的channel删除并释放
void remove(const std::string& host)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _hosts.find(host);
if (it == _hosts.end()) {
LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);
return;
}
for (auto vit = _channels.begin(); vit != _channels.end(); ++vit) {
if (*vit == it->second) {
_channels.erase(vit);
break;
}
}
_hosts.erase(it);
}
当需要对该服务发起rpc调用时,我们可以返回该服务的channel。通过rr轮转方式实现一个负载均衡。
ChannelPtr choose()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_channels.size() == 0) {
LOG_ERROR("当前没有能够提供 {} 服务的节点!", _service_name);
return ChannelPtr();
}
int32_t idx = _index++ % _channels.size();
return _channels[idx];
}
三、封装总体的服务信道管理类
我们的服务有多个,而每一个服务也会有多个主机节点。因此我们需要把这些服务总的管理起来。
1.成员变量
1.需要一个哈希表来实现服务名称和该服务的信道管理类的映射关系。
2.项目有多个服务,而我们并不是对所有的服务都要关心的。我们需要使用一个unordered_set来记录我们关心的服务。
3.一个互斥锁,保证容器的线程安全。
2.成员函数
添加一个关心的服务,只需要把服务名称传递进来,进行一个插入即可
void declared(const std::string& service_name)
{
std::unique_lock<std::mutex> lock(_mutex);
_follow_services.insert(service_name);
}
当一个对应的服务上线一个主机节点时,为该服务节点主机创建一个channel,并管理起来。这里直接调用上面的服务的信道管理类就行。
void onServiceOnline(const std::string &service_instance,const std::string& host)
{
std::string service_name = getServiceName(service_instance);
ServiceChannel::ptr service;
{
std::unique_lock<std::mutex> lock(_mutex);
auto fit = _follow_services.find(service_name);
if (fit == _follow_services.end()) {
LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);
return;
}
//先获取管理对象,没有则创建,有则添加节点
auto sit = _services.find(service_name);
if (sit == _services.end()) {
service = std::make_shared<ServiceChannel>(service_name);
_services.insert(std::make_pair(service_name, service));
}else {
service = sit->second;
}
}
if (!service) {
LOG_ERROR("新增 {} 服务管理节点失败!", service_name);
return ;
}
service->append(host);
LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);
}
服务主机节点下线时,需要找到对应服务的信道管理类,来进行一个对应主机节点的channel的删除和释放。
void onServiceOffline(const std::string &service_instance,const std::string& host)
{
std::string service_name = getServiceName(service_instance);
ServiceChannel::ptr service;
{
std::unique_lock<std::mutex> lock(_mutex);
auto fit = _follow_services.find(service_name);
if (fit == _follow_services.end()) {
LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);
return;
}
//先获取管理对象,没有则创建,有则添加节点
auto sit = _services.find(service_name);
if (sit == _services.end()) {
LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);
return;
}
service = sit->second;
}
service->remove(host);
LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);
}
上面的两个接口,是当服务的主机节点上线和下线时调用的,形参中的两个参数就是服务名称和主机节点。
那么什么时候会调用这两个函数呢?我们怎么知道什么时候主机上线下线呢?在前面我们封装了一个etcd,在服务发现客户端中,我们是同watcher来监控一个服务。当服务的数据发送改变时,就会调用我们传入的两个回调函数,那么这里的两个函数就可以作为俩个回调函数传入。也就是当服务新增节点和删除节点时调用。
获取指定服务的节点信道,传入指定服务,返回一个服务的信道。
//获取指定服务的节点信道
ServiceChannel::ChannelPtr choose(const std::string& service_name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto sit = _services.find(service_name);
if (sit == _services.end()) {
LOG_ERROR("当前没有能够提供 {} 服务的节点!", service_name);
return ServiceChannel::ChannelPtr();
}
return sit->second->choose();
}
四.etcd和brpc联合测试
前面我们封装了etcd,有一个服务注册客户端和一个服务发现客户端。那么我们可以用刚刚封装的brpc和etcd进行一个连接测试。
我们是可以用brpc进行rpc调用的,但是向谁进行rpc调用呢?我们可以使用服务发现客户端对服务进行一个监控,当服务注册客户端注册一个服务后,服务发现客户端就会触发一个回调函数,在这个回调函数就会对触发的事件进行一个判断,如果是新增事件,则调用用户设置的put_cb,如果是删除事件,则调用用户的del_cb;那么我们是不是可以把我们刚刚封装的bprc中的onServiceOnline和onServiceOffline作为对应的回调函数设置进去呢?
大体思路:
- 服务注册客户端这边,首先创建一个brpc服务器,然后新增EchoService服务,并启动服务器。最后向etcd中注册一个/service/echo/instance服务,并指明自己的主机地址,表明自己的主机可以提供echo服务。
- 服务发现客户端这边,首先创建一个总的信道管理类对象,然后构造一个服务发现对象,对/service/echo服务进行监控。接着通过总的信道管理类对象获取到一个channel,通过这个channel,就可以创建stub进行服务调用。
1.服务注册客户端
首先创建一个服务器对象,因为我们要提供rpc服务。
brpc::Server server;
接着新增服务。
//3.向服务器对象中,新增EchoService服务
EchoServiceImpl echo_service;
int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
if (ret == -1) {
std::cout << "添加Rpc服务失败!\n";
return -1;
}
启动服务器,监听8085端口。
brpc::ServerOptions options;
options.idle_timeout_sec = -1;
options.num_threads = 1;
ret = server.Start(8085,&options);
if (ret == -1) {
std::cout << "启动服务器失败!\n";
return -1;
}
向etcd注册中心注册一个/service/echo/instance服务,设置val为自己的主机地址。代表当前主机可以提供echo服务。
此时etcd注册中心就会有一个{key:/service/echo/instance ; val:127.0.0.1:8085};
//5.注册服务
Registry::ptr rClient = std::make_shared<Registry>("127.0.0.1:2379");
rClient->registry("/service/echo/instance","127.0.0.1:8085");
2.服务发现客户端
先构建以恶个总的信道管理对象,需要通过这个对象来"知道"哪个主机可以提供服务。但具体是怎么发现的呢?可以看到,我们调用了declared这个函数代表我们关心/service/echo这个服务。此时这个字符串就会被插入到一个unordered_set中。其次我们将这个对象中的两个成员函数获取到了。这两个函数后续我们要设置进服务发现客户端中。
//1.先构建Rpc信道管理对象
auto sm = std::make_shared<ServiceManager>();
sm->declared("/service/echo");
auto put_cb = std::bind(&ServiceManager::onServiceOnline,sm.get(),std::placeholders::_1,std::placeholders::_2);
auto del_cb = std::bind(&ServiceManager::onServiceOffline,sm.get(),std::placeholders::_1,std::placeholders::_2);
构造服务发现对象,这里的构造函数我们传入了四个参数,第一个参数是etcd注册中心的地址,因为我们要和etcd通信.
第二个参数是我们要监控的服务。我们的服务发现对象中有一个成员watcher,他会对指定的服务进行监控,如服务数据有变化,就会进行处理。而这里的第二个参数就是watcher进行监控的服务。另外这个监控它是会进行递归的,/service/下的所有目录发现变化,他都可以感知到。第三个第四个函数就是我们设置进的回调函数,当watcher感知到服务发送变化,会获取到发送变化的服务的key和val,也就是服务名称和主机地址。这是就可以调用我们设置的两个回调函数,我们的回调函数刚好可以接受两个参数。
而这两个参数一个是/service/echo/instance,一个是127.0.0.1:8085。我们的服务信道管理类是管理 服务的信道的。我们服务发现客户端关心的是/service/echo服务。而这里的/service/echo/instance本质上是一个/service/echo服务。所以在onServiceOnline和onServiceOffline中我们对这个参数进行了一个劫取子串,只需要/service/echo这一部分。然后为这一部分创建一个服务信道管理类对象。
//2. 构造服务发现对象
Discovery::ptr dclient = std::make_shared<Discovery>("127.0.0.1:2379","/service", put_cb, del_cb);
接着就是获取服务的channel,我们关心哪个服务就获取什么服务的channel.
//3. 通过Rpc信道管理对象,获取提供Echo服务的信道
ServiceChannel::ChannelPtr channel = sm->choose("/service/echo");
if(channel == nullptr){
return -1;
}
通过channel来发起rpc调用
//4.发起rpc调用
example::EchoService_Stub stub(channel.get());
example::EchoRequest req;
req.set_message("你好,Lkm");
brpc::Controller *cntl = new brpc::Controller();
example::EchoResponse *rsp = new example::EchoResponse();
stub.Echo(cntl, &req, rsp, nullptr);
if(cntl->Failed() == true){
std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应: " << rsp->message() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));