RPC 框架

news2024/9/20 9:38:27

RPC 全称 Remote Procedure Call——远程过程调用。

  • RPC技术简单说就是为了解决远程调用服务的一种技术,使得调用者像调用本地服务一样方便透明。
  • RPC是一种通过网络从远程计算机程序上请求服务,不需要了解底层网络技术的协议。

集群和分布式

集群:集群(cluster)是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。在不同的服务器中部署相同的功能。

分布式:指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。不同服务器中部署不同的功能,通过网络连接起来,组成一个完整的系统。

分布式是以缩短单个任务的执行时间来提升效率的,而集群则是通过提高单位时间内执行的任务数来提升效率。

为什么要有RPC?

服务化:微服务化,跨平台的服务之间远程调用;
分布式系统架构:分布式服务跨机器进行远程调用;
服务可重用:开发一个公共能力服务,供多个服务远程调用。
系统间交互调用:两台服务器A、B,服务器 A 上的应用 a 需要调用服务器 B 上的应用 b 提供的方法,而应用 a 和应用 b 不在一个内存空间,不能直接调用,此时,需要通过网络传输来表达需要调用的语义及传输调用的数据。

使用场景

  • 大型网站:内部涉及多个子系统,服务、接口较多。
  • 注册发现机制:如Nacos、Dubbo等,一般都有注册中心,服务有多个实例,调用方调用的哪个实例无感知。
  • 安全性:不暴露资源。
  • 服务化治理:微服务架构、分布式架构。

常用RPC技术或框架

  • 应用级的服务框架:阿里的 Dubbo/Dubbox、Google gRPC、Spring Boot/Spring Cloud。
  • 远程通信协议:RMI、Socket、SOAP(HTTP XML)、REST(HTTP JSON)。
  • 通信框架:MINA 和 Netty

RPC 原理

在这里插入图片描述
RPC 是指计算机 A 上的进程,调用另外一台计算机 B 上的进程,其中 A 上的调用进程被挂起,而 B 上的被调用进程开始执行,当值返回给 A 时,A 进程继续执行。调用方可以通过使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。而这一过程,对于开发人员来说是透明的。

远程过程调用采用客户机/服务器(C/S)模式。请求程序就是一个客户机,而服务提供程序就是一台服务器。和常规或本地过程调用一样,远程过程调用是同步操作,在远程过程结果返回之前,需要暂时中止请求程序。使用相同地址空间的低权进程或低权线程允许同时运行多个远程过程调用。

在这里插入图片描述

RPC五大模块及交互关系

在这里插入图片描述

  • user(客户端)
  • user-stub(客户端存根)
  • RPCRuntime(RPC通信包)
  • server-stub(服务端存根)
  • server(服务端)

用户端:当用户希望进行远程调用时,实际上是调用的本地 user-stub 中相应的代码。user-stub 负责将调用的规范和参数打包成一个或多个包,通过 RPCRuntime(RPC通信包)传输到被调用机器。
服务端:服务端接收到这些数据包后,对应的 RPCRuntime(RPC通信包)将它们传递给 server-stub。然后 server-stub 将它们解包,并调用对应的本地实现。同时用户端的调用进程挂起,等待服务端返回结果包。当服务端调用完成时,返回到 server-stub,并通过服务端的RPCRuntime 将结果传回用户端对应的 RPCRuntime(RPC通信包)挂起的进程中。然后通过 user-stub 解包,最后将它们返回给用户。

如果把用户端和服务端代码放在一台机器上,直接绑定在一起,不使用 user-stub 和 server-stub,程序仍然可以工作。RPCRuntime(RPC通信包)是Cedar系统的一个标准部分,因此不用程序员编写通信相关代码,但是 user-stub 和 server-stub 是由一个叫做 Lupine 的程序自动生成的,也不需要程序员编写对应包处理层面的代码。

RPC 业务实现

Callee 对外提供远端可调用方法 LoginRegister,要在 user.proto 中进行注册(service UserServiceRpc)。在Callee中的Login方法接受 LoginRequest message,执行完逻辑后返回LoginResponse message 给 Caller。

Caller 可以调用 UserServiceRpc_Stub::Login发起远端调用,而 Callee 则继承UserServiceRpc类并重写UserServiceRpc::Login函数,实现Login函数的处理逻辑。这是 protobuf 提供的接口,需要服务方法提供者重写这个 Login 函数。

class UserService : public fixbug::UserServiceRpc{  //使用在rpc服务发布端(rpc服务提供者)
    public:
    bool Login(std::string name, std::string pwd){
        std::cout << "doing local service : LOGIN " << std::endl;
        std::cout << "name: " << name << " pwd: "<< pwd << std::endl;
        return true;
    }

    //新增的测试方法
    bool Register(uint32_t id,std::string name,std::string pwd)
    {
        std::cout << "doing local service: Register" << std::endl;
        std::cout << "id:" << id <<" name:" << name << " pwd:" << pwd << std::endl;
        return true;
    }

