RPC框架引入zookeeper服务注册与服务发现

news2025/1/17 1:05:03

Zookeeper概念及其作用

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是大数据生态中的重要组件。它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

它是一个为分布式应用提供一致性协调服务的中间件
zookeeper入门参考链接:https://www.cnblogs.com/xinyonghu/p/11031729.html

在分布式系统中,zookeeper提供了非常丰富的应用,本文只是剖析其中一小部分,但也是非常重要的一个部分,即服务注册和服务发现。
在RPC框架中,如果没有服务注册和服务发现,那么这个RPC框架几乎变得不实用,浅显的思路是在RPCConsumer(服务调用端)维护一个服务的列表,这个列表包含了所有分布式节点服务的ip和端口,但考虑这么一种情况,如果其中某个节点由于某种原因down掉了或者将这个节点的服务删除了,但是RPCConsumer本地还维护的列表中还存在这个服务结点,并且还尝试请求这个服务,那么显然会调用出错。
在这里插入图片描述

类似这种肯定需要动态的维护每个分布式服务节点的状态,在该节点down掉或者被撤销时应及时删除这个服务,避免RPC调用端继续请求不存在的服务。这就是zookeeper服务注册和服务发现所做的事。

在这里插入图片描述
Zookeeper组织数据的格式类似于一个文件系统,每个znode结点都可以是一个分布式服务结点,一般组织的结构是XXXXService/login、 XXXXService/registe,即service_name/method_name,znode结点的数据就是该服务所在节点的ip和port
在这里插入图片描述

Zookeeper服务注册和发现的流程:

step1:Rpc服务端先通过zkClient向zkServer端注册服务,也即创建XXXXService/login、 XXXXService/registe节点,并填充相应的数据。
step2:Rpc调用端再调用某个服务之前,通过zkClient向ZkServer查询这个服务节点是否存在,如果存在则返回这个服务节点的ip和port。然后进行远程rpc调用,否则返回错误终止调用过程。
step3:这一步其实zookeeper已经帮我们做了,step1中注册服务的过程中,zkServer会与这个节点建立一个session,并且zkServer以1/3 * timeout 的时间定期为每个与之简历的节点发送心跳包,如果得不到回应那么zkServer会认为这个节点已经不存在了,会动态的把这个节点上的所有服务都进行删除。

RPC框架引入zookeeper

1、封装zkclient(用于与zkServer通信的句柄、例如创建结点和删除结点、以及一些心跳回调操作)

#pragma once


#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>


class ZkClient
{
public: 
    ZkClient();
    ~ZkClient();
    // zkClient启动连接zkserver
    void Start();
    // 在zkserver上根据指定的path创建Znode节点
    void Create(const char *path, const char* data, int datalen, int state=0);
    // 根据参数指定的znode节点路径,获取znode节点的值
    std::string GetData(const char* path);

private:
    // zk客户端句柄
    zhandle_t *m_zhandle;
};

// .cc
#include "zookeeperutil.h"
#include "rpcapplication.h"
#include  <iostream>

//全局的watcher观察器     zkserver给zkclient的通知回调
void global_watcher(zhandle_t *zh, int type, int state, const char* path, void *watcherCtx)
{
    if(type == ZOO_SESSION_EVENT)    //回调的消息类型是和会话相关的消息类型
    {
        if(state == ZOO_CONNECTED_STATE)  //zkserver和zkclient连接成功
        {
            sem_t *sem = (sem_t*) zoo_get_context(zh);
            sem_post(sem);
        }
    }
}