    // 重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的
    // 1. caller --> Login(LoginRequest) --> muduo --> callee
    // 2. callee --> Login(LoginRequest) --> 交到下面重写的这个Login方法上了
    void Login(::google::protobuf::RpcController* controller,
                       const ::fixbug::LoginRequest* request,
                       ::fixbug::LoginResponse* response,
                       ::google::protobuf::Closure* done)
    {
        //框架给业务.上报了请求参数LoginRequest,应用获取相应数据做本地业务
        std::string name = request->name();
        std::string pwd = request->pwd();
    
        // 做本地业务
        bool login_result = Login(name, pwd); 

        // 把响应写入包括错误码、 错误消息、返回值
        fixbug::ResultCode *code = response->mutable_result();
        code->set_errcode(0);
        code->set_errmsg("");
        response->set_success(login_result);

        //执行回调操作执行, 响应对象数据的序列化和网络发送 (都是由框架来完成的)
        done->Run();
    }

    void Register(::google::protobuf::RpcController* controller,
                       const ::fixbug::RegisterRequest* request,
                       ::fixbug::RegisterResponse* response,
                       ::google::protobuf::Closure* done)
    {
        uint32_t id = request->id();
        std::string name = request->name();
        std::string pwd = request->pwd();

        //开始做本地业务
        bool ret = Register(id, name, pwd);

        //填充回调结果
        response->mutable_result()->set_errcode(0);
        response->mutable_result()->set_errmsg("");
        response->set_success(ret);

        done->Run();
    }
};

RPC 服务提供

  1. RpcProvider 是一个服务器,接收来自 rpc 客户端的请求,且能在一定程度上承载高并发的需求(考虑多个 rpcClient 给当前 rpcProvider 发送 rpc 调用请求)。
  2. 一个 rpcclient 发送请求过来调用一个远程方法,那么 rpcProvider 收到这个请求之后,能根据请求所携带的数据自动调用发布的 rpc 方法,那么请求必须包含服务名、方法名、以及参数,这样 rpcProvider 才知道怎么调用。即 buffer = service_name + method_name + args。
//框架提供的专门负责发布rpc服务的网络对象类
class RpcProvider{
public:
    //这里是框架提供给外部使用的,可以发布rpc方法的函数接口
    //此处应该使用Service类,而不是指定某个方法
    void NotifyService(google::protobuf::Service *service);

    //启动rpc服务节点,开始提供rpc远程网络调用服务
    void Run();

private:
    //组合 EventLoop
    muduo::net::EventLoop m_eventLoop;

    //service服务类型信息
    struct ServiceInfo
    {
        google::protobuf::Service *m_service;//保存服务对象
        std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;//保存服务方法

    };

    //存储注册成功的服务对象和其服务方法的所有信息
    std::unordered_map<std::string,ServiceInfo> m_serviceMap;

    // 新的 socket 连接时的回调
    void OnConnection(const muduo::net::TcpConnectionPtr &conn);
    // 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应
    void OnMessage(const muduo::net::TcpConnectionPtr &conn,
                                    muduo::net::Buffer *buffer,
                                    muduo::Timestamp);
    //Closure的回调操作,用于序列化RPC的响应和网络发送
    void SendRpcResponse(const muduo::net::TcpConnectionPtr&,google::protobuf::Message* );
}; 
int main(int argc, char *argv[])
{
    //先调用框架的初始化操作 provider -i config.conf,从init方法读取配置服务,比如IP地址和端口号
    MprpcApplication::Init(argc,argv);

    //项目提供者,让我们可以发布该服务
    RpcProvider provider;
    //把UserService对象发布到rpc节点上
    provider.NotifyService(new UserService());
    
    //启动一个rpc服务发布节点,run以后,进程进入阻塞状态,等待远程的rpc请求
    provider.Run();

    return 0;
}

NotifyService 函数可以将UserService服务对象及其提供的方法进行预备发布。发布完服务对象后再调用Run()就将预备发布的服务对象及方法注册到ZooKeeper上并开启了对远端调用的网络监听。

Muduo提供的网络模块监听到连接事件并处理完连接逻辑后会调用OnConnection函数,监听到已建立的连接发生可读事件后会调用OnMessage函数

RpcProvider::NotifyService() 实现

Service_Info结构体内定义了一个服务对象,以及这个服务对象内提供的方法们(以std::unordered_map形式存储)

将传入进来的服务对象 service 进行预备发布。其实说直白点就是将这个 service 服务对象及其提供的方法的 Descriptor 描述类,存储在RpcProvider::m_serviceMap中。

/*
service_name <=> service 描述  => service* 记录服务对象
                              => method_name => method 方法对象
json protobuf
*/

//这里是框架提供给外部使用的,可以发布rpc方法的函数接口
//此处应该使用Service类,而不是指定某个方法
void RpcProvider::NotifyService(google::protobuf::Service *service){
    //服务表
    ServiceInfo service_info;//服务表

    //获取了服务对象的描述信息
    const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();
    //获取服务的名字
    std::string service_name = pserviceDesc->name();
    //获取服务对象service的方法数量
    int methodCnt= pserviceDesc->method_count();

    std::cout<<"service name:"<<service_name<<std::endl;    // 添加日志信息后更改

    for(int i=0; i<methodCnt; ++i){
        //获取了服务对象指定下标的服务方法的描述(抽象描述)
        const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);

        std::string method_name = pmethodDesc->name();
        //插入服务
        service_info.m_methodMap.insert({method_name, pmethodDesc});

        printf("method_name:%s \n",method_name.c_str());
    }
    //可以使用该表来调用方法
    service_info.m_service = service;
    m_serviceMap.insert({service_name, service_info});

}

RpcProvider::Run() 实现

将待发布的服务对象及其方法发布到ZooKeeper上,同时利用Muduo库提供的网络模块开启对RpcServer的(IP, Port)的监听。

// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run(){
    // 获取配置文件中的 ip 和端口号初始化结构体
    std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    muduo::net::InetAddress address(ip, port);

    // 为了方便用户使用框架,在 Run 方法中封装 muduo
    // 创建 TcpServer 对象
    muduo::net::TcpServer tcpServer_(&m_eventLoop, address, "MprpcProvider");

     // 绑定连接回调和消息读写回调方法
    tcpServer_.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
    tcpServer_.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, 
                                    std::placeholders::_2, std::placeholders::_3));
    
    // 设置 muduo 库的线程数
    tcpServer_.setThreadNum(4);

    //把当前rpc节点上要发布的服务全部注册在zk上,让rpc client可以从zk上发现服务
    //session的timeout默认为30s,zkclient的网络I/O线程1/3的timeout内不发送心跳则丢弃此节点
    ZkClient zkCli;
    zkCli.Start();//链接zkserver
    for(auto &sp:m_serviceMap){
        //service_name
        std::string service_path ="/"+sp.first;//拼接路径
        zkCli.Create(service_path.c_str(),nullptr,0);//创建临时性节点
        for(auto &mp:sp.second.m_methodMap){
            //service_name/method_name
            std::string method_path=service_path+"/"+mp.first;//拼接服务器路径和方法路径
            char method_path_data[128]={0};
            sprintf(method_path_data,"%s:%d",ip.c_str(),port);//向data中写入路径

            //创建节点,ZOO_EPHEMERAL表示临时节点
            zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);
        }
    }

    std::cout << "MprpcProvider start service at: " << ip << ':' << port << '\n';

    // 启动网络服务
    tcpServer_.start();
    m_eventLoop.loop();
    
}

RpcProvider::OnConnection() 实现

// 新的 socket 连接时的回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn){
    if(!conn->connected()){
        //和rpcclient的链接断开了
        conn->shutdown();
    }
}

RpcProvider::OnMessage() 实现

Caller 端发起远程调用的时候, 会对callee的rpcserver发起tcp连接,rpcserver接受连接后,开启对客户端连接描述符的可读事件监听。caller将请求的服务方法及参数发给callee的rpcserver,此时rpcserver上的muduo网络模块监听到该连接的可读事件,然后就会执行OnMessage(…)函数逻辑。

该方法表示已建立连接用户的读写事件操作,如果有一个远程 RPC 服务的调用请求,那么OnMessage方法就会响应。

  1. 首先要从网络上接收的远程rpc调用请求的字符流;
  2. 从字符流中读取前4个字节的内容,将头部的大小转换成二进制存到这四字节里,不可能会超出范围;
  3. 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息;
  4. 获取rpc方法参数的字符流数据,略过recv_buf的前面的头部信息(header_size和header_str),4字节加header_size即为开始的位置;
  5. 获取service对象和method对象;
  6. 生成rpc方法调用的请求request和响应response参数;
  7. 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法。
/*
在框架内部需要提前协商好通信使用的protobuf数据类型:比如发送过来的数据类型为:service_name,method_name,args
需要定义proto的message类型,进行数据头的序列化和反序列化,为防止TCP的粘包,需要对各个参数进行参数的长度明确

定义header_size(4字节) + header_str + args_str

已建立连接的用户的读写事件回调,网络上如果有一个远程的rpc服务请求,则onmessge方法就会响应
*/