ZkClient::ZkClient():m_zhandle(nullptr)
{
    
}
ZkClient::~ZkClient()
{
    if(m_zhandle != nullptr)
    {
        zookeeper_close(m_zhandle);   //关闭句柄, 释放资源
    }
}
// zkClient启动连接zkserver
void ZkClient::Start()
{
    std::string host = RpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port = RpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string connstr = host + ":" + port;

    /*
        zookeeper_mt:多线程版本
        zookeeper的API客户端程序提供了三个线程
        APT调用线程
        网络I/O线程  pthread_create (使用的poll-IO多路复用)
        watcher回调线程 pthread_create
    */
   m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
   if(nullptr == m_zhandle)
   {
        std::cout << "zookeeper_init error !" << std::endl;
        exit(EXIT_FAILURE);
   }

   sem_t sem;
   sem_init(&sem, 0, 0);
   zoo_set_context(m_zhandle, &sem);

   sem_wait(&sem);
   std::cout << "zookeeper_init success !" << std::endl;
}

// 在zkserver上根据指定的path创建Znode节点
void ZkClient::Create(const char *path, const char* data, int datalen, int state)
{
    char path_buffer[128];
    int bufferlen = sizeof(path_buffer);
    int flag;
    //先判断path表示的znode节点是否存在, 如果存在, 就不能重复创建了
    flag = zoo_exists(m_zhandle, path, 0, nullptr);
    if(ZNONODE == flag)   //表示path的znode节点不存在
    {
        // 创建指定path的znode节点
        flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
        if(flag == ZOK)
        {
            std::cout << "znode create success .... path:" << path << std::endl;
        }
        else
        {
            std::cout << "flag : " << flag <<std::endl;
            std::cout << "znode create error...path: " << path << std::endl;
            exit(EXIT_FAILURE);
        }
    }
}


// 根据参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char* path)
{
    char buffer[64];
    int bufferlen = sizeof(buffer);
    int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
    if(flag != ZOK)
    {
        std::cout << "get znode error ...... path" << path << std::endl;
        return "";
    }
    else
    {
        return buffer;
    }
}

2、在RPCProvider端进行服务注册

//把当前rpc节点上要发布的服务全部注册到zk上面, 让rpc client可以从zk上发现服务
        // session timeout 30s          zkclient 的网络I/O线程 会定时以1/3 * timeout 时间去给zkserver发送ping心跳包
        ZkClient zkCli;
        zkCli.Start();
        //service name为永久性节点      method name 为临时性节点
        for(auto& sp : m_serviceMap)
        {
                // /service_name   ---> /UserServiceRPc
                std::string service_path = "/" + sp.first;
                zkCli.Create(service_path.c_str(), nullptr, 0);
                for(auto &mp : sp.second.m_methodMap)
                {
                        // /service_name/method_name /UserServiceRPc/Login 存储当前这个rpc服务节点主机的ip和port
                        std::string method_path = service_path + "/" + mp.first;
                        char method_path_data[128] = {0};
                        sprintf(method_path_data, "%s:%d", ip.c_str(), port);
                        //ZOO_EPHEMERAL 表示znode是一个临时性节点
                        zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
                }

        }

3、RPCConsumer端进行服务发现

//rpc调用方想调用service_name的method_name的服务, 需要查询zk上该服务所在的host信息
     ZkClient zkCli;
     zkCli.Start();
     // /UserServiceRpc/Login
     std::string method_path = "/" + service_name + "/" + method_name;
     // 127.0.0.1:8000
     std::string host_data = zkCli.GetData(method_path.c_str());
     if(host_data == "")
     {
        controller->SetFailed(method_path + "is not exist!");
        return;
     }
     int idx = host_data.find(":");
     if(idx == -1)
     {
        controller->SetFailed(method_path + "address is invalid!");
        return;
     }
     std::string ip = host_data.substr(0, idx);
     uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());

至此基本上完整RPC应该具备的核心东西都有了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/829679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

海外ASO优化之如何提高应用的可见度和安装量

关键词的投放能够激励用户通过搜索查询找到我们的应用程序&#xff0c;安装它并在他们的设备上运行它。如果有足够的流量&#xff0c;应用程序在搜索中的某些关键词的排名将会提升&#xff0c;并且有助于提高我们应用程序的知名度和自然下载量。 1、选择正确的关键词来提升。 …

【韩顺平】JDBC

第一节 JDBC概述 1.1 JDBC原理图 Java不可能具体地去操作数据库&#xff0c;因为数据库有许多种&#xff0c;直接操作数据库是一种很低效且复杂的过程。 因此&#xff0c;Java引入JDBC&#xff0c;规定一套操作数据库的接口规范&#xff0c;从而要求数据库厂商去实现JDBC接口。…

金鸣识别将无表格线的图片转为excel的几个常用方案

我们知道&#xff0c;金鸣识别要将横竖线齐全的表格图片转为excel非常简单&#xff0c;但要是表格线不齐全甚至没有表格线的图片呢&#xff1f;这就没那么容易了&#xff0c;在识别这类图片时&#xff0c;我们一般会使用以下的一种或多种方法进行处理&#xff1a; 1. 基于布局…

04 Ubuntu中的中文输入法的安装

在Ubuntu22.04这种版本相对较高的系统中安装中文输入法&#xff0c;一般推荐使用fctix5&#xff0c;相比于其他的输入法&#xff0c;这款输入法的推荐词要好得多&#xff0c;而且不会像ibus一样莫名其妙地失灵。 首先感谢文章《滑动验证页面》&#xff0c;我是根据这篇文章的教…

网络安全--原型链污染

目录 1.什么是原型链污染 2.原型链三属性 1&#xff09;prototype 2)constructor 3)__proto__ 4&#xff09;原型链三属性之间关系 3.JavaScript原型链继承 1&#xff09;分析 2&#xff09;总结 3)运行结果 4.原型链污染简单实验 1&#xff09;实验一 2&#xff0…

matlab编程实践18、19

浅水方程 浅水方程可以建立起海啸和浴缸中波浪的数学模型。浅水方程建立了水或者其它不可压缩液体受扰动时传播的模型。隐含的假设是&#xff0c;液体的深度和波浪的长度、扰动等相比是很小的。 在这样的记号下&#xff0c;浅水方程为双曲守恒定律的一个例子。 使用拉克斯-冯特…

0基础学习VR全景平台篇 第77篇:全景相机-圆周率外接收音方案

一、相机外接收音准备工作 需要自行购买USB外置声卡&#xff0c;无线麦克风&#xff0c; Type-c的拓展器。 USB外置声卡 &#xff1a; 产品参数 品牌: HAGiBiS/海备思 名称: USB三合一声卡 接口:耳麦孔/耳机孔/麦克风孔 工作电流:≤38mA 工作电压: DV 5V 输入信噪比:≥90dB …

Facebook营销推广怎么做?有哪些技巧?

Facebook是使用人数比较多的一个社交软件,也是跨境电商的首要选择平台。要想做好Facebook宣传推广&#xff0c;做好以下步骤很重要。 一、基础设置 1.创建 Facebook 业务公共主页 这相当于商业版的Facebook个人资料。您可以添加自己的品牌名称&#xff0c;上传个人资料和封面…

K8s工作原理

K8s title: Kubernetes之初探 subtitle: K8s的工作原理 date: 2018-09-18 18:26:37K8s概述 我清晰地记得曾经读到过的一篇博文&#xff0c;上面是这样写的&#xff0c; “云端教父AWS云端架构策略副总裁Adrian Cockcroft曾指出&#xff0c;两者虽然都是运用容器技术&#xff0…

“窗口期”开启!多域融合大趋势下,中国智能汽车OS如何破局?

操作系统已经成为了各大车厂、互联网企业的必争之地。 过去几年&#xff0c;丰田、大众、奔驰等众多车企&#xff0c;以及阿里、百度、腾讯、华为等纷纷加大了操作系统的布局&#xff0c;智能汽车操作系统的抢位战已经火热开启。 汽车电子电气架构已经迈入了域集中式架构、多…

国产GOWIN实现低成本实现CSI MIPI转换DVP