// 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp){
    //获取到数据,即网络上接受的远程rpc调用请求的字符流, Login和args
    std::string recv_buf= buffer->retrieveAllAsString();

    //读取header_size,此时的整数若按照字符串格式发送,读取时会出现问题,所以需要直接按二进制发送
    //从字符流中读取前四个字节的内容
    uint32_t header_size=0;
    recv_buf.copy((char*)&header_size,4,0);

    //根据header_size读取数据头的原始字符流,反序列化数据,得到rpc的详细请求数据
    std::string rpc_header_str=recv_buf.substr(4,header_size);//substr从4开始读读取header_size个字节的数据
    mprpc::RpcHeader rpcHeader;

    std::string service_name;//用于存储反序列化成功的服务名字
    std::string method_name;//用于存储反序列化成功的服务方法
    uint32_t args_size;//用于存储反序列化成功的参数个数

    //开始反序列化,参数接受类型为引用,返回值为bool型
    if(rpcHeader.ParseFromString(rpc_header_str)){
        //数据头反序列化成功
        service_name=rpcHeader.service_name();
        method_name=rpcHeader.method_name();
        args_size=rpcHeader.args_size();
    }else {
        //数据头反序列化失败
        std::cout<<"rpc_header_str: "<<rpc_header_str<<"parse error!"<<std::endl;
        return;
    }

    //获取rpc参数方法的字符流数据
    std::string args_str=recv_buf.substr(4+header_size,args_size);

    //打印调试信息
    std::cout << "======================================" << std::endl;
    std::cout << "header_size: " << header_size << std::endl;
    std::cout << "rpc_header_str" << rpc_header_str << std::endl;
    std::cout << "service_name: " << service_name << std::endl;
    std::cout << "method_name: " << method_name << std::endl;
    std::cout << "args_str: " << args_str << std::endl;
    std::cout << "======================================" << std::endl;

    //获取service对象和method对象
    auto it = m_serviceMap.find(service_name);
    if(it == m_serviceMap.end()){
        //如果方法不存在
        std::cout << service_name << "is not exist!" << std::endl;
        return;
    }

    auto mit = it->second.m_methodMap.find(method_name);
    if(mit == it->second.m_methodMap.end()){
        //如果服务提供的方法不存在
        std::cout << service_name << ":" << method_name << "is not exists!" << std::endl;
        return;
    }

    google::protobuf::Service *service=it->second.m_service;    // 获取service对象,对应Userservice
    const google::protobuf::MethodDescriptor *method=mit->second;   // 获取method对象,对应Login方法

    //生成rpc方法调用的请求request和相应response参数
    google::protobuf::Message *request = service->GetRequestPrototype(method).New();//生成一个新对象
    if(!request->ParseFromString(args_str)){
        std::cout << "request parse error, content:" << args_str << std::endl;
        return;
    }

    google::protobuf::Message *response = service->GetResponsePrototype(method).New();//生成一个新对象

    //给下面的method方法的调用,绑定一个Closure的回调函数,因为模板的实参推演失败,所以需要指定类型
    google::protobuf::Closure *done = google::protobuf::NewCallback
    <RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*>
    (this, &RpcProvider::SendRpcResponse, conn,response);

    //在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
    //相当于UserService调用了Login方法
    service->CallMethod(method, nullptr, request, response, done);
}

NewCallback函数会返回一个google::protobuf::Closure类的对象,该Closure类其实相当于一个闭包。这个闭包捕获了一个成员对象的成员函数,以及这个成员函数需要的参数。然后闭包类提供了一个方法Run(),当执行这个闭包对象的Run()函数时,他就会执行捕获到的成员对象的成员函数,也就是相当于执行void RpcProvider::SendRpcResponse(conn, response);,这个函数可以将reponse消息体发送给Tcp连接的另一端,即caller

CallMethod 将服务名方法名进行组装,并用protobuf提供的序列化方法序列化,然后通过服务名方法名查找ZooKeeper服务器上提供该服务方法的RpcServer的地址信息,然后返回。接着再将请求的服务方法及其参数组装并序列化,向RpcServer发起tcp连接请求,连接建立后将序列化的数据发送给RpcServer,然后再等待接收来自RpcServer的返回消息体。

RpcProvider::SendRpcResponse() 实现

//Closure的回调操作,用于序列化RPC的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,google::protobuf::Message* response){
    std::string response_str;
    //response进行序列化
    if(response->SerializeToString(&response_str)){
        //序列化成功后,通过网络把rpc方法执行的结果发送回rpc的调用方
        conn->send(response_str);
    } else {
        std::cout<<"Serialize response error!"<<std::endl;
    }
    //模拟http的短链接服务,由rpcprovider主动断开连接
    conn->shutdown();
}

RPC 服务调用

调用方需要利用到的是 Stub 类。Stub 类需要提供一个带参数的构造函数,需要重写这个实参 RpcChannel。

class MprpcChannel:public google::protobuf::RpcChannel
{
public:
    //所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送
    void CallMethod(const google::protobuf::MethodDescriptor* method,
                          google::protobuf::RpcController* controller, const google::protobuf::Message* request,
                          google::protobuf::Message* response, google::protobuf::Closure* done);
};

提供方调用函数的方法:MprpcChannel::CallMethod,调用方的框架逻辑就是将访问的对象,函数,参数序列化,socket连接到zookeeper,获取对应的 response。

//所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
                          google::protobuf::RpcController* controller, 
                          const google::protobuf::Message* request,
                          google::protobuf::Message* response, 
                          google::protobuf::Closure* done)
{
    const google::protobuf::ServiceDescriptor* sd=method->service();
    std::string service_name=sd->name();    //service name
    std::string method_name=method->name(); //method name

    //获取参数的序列化字符串长度 args_size
    uint32_t args_size = 0;
    std::string args_str;
    if(request->SerializeToString(&args_str)){
        //序列化成功
        args_size=args_str.size();
    } else {
         controller->SetFailed("serialize request error!");//保存错误信息
        // std::cout <<"serialize request error!"<< std::endl;
        return;
    }

    //定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    uint32_t header_size = 0;
    std::string rpc_header_str;
    if(rpcHeader.SerializeToString(&rpc_header_str)){   // response进行序列化   
        header_size = rpc_header_str.size();
    } else {
        // std::cout <<"serialize rpc header error!"<< std::endl;    // 优化
        controller->SetFailed("serialize rpc header error!");
        return;
    }

    //组织待发送的rpc请求的字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char *)&header_size, 4));   // header_size
    send_rpc_str += rpc_header_str; // rpcheader
    send_rpc_str += args_str;   // args

    std::cout<<"======================================"<<std::endl;
    std::cout<<"header_size: "<<header_size<<std::endl;
    std::cout<<"rpc_header_str"<<rpc_header_str<<std::endl;
    std::cout<<"service_name: "<<service_name<<std::endl;
    std::cout<<"method_name: "<<method_name<<std::endl;
    std::cout<<"args_str: "<<args_str<<std::endl;
    std::cout<<"======================================"<<std::endl;

    //使用TCP编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == clientfd)
    {
        // std::cout << "create socket error! errno: "<< errno << std::endl;    //改用 controller 记录错误信息
        // exit(EXIT_FAILURE);
        char errtxt[512]={0};
        sprintf(errtxt,"create socket error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    // uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());

    /*
    rpc调用方向调用service_name服务,需要查询zk上该服务所在的host信息
    */
    ZkClient zkCli;
    zkCli.Start();
    std::string method_path="/"+service_name+"/"+method_name;

    //获取ip地址和端口号
    std::string host_data=zkCli.GetData(method_path.c_str());
    if(host_data=="")
    {
        controller->SetFailed(method_path+" is not exist!");
        return;
    }
    int idx=host_data.find(":");//分割符
    if(idx==-1)
    {
        controller->SetFailed(method_path+" address is invalid!");
        return;
    }
    std::string ip=host_data.substr(0,idx); //从字符串中返回一个指定的子串
    uint32_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());   //把参数 str 所指向的字符串转换为一个整数


    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    //链接rpc服务节点
    if(-1 == connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr)))
    {

        // std::cout<<"connect error!errno: "<<errno<<std::endl;
        // close(clientfd);
        // exit(EXIT_FAILURE);
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"connect error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    //发送rpc请求
    if(-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        // std::cout<<"send error!errno: "<<errno<<std::endl;
        // close(clientfd);
        // return;
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"send error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    //接受rpc请求的响应值
    char recv_buf[1024]={0};
    int recv_size = 0;
    if(-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"recv error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    //反序列化rpc调用的响应数据
    
    std::string response_str(recv_buf, 0, recv_size);   //bug点:recv_buf遇到\0后的数据不再读取,导致反序列化失败
    //解决方案:使用string转换时会遇到\0,由于字符串特性导致不再读取,因为protobuf支持从数组转换,所以换方法直接从Array反序列化
    // if(!response->ParseFromString(response_str)){
    if(!response->ParsePartialFromArray(recv_buf,recv_size)){
        // std::cout<<"parse error! response_str:"<<response_str<<std::endl;
        // close(clientfd);
        // return;
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"arse error!! response_str: %s",response_str.c_str());
        controller->SetFailed(errtxt);
        return;
    }

    close(clientfd);
}

zookeeper

ZooKeeper 在这里作为服务方法的管理配置中心,负责管理服务方法提供者对外提供的服务方法。
Callee提前将本端对外提供的服务方法名及自己的通信地址信息(IP:Port)注册到ZooKeeper。
当Caller发起远端调用时,会先拿着自己想要调用的服务方法名询问 ZooKeeper,ZooKeeper 告知Caller想要调用的服务方法在哪台服务器上(ZooKeeper返回目标服务器的IP:Port给Caller),Caller便向目标服务器Callee请求服务方法调用。Callle在本地执行相应服务方法后将结果返回给Caller。

安装java环境

在这里插入图片描述

1.sudo apt-get install openjdk-8-jdk
2. 配置环境变量,编辑如下文件:vim ~/.bashrc
在最后一行加:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

3.测试jdk是否安装成功:java -version
在这里插入图片描述

Ubuntu安装JDK

Zookeeper分布式协调服务

下载 zookeeper.tar.gz,解压后
1.cd conf,将 zoo_sample.cfg 改名为 zoo.cfgmv zoo_sample.cfg zoo.cfg
2.进入bin目录,启动zkServer, ./zkServer.sh start
3.可以通过netstat查看zkServer的端口,在bin目录启动zkClient.sh链接zkServer,熟悉zookeeper怎么组织节点

在这里插入图片描述
在这里插入图片描述

zk的原生开发API(c/c++接口)

1.sudo ./configure
2.sudo make
在这里插入图片描述
zookeeper 源码编译生成C函数接口,在 ./configure 后生成的 Makefile 文件中,默认是将警告当成错误的,因此导致上图中的警告,总是以错误形式展现,编译失败

进入到生成的 Makefile 中,修改第548行,将AM_CFLAGS -Wall -Werror 改为 AM_CFLAGS -Wall上述问题

Linux安装zookeeper原生C API接口出现的make编译错误

3.make install

zookeeper 项目应用

ZooKeeper相当于是一个特殊的文件系统,不过和普通文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据,目录节点不行。ZooKeeper内部为了保持高吞吐和低延迟,再内存中维护了一个树状的目录结构,这种特性使ZooKeeper不能存放大量数据,每个节点存放数据的上线为1M。

服务对象名在ZooKeeper中以永久性节点的形式存在,当RpcServer与ZooKeeper断开连接后,整个节点还是会存在。方法对象名则以临时性节点存在,RpcServer与ZooKeeper断开后临时节点被删除。临时节点上带着节点数据,在本项目中,节点数据就是提供该服务方法的RpcServer的通信地址(IP+Port)

//封装的zk客户端类
class ZkClient
{
public:
    ZkClient(); 
    ~ZkClient();
    //zkclinet启动链接zkserver
    void Start();
    //在zkserver上根据指定的path创建znode节点
    void Create(const char *path,const char *data,int datalen,int state=0);
    //传入参数指定的znode节点路径,或者znode节点的值
    std::string GetData(const char *path);

private:
    //zk的客户端句柄
    zhandle_t *m_zhandle;
};
#include"zookeeperutil.h"
#include"mprpcapplication.h"
#include<semaphore.h>
#include<iostream>

// 全局的 watcher 观察器  zkserver 给 zkclient 的通知
// 参数 type 和 state 分别是 ZooKeeper 服务端返回的事件类型和连接状态
void global_watcher(zhandle_t *zh,int type,int state,const char *path,void *watcherCtx)
{
    if(type==ZOO_SESSION_EVENT) //回调的消息类型是和会话相关的消息类型
    {
        if(state==ZOO_CONNECTED_STATE)  //zkclient和zkserver链接成功
        {
            sem_t *sem=(sem_t*)zoo_get_context(zh);
            sem_post(sem);  //信号量资源加一
        }
    }
}

ZkClient::ZkClient():m_zhandle(nullptr)
{}

ZkClient::~ZkClient()
{
    if(m_zhandle!=nullptr)
    {
        zookeeper_close(m_zhandle);//关闭句柄释放资源
    }
}

//zkclinet启动链接zkserver
void ZkClient::Start()
{
    //加载zk的IP和端口号,默认为2181
    std::string host=MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port=MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string connstr=host+":"+port;

    //调用原生API,端口与IP,回调函数,会话超时时间
    /*
    zookeeper_mt:多线程版本
    zookeeper的API客户端程序提供了三个线程
    API调用线程
    网络I/O线程:专门在一个线程里处理网络I/O
    watcher回调线程
    */
    m_zhandle=zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
    // 仅仅通过判断接口返回的句柄是否为NULL,并不能表示句柄是可用的。
    // 因为,会话的建立过程是异步的,必须等到会话状态变成ZOO_CONNECTED_STATE才表示句柄可用。
    if(nullptr==m_zhandle)
    {
        std::cout<<"zookeeper_init error!"<<std::endl;
        exit(EXIT_FAILURE);
    }

    sem_t sem;
    sem_init(&sem,0,0); //初始化资源为0,用于多线程间的同步
    // 将刚才定义的同步信号量sem通过这个 zoo_set_context 函数可以传递给 m_zhandle 进行保存。
    // 在global_watcher中可以将这个sem从m_zhandle取出来使用。
    zoo_set_context(m_zhandle,&sem);    //设置上下文,添加额外信息

    sem_wait(&sem); // 阻塞结束后才连接成功!!!
    std::cout<<"zookeeper_init success!"<<std::endl;

}
//在zkserver上根据指定的path创建znode节点
void ZkClient::Create(const char *path,const char *data,int datalen,int state)
{
    char path_buffer[128];
    int bufferlen=sizeof(path_buffer);
    int flag;

    //检查该节点是否存在
    flag=zoo_exists(m_zhandle,path,0,nullptr);
    if(ZNONODE==flag)//该节点并不存在
    {
        //创建指定path的znode节点
        flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen);
        if(flag==ZOK)
        {
            std::cout<<"znode create success... path:"<<path<<std::endl;
        }
        else
        {
            std::cout<<"flag:"<<flag<<std::endl;
            std::cout<<"znode create error... path:"<<path<<std::endl;
            exit(EXIT_FAILURE);
        }
    }
}
//传入参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{
    char buffer[64];
    int bufferlen=sizeof(buffer);
    int flag=zoo_get(m_zhandle,path,0,buffer,&bufferlen,nullptr);//获取信息与返回值
    if(flag!=ZOK)//如果获取失败
    {
        std::cout<<"get znode error... path:"<<path<<std::endl;
        return "";
    }
    else
    {
        //获取成功
        return buffer;
    }
    
}