CSI MIPI转换DVP&#xff0c;要么就是通用IC操作&#xff0c;如龙讯芯片和索尼芯片&#xff0c;但是复杂的寄存器控制器实在开发太累。对于FPGA操作&#xff0c;大部分都是用xilinx的方案&#xff0c;xilinx方案成本太高&#xff0c;IP复杂。 而用国产GOWIN已经实现了直接mipi …

工作日报怎么写?聪明灵犀工具能帮你

工作日报怎么写&#xff1f;在工作中每天写日报是必不可少的&#xff0c;日报不仅可以记录每天的工作内容&#xff0c;也可以帮助自己更好的规划下一步的工作任务。但是&#xff0c;如何写出一份好的日报呢&#xff1f;今天我们就来介绍一些工具&#xff0c;让你的写日报更加高…

Vue3徽标数(Badge)

APIs 参数说明类型默认值必传color自定义小圆点的颜色string‘’falsecount展示的数字&#xff0c;大于 overflowCount 时显示为 overflowCount&#xff0c;为 0 时隐藏number | slot0falseoverflowCount展示封顶的数字值number99falseshowZero当数值为 0 时&#xff0c;是否展…

干货 | 清华大学叶晓俊:GB/T 35274-2023《信息安全技术 大数据服务安全能力要求》解读...

全国信息技术安全标准化委员会&#xff08;简称信安标委或TC260&#xff09;在2021年通过了编制组申请的GB/T 35274-2017《信息安全技术 大数据服务安全能力要求》修订项目&#xff0c; 新版标准报批稿在2022年年底提交给国标委进行最后的形式化审查&#xff0c;从国标委标准进…

TCP的三次握手和四次挥手······详解

1、三次握手 三次握手是建立连接的过程 如图大致为三次握手的流程图&#xff1a; 当客户端对服务端发起连接时&#xff0c;会先发一个包连接请求数据&#xff0c;去询问能否建立连接&#xff0c;该数据包称为 “SYN”包 然后&#xff0c;如果对方同意连接&#xff0c;那么…

思科单臂路由、lacp链路聚合、NAT实验

实验拓扑图&#xff1a; 实验目的&#xff1a; 如图所示配置相应IP地址和VLAN&#xff0c;并通过在AR1上配置单臂路由&#xff0c;实现VLAN10和VLAN20的主机能够在VLAN间通信&#xff1b;在SW1和SW2的三条链路实施链路聚合&#xff0c;使用静态LACP模式&#xff0c;使一条链…

【linux--->网络层协议】

文章目录 [TOC](文章目录) 一、概念1.网络层概念2.IP地址概念 二、IP协议报文结构1.首部长度2.总长度(total length)3.协议4.版本号(version)5.服务类型(Type Of Service)6.生存时间间(Time To Live, TTL) 三、网段划分1.5类IP划分法.2.CIDR(Classless Interdomain Routing)划分…

STM32刷Micropython固件参考指南

STM32刷Micropython固件指南 其实刷固件和普通的程序下载烧录无多大的差异&#xff0c;主要是其他因数的影响导致刷固件或刷完固件无法运行的情况和相关问题。 &#x1f4d1;刷固件教程 固件下载。目前所支持的stm32型号有这些&#xff1a; stm32f0, stm32f4, stm32f7, stm32g…

《零基础入门学习Python》第076讲:GUI的终极选择:Tkinter13

这节课我们来学习 Tkinter 的布局管理器&#xff0c;那什么是布局管理器呢&#xff1f;说白了&#xff0c;就是用于管理你的组件如何排列。Tkinter 提供了 3 大布局管理器&#xff1a;pack、grid 和 place。 pack 是按添加顺序排列组件grid 是按行/列形式排列组件place 则允许…

qt富文本编辑基本知识(QTextBlockFormat、QTextListFormat)

可以参考该文章&#xff1a;QTextBlockFormat、QTextListFormat - 程序员大本营 核心知识如下&#xff1a; 如果想开发一个富文本编辑器&#xff08;html&#xff0c;markdown等常见格式&#xff09;&#xff0c;Qt已经为用户完成了几乎所有与编辑有关的具体工作&#xff0c;…