watcher 机制就是ZooKeeper客户端对某个 znode 建立一个watcher事件,当该znode发生变化时,这些ZK客户端会收到ZK服务端的通知,然后ZK客户端根据znode的变化来做出业务上的改变。

ZooKeeper服务端收到来自客户端 callee 的连接请求后,服务端为节点创建会话(此时这个节点状态发生改变),服务端会返回给客户端callee一个事件通知,然后触发watcher回调(执行global_watcher函数).

总结

深入浅出RPC服务(一)RPC来源-论文解读
深入浅出RPC服务(二)不同层的网络协议
RPC 详解
RPC——RPC协议介绍及原理详解
C++实现轻量级RPC分布式网络通信框架
https://blog.csdn.net/weixin_52344401/article/details/131343863

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1794932.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【matlab】绘图插入并放大/缩小子图

参考链接 代码分为两个&#xff1a;绘图代码与magnify.m 绘图代码就是普通的绘图代码&#xff0c;以下为例 %https://zhuanlan.zhihu.com/p/655767542 clc clear close all x 0:pi/100:2*pi; y1 sin(x); plot(x,y1,r-o); hold on y2sin(x)-0.05; y3sin(x)0.05; xlim([0 2*…

企业在现代市场中的战略:通过数据可视化提升财务决策

新时代&#xff0c;财务规划团队不仅仅是企业内部的一个部门&#xff0c;更是帮助企业做出明智决策和设定战略目标的中坚力量。在当今瞬息万变的商业环境中&#xff0c;财务专业人士需要具备应对挑战并引导企业走向成功的角色职能。企业领导者时常面临着数据压力&#xff0c;需…

混剪素材哪里找?分享几个热门混剪素材下载网站

在短视频和新媒体的世界里&#xff0c;高质量的混剪素材是吸引观众的关键。今天&#xff0c;我将为大家详细介绍几个优秀的素材网站&#xff0c;它们不仅资源丰富&#xff0c;而且完全满足新媒体创作者的需求。这篇文章将帮助你理解如何有效利用这些平台提升你的视频创作。 蛙…

小型企业网络组网与配置仿真实验

实验要求如下: 我这里以学号46为例 一、IP 地址规划表 &#xff08;一&#xff09;主类网络 &#xff08;二&#xff09;子网划分 需要自己计算有效ip范围 在C类主网络192.168.46.0/24中&#xff0c;我们需要先了解这个网络的子网掩码为255.255.255.0&#xff0c;其二进制…

DDMA信号处理以及数据处理的流程---DDMA原理介绍

Hello&#xff0c;大家好&#xff0c;我是Xiaojie&#xff0c;好久不见&#xff0c;欢迎大家能够和Xiaojie一起学习毫米波雷达知识&#xff0c;Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程&#xff0c;本系列文章将从目标生成、信号仿真、测距、测速、cfar…

Vitis HLS 学习笔记--接口聚合与解聚-AXI主接口

目录 1. 简介 2. 用法及语法 3. 详细解读 4. 总结 1. 简介 在使用 Vitis HLS 工具进行硬件设计时&#xff0c;如果你在接口上使用了结构体&#xff0c;工具会自动把结构体里的所有元素组合成一个整体。就像把一堆零件组装成一个玩具一样。这样做的好处是&#xff0c;数据可…

【System Verilog and UVM基础入门4】程序和接口

目录 方法task和函数function 接口 [System Verilog特性] 方法task和函数function 首先要明白一个事情!Task任务,是消耗时间的,函数function是不消耗时间的! 这样写看着是不是很高大上呢?此外,如果我们想修改时钟周期怎么办呢?这时我们可以在task clk_gen(int period…

从报名到领证:软考高级【系统分析师】报名考试全攻略

本文共计13156字&#xff0c;预计阅读39分钟。包括七个篇章&#xff1a;报名、准考证打印、备考、考试、成绩查询、证书领取及常见问题。 不想看全文的可以点击目录&#xff0c;找到自己想看的篇章进行阅读。 一、报名篇 报名条件要求&#xff1a; 1.凡遵守中华人民共和国宪…

盛夏之约,即将启程,2024中国北京消防展将于6月26举行

盛夏之约&#xff0c;即将启程&#xff0c;2024中国北京消防展将于6月26举行 盛夏之约&#xff0c;即将启程&#xff01;备受瞩目的2024中国&#xff08;北京&#xff09;消防技术与设备展览会将于6月26-28 日在北京.首钢会展中心盛大召开。作为消防安全和应急救援的年度盛会&…

Camtasia Studio2024永久免费版及最新版本功能讲解

在当前数字化时代&#xff0c;视频内容的制作与编辑变得愈发重要。无论是企业宣传、在线教育还是个人Vlog制作&#xff0c;一款功能强大且易于上手的视频编辑软件成为了刚需。Camtasia Studio作为市场上备受欢迎的视频编辑与屏幕录像工具&#xff0c;凭借其强大的功能与用户友好…

Golang——gRPC与ProtoBuf介绍

一. 安装 1.1 gRPC简介 gRPC由google开发&#xff0c;是一款语言中立&#xff0c;平台中立&#xff0c;开源的远程过程调用系统。gRPC客户端和服务器可以在多种环境中运行和交互&#xff0c;例如用java写一个服务器端&#xff0c;可以用go语言写客户端调用。 1.2 gRPC与Protob…

android睡眠分期图

一、效果图 做医疗类项目&#xff0c;经常会遇到做各种图表&#xff0c;本文做的睡眠分期图。 二、代码 引入用到的库 api joda-time:joda-time:2.10.1 调用代码 /*** 睡眠* 分期*/private SleepChartAdapter mAdapter;private SleepChartAttrs mAttrs;private List<SleepI…

day26-单元测试

1. 单元测试Junit 1.1 什么是单元测试&#xff1f;&#xff08;掌握&#xff09; 1.2 Junit的特点&#xff1f;&#xff08;掌握&#xff09; 1.3 基本用法&#xff1a;&#xff08;掌握&#xff09; 实际开发中单元测试的使用方式&#xff08;掌握&#xff09; public class …

安徽京准NTP时钟系统:GPS北斗卫星授时下的生活重塑

安徽京准NTP时钟系统&#xff1a;GPS北斗卫星授时下的生活重塑 安徽京准NTP时钟系统&#xff1a;GPS北斗卫星授时下的生活重塑 时间的流逝自古以来时钟都是人类生活与活动的基础。然而&#xff0c;随着科技的进步&#xff0c;我们对时间管理和测量的方法已经发生了翻天覆地的变…

【UML用户指南】-09-对基本结构建模-类图

目录 1、概述 2、引入 3、过程 4、常用建模技术 4.1、对简单协作建模 4.2、对逻辑数据库模式建模 4.3、正向工程 1、概述 类图是面向对象系统建模中最常见的图。 类图显示一组类、接口、协作以及它们之间的关系 类图用于对系统静态设计视图建模。其大多数涉及到对系统的…

完整指南:远程管理 Linux 服务器的 Xshell6 和 Xftp6 使用方法(Xshell无法启动:要继续使用此程序........,的解决方法)

&#x1f600;前言 在当今软件开发领域&#xff0c;远程管理 Linux 服务器已成为日常工作的重要组成部分。随着团队成员分布在不同的地理位置&#xff0c;远程登录工具的使用变得至关重要&#xff0c;它们为开发人员提供了访问和管理服务器的便捷方式。本文将介绍两款功能强大的…

深度学习框架-----Tensorflow2基础

一、基础概念 1、深度学习框架基础概念 深度学习框架的出现降低了入的槛。我们不在需要丛从复杂的神经网络和反向传播算法开始编代码&#xff0c;可以依据需要&#xff0c;使用已有的模型配置参数&#xff0c;而模型的参数自动训练得到。我们也可以在已有模型的基础上增加自定…

[word] word怎样转换成pdf #职场发展#经验分享#职场发展

word怎样转换成pdf word怎样转换成pdf&#xff1f;word格式是办公中常会用到的格式&#xff0c;word格式编辑好了要想转换成pdf格式再来传输的话需要怎么操作呢&#xff1f;小编这就给大家分享下操作方法&#xff0c;一起来学习下吧&#xff01; 1、安装得力PDF转换器&#x…

vmware将物理机|虚拟机转化为vmware虚机

有时&#xff0c;我们需要从不同的云平台迁移虚拟机、上下云、或者需要将不再受支持的老旧的物理服务器转化为虚拟机&#xff0c;这时&#xff0c;我们可以用一款虚拟机转化工具&#xff1a;vmware vcenter converter standalone&#xff0c;我用的是6.6的版本&#xff0c;当然…

【Linux取经路】信号的发送与保存

文章目录 一、重新理解发送信号二、信号的保存、阻塞信号的概念三、信号集操作函数3.1 sigprocmask3.2 sigpending 四、阻塞信号代码验证五、结语 一、重新理解发送信号 进程通过位图来实现对普通信号&#xff08;1-31号信号&#xff09;的保存&#xff0c;该位图保存在进程的…