手写RPC框架-整合注册中心模块设计与实现

news2024/10/4 15:38:31

源码地址:https://github.com/lhj502819/IRpc/tree/v2

思考

  • 如果同一个服务有10台不同的机器进行提供,那么客户端该从哪获取这10台目标机器的ip地址信息呢?
  • 随着调用方的增加,如何对服务调用者的数据进行监控呢?
  • 服务提供者下线了,该如何通知到服务调用方?

实现的功能

  • 基于zookeeper作为注册中心进行了统一的访问接口封装与实现,并且能够支持日后其他注册中心的拓展。
  • 当服务提供方发生变更的时候,借助注册中心通知到客户端做本地调用表的一个更新操作。
  • 当服务订阅的时候需要告知注册中心修改节点数据,方便日后可以针对调用者做一些数据统计与监控的功能。
  • 统一将节点的更新后的相关操作通过事件的机制来实现代码解偶。
  • 将项目中常用的一些缓存数据按照服务端和客户端两类角色进行分开管理。
  • 将对于netty连接的管理操作统一封装在了ConnectionHandler类中,以及将之前硬编码的配置信息都迁移到了properties配置文件中,并设计了PropertiesBootst rap类进行管理。

架构图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6P78upLB-1672750337695)(img_1.png)]

接入注册中心

抽象注册中心相关操作

方便扩展多种注册中心

public interface RegistryService {
    
    void register(URL url);
    
    void unRegister(URL url);
    
    void subscribe(URL url);

    void doUnSubscribe(URL url);

}

通过curator封装Zookeeper相关api

public class CuratorZookeeperClient extends AbstractZookeeperClient {

    private static Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class);

    private CuratorFramework client;

    public CuratorZookeeperClient(String zkAddress) {
        this(zkAddress, null, null);
    }

    public CuratorZookeeperClient(String zkAddress, Integer baseSleepTimes, Integer maxRetryTimes) {
        super(zkAddress, baseSleepTimes, maxRetryTimes);
        //重试策略
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(super.getBaseSleepTimes(), super.getMaxRetryTimes());
        if (client == null) {
            client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
            client.start();
        }
    }
    ...省略部分代码...
}

实现Zookeeper注册中心相关api

需要注意的是我们会在doAfterSubscribe中添加一个监听,监听是否有新的Provider注册,当监听到后会通过指定的事件通知Client进行变更操作

public class ZookeeperRegister extends AbstractRegister implements RegistryService {

    private static Logger logger = LoggerFactory.getLogger(ZookeeperRegister.class);

    private AbstractZookeeperClient zkClient;

    private String ROOT = "/irpc";

    public ZookeeperRegister(String address) {
        this.zkClient = new CuratorZookeeperClient(address);
    }

    @Override
    public void doAfterSubscribe(URL url) {
        String newServerNodePath = ROOT + "/" + url.getServiceName() + "/provider";
        //监听是否有新的服务注册
        watchChildNodeData(newServerNodePath);
    }

    private void watchChildNodeData(String newServerNodePath) {
        zkClient.watchChildNodeData(newServerNodePath, watchedEvent -> {
            logger.info("监听到事件:{}",watchedEvent);
            String path = watchedEvent.getPath();
            List<String> childrenDataList = zkClient.getChildrenData(path);
            URLChangeWrapper urlChangeWrapper = new URLChangeWrapper();
            urlChangeWrapper.setProviderUrl(childrenDataList);
            urlChangeWrapper.setServiceName(path.split("/")[2]);

            //自定义的事件监听组件
            //当某个节点的数据发生更新后,会发送一个节点更新的事件,然后在事件的监听端对不同的行为做不同的事件处理操作
            IRpcEvent iRpcEvent = new IRpcUpdateEvent(urlChangeWrapper);
            IRpcListenerLoader.sendEvent(iRpcEvent);

            //zk节点的消息通常只具有一次性的功效,所以可能会出现第一次修改节点之后发送一次通知,之后再次修改节点之后不会再发送节点变更通知
            //因此收到回调之后,需要在注册一次监听,这样能保证一直都有收到消息
            watchChildNodeData(path);
        });

    }
    ...省略部分代码...
}

公共模块

对一次调用的封装

public class RpcInvocation {

    /**
     * 请求的目标方法,例如finderUser
     */
    private String targetMethod;

    /**
     * 请求的目标服务名称,例如:cn.onenine.user.UserService
     */
    private String targetServiceName;

    /**
     * 请求参数信息
     */
    private Object[] args;

    /**
     * 用于匹配请求和响应的一个关键值,当请求从客户端发出的时候,会有一个uuid用于记录发出的请求
     *  待数据返回的时候通过uuid来匹配对应的请求线程,并且返回给调用线程
     */
    private String uuid;

    /**
     * 接口响应的数据塞入这个字段中(如果是异步调用或者是void类型,这里就为空)
     */
    private Object response;
    ...省略部分代码...
}

将硬编码的配置信息抽离到配置文件中

public class PropertiesBootstrap {

    private volatile boolean configIsReady;

    public static final String SERVER_PORT = "irpc.serverPort";

    public static final String REGISTER_ADDRESS = "irpc.registerAddr";

    public static final String APPLICATION_NAME = "irpc.applicationName";

    public static final String PROXY_TYPE = "irpc.proxyType";

    public static final String CALL_TIMEOUT = "irpc.call.timeout";

    public static ServerConfig loadServerConfigFromLocal(){
        try {
            PropertiesLoader.loadConfiguration();;
        }catch (IOException e){
            throw new RuntimeException("loadServerConfigFromLocal fail,e is  {}",e);
        }

        ServerConfig serverConfig = new ServerConfig();
        serverConfig.setPort(PropertiesLoader.getPropertiesInteger(SERVER_PORT));
        serverConfig.setApplicationName(PropertiesLoader.getPropertiesStr(APPLICATION_NAME));
        serverConfig.setRegisterAddr(PropertiesLoader.getPropertiesStr(REGISTER_ADDRESS));
        return serverConfig;
    }

    public static ClientConfig loadClientConfigFromLocal(){
        try {
            PropertiesLoader.loadConfiguration();;
        }catch (IOException e){
            throw new RuntimeException("loadClientConfigFromLocal fail,e is {}" , e);
        }

        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setApplicationName(PropertiesLoader.getPropertiesStr(APPLICATION_NAME));
        clientConfig.setRegisterAddr(PropertiesLoader.getPropertiesStr(REGISTER_ADDRESS));
        clientConfig.setProxyType(PropertiesLoader.getPropertiesStr(PROXY_TYPE));
        clientConfig.setCallTimeout(Long.parseLong(Objects.requireNonNull(PropertiesLoader.getPropertiesStr(CALL_TIMEOUT))));
        return clientConfig;
    }
}

Provider服务提供方

调用注册中心相关API注册服务

程序启动时可调用此API进行服务暴露注册

public void exportService(Object serviceBean){
    if (serviceBean.getClass().getInterfaces().length == 0){
        throw new RuntimeException("service must had interfaces!");
    }
    Class<?>[] classes = serviceBean.getClass().getInterfaces();
    if (classes.length >1){
        throw new RuntimeException("service must only had one interfaces!");
    }
    if (registryService == null){
        registryService = new ZookeeperRegister(serverConfig.getRegisterAddr());
    }
    //默认选择该对象的第一个实现
    Class<?> interfaceClass = classes[0];
    PROVIDER_CLASS_MAP.put(interfaceClass.getName(),serviceBean);
    URL url = new URL();
    url.setServiceName(interfaceClass.getName());
    url.setApplicationName(serverConfig.getApplicationName());
    url.addParameter("host", CommonUtils.getIpAddress());
    url.addParameter("port",String.valueOf(serverConfig.getPort()));
    PROVIDER_URL_SET.add(url);
}

/**
 * 异步注册
 */
private void batchExportUrl() {

    Thread task = new Thread(() -> {
        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        for (URL url : PROVIDER_URL_SET) {
            registryService.register(url);
        }
    });

    task.start();

}

Consumer服务调用方

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TlCUSVel-1672750337696)(img_2.png)]

封装对通过netty对provider的一系列连接API

可对provider发起连接、断开连接以及获取每个连接的ChannelFuture

public class ConnectionHandler {

    /**
     * 核心的连接处理器
     * 专门用于负责和服务端构建连接通信
     */
    private static Bootstrap bootstrap;

    public static void setBootstrap(Bootstrap bootstrap) {
        ConnectionHandler.bootstrap = bootstrap;
    }

    /**
     * 构建单个连接通道,元操作
     * @param providerServiceName
     * @param providerIp
     * @throws InterruptedException
     */
    public static void connect(String providerServiceName,String providerIp) throws InterruptedException{
        if (bootstrap == null){
            throw new RuntimeException("bootstrap can not be null");
        }

        //格式错误类型的信息
        if (!providerIp.contains(":")){
            return;
        }
        String[] providerAddress = providerIp.split(":");
        String ip = providerAddress[0];
        int port = Integer.parseInt(providerAddress[1]);
        ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
        ChannelFutureWrapper channelFutureWrapper = new ChannelFutureWrapper();
        channelFutureWrapper.setChannelFuture(channelFuture);
        channelFutureWrapper.setHost(ip);
        channelFutureWrapper.setPort(port);
        SERVER_ADDRESS.add(providerIp);
        List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(providerServiceName);
        if (CollectionUtil.isEmpty(channelFutureWrappers)){
            channelFutureWrappers = new ArrayList<>();
        }

        channelFutureWrappers.add(channelFutureWrapper);
        CONNECT_MAP.put(providerServiceName,channelFutureWrappers);
    }


    public static ChannelFuture createChannelFuture(String host, Integer port) throws InterruptedException {
        return bootstrap.connect(host,port).sync();
    }

    /**
     * 断开连接
     * @param providerServiceName 服务名
     * @param providerIp 服务提供者IP
     */
    public static void disConnect(String providerServiceName,String providerIp){
        SERVER_ADDRESS.remove(providerIp);
        List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(providerServiceName);
        if (CollectionUtil.isNotEmpty(channelFutureWrappers)){
            channelFutureWrappers.removeIf(channelFutureWrapper ->
                    providerIp.equals(channelFutureWrapper.getHost() + ":" + channelFutureWrapper.getPort()));
        }
    }

    /**
     * 默认走随机策略获取ChannelFuture
     */
    public static ChannelFuture getChannelFuture(String providerServiceName){
        List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(providerServiceName);
        if (CollectionUtil.isEmpty(channelFutureWrappers)){
            throw new RuntimeException("no provider exist for " + providerServiceName);
        }

        return channelFutureWrappers.get(new Random().nextInt(channelFutureWrappers.size())).getChannelFuture();
    }
}

异步发送任务

class AsyncSendJob implements Runnable {

        public AsyncSendJob() {
        }

        @Override
        public void run() {
            while (true) {
                try {
                    //阻塞模式
                    RpcInvocation data = CommonClientCache.SEND_QUEUE.take();
                    //将RpcInvocation封装到RpcProtocol对象中,然后发送给服务端,这里正好对应了ServerHandler
                    String json = JSONObject.toJSONString(data);
                    RpcProtocol rpcProtocol = new RpcProtocol(json.getBytes());
                    ChannelFuture channelFuture = ConnectionHandler.getChannelFuture(data.getTargetServiceName());
                    channelFuture.channel().writeAndFlush(rpcProtocol);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }

剩余的问题

  • 服务注册环节出现失败时,Provier应该进行定期重试机制,保证服务能够有多次注册的机会,实现自愈的效果;
  • 注册中心挂了,这种情况对于已经建立连接的Provider和Consumer之间是不受影响的,但是如果此时任意一方出现了配置调整(权重变化,服务下线),另一方都是无感的。针对这种情况应该及时恢复注册中心,并且在恢复注册中心之后需要让原先的Provider和Consumer都收到通知,并且把数据重新同步。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/136668.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

十五、类加载器、反射、xml

类加载器 1类加载器【理解】 作用 负责将.class文件&#xff08;存储的物理文件&#xff09;加载在到内存中 2类加载的过程【理解】 类加载时机 创建类的实例&#xff08;对象&#xff09;调用类的类方法访问类或者接口的类变量&#xff0c;或者为该类变量赋值使用反射方式来…

【C++编程调试秘籍】| 总结归纳要点

文章目录一、编译器是捕捉缺陷的最好场合1 如何使用编译器捕捉缺陷二、在运行时遇见错误该如何处理1 该输出哪些错误信息2 执行安全检查则会减低程序效率&#xff0c;该如何处理呢3 当运行时遇到错误时&#xff0c;该如何处理四、索引越界1 动态数组2 静态数组3 多维数组5 指针…

uboot驱动和Linux内核驱动有什么区别?

一、前言 uboot启动后&#xff0c;一些外设如DDR、EMMC、网口、串口、音频、显示等等已经被初始化&#xff0c;为什么Linux内核中还需要写Linux驱动呢&#xff1f; 二、uboot驱动和Linux驱动的区别 1、直观理解 驱动&#xff0c;不仅仅是为了初始化&#xff0c;还实现了一组…

《Linux》1.权限

1.用户 首先介绍一下Linux中的用户概念。Linux下有两种用户&#xff1a;超级用户&#xff08;root&#xff09;&#xff0c;普通用户。 超级用户&#xff1a;可以再linux系统下做任何事情&#xff0c;不受限制 普通用户&#xff1a;在linux下做有限的事情。 超级用户的命令提示…

FTP回复码

FTP回复码由3个数字和对应文本组成 恢复定义为3个数字编码&#xff0c;紧跟一个空格 sp&#xff0c;然后是一行文本&#xff0c;以telnet的换行符表是结束 但是&#xff0c;某些情况下&#xff0c;文本内容太长超过一行&#xff0c;这就需要在第一行和最后一行特殊处理。处理格…

C++模拟实现栈(stack)和队列 (queue)

目录 一、栈&#xff08;stack) 的介绍 二、队列&#xff08;queue) 的介绍 三、容器适配器 二、deque的简单介绍 三、模拟实现stack 3.1 stack.h 3.2 test.cpp 四、模拟实现queue 4.1 queue.h 4.2 test.cpp 一、栈&#xff08;stack) 的介绍 1. stack是一种容器适配…

字符串、内存函数的介绍(13)

目录 1、字符串函数 1、strlen 模拟实现&#xff1a; 2、strcpy 模拟实现&#xff1a; 3、strcat 模拟实现&#xff1a; 4、strcmp 模拟实现&#xff1a; 5、strncpy 6、strncat 7、strncmp 8、strstr 模拟实现&#xff1a; 9、strtok 10、strerror 11、其他字…

回顾2022年5月IB全球统考成绩,这些学校IB成绩非常亮眼

IB大考成绩放榜&#xff0c;全球17&#xff0c;3878名学生在2022年5月的考试中获得文凭课程(DP)和职业课程(CP)的成绩。今年全球640位考生获得满分45分&#xff0c;全球平均分31.98分。以下是部分公布公布成绩的学校&#xff1a; 成都树德中学国际部&#xff1a;在2022年的全球…

电商维权控价方法论

电商经济繁荣发展&#xff0c;品牌销售渠道多样化&#xff0c;带来流量的同时&#xff0c;各种渠道问题也暴露出来&#xff0c;如&#xff0c;低价、侵权……渠道秩序面临着严峻挑战&#xff0c;品牌生命周期也受到了威胁。所以&#xff0c;越来越多的品牌选择维权控价&#xf…

2022年终总结与2023新年展望

前言 时间过得太快了&#xff0c;虽然写博客已经很多年了&#xff0c;但是年终总结一直由于种种原因没有写过&#xff0c;2022年确实是魔幻的一年&#xff0c;不知不觉自己也已经研二了&#xff0c;因为疫情的原因突然放开&#xff0c;提前放假回家&#xff0c;借此机会写一下…

Git(七) - IDEA 集成 GIT

一、配置 Git 忽略文件 &#xff08;1&#xff09;问题1:为什么要忽略他们&#xff1f; 答&#xff1a;与项目的实际功能无关&#xff0c;不参与服务器上部署运行。把它们忽略掉能够屏蔽IDE工具之 间的差异。 &#xff08;2&#xff09;问题2&#xff1a;怎么忽略&#xff1f; …

TP可能用到的函数

说明 该文章来源于同事lu2ker转载至此处&#xff0c;更多文章可参考&#xff1a;https://github.com/lu2ker/ 文章目录说明in_array()filter_var()class_exists()strpos()escapeshellarg()escapeshellcmd()preg_replace()parse_str()无字母数字下划线的webshellstr_replace()e…

GNN algorithm(4): HAN, Heterogeneous Graph Attention Network

目录 background (1) heterogeneity of graph (2) semantic-level attention (3) Node-level attention (4) HAN contributions 2. Related Work 2.1 GNN 2.2 Network Embedding 3. Preliminary background 4. Proposed Model 4.1 Node-level attention ideas: …

Unity脚本(四)

视频教程&#xff1a;https://www.bilibili.com/video/BV12s411g7gU?p149 目录 键盘输入 InputManager 键盘输入 当通过名称指定的按键被用户按住时返回true&#xff1a; bool resultInput.GetKey(KeyCode.A); 当用户按下指定名称按键时的那一帧返回true&#xff1a;…

Python学习笔记——变量和简单数据类型

编码默认情况下&#xff0c;Python 3 源码文件以 UTF-8 编码&#xff0c;所有字符串都是 unicode 字符串。 当然你也可以为源码文件指定不同的编码。标识符第一个字符必须是字母表中字母或下划线 _ 。标识符的其他的部分由字母、数字和下划线组成。标识符对大小写敏感。在 Pyth…

【深度学习】机器学习\深度学习常见相关公开数据集汇总(图像处理相关数据集、自然语言处理相关数据集、语音处理相关数据集)

一、前言 1. 介绍 常来说&#xff0c;深度学习的关键在于实践。从图像处理到语音识别&#xff0c;每一个细分领域都有着独特的细微差别和解决方法。 然而&#xff0c;你可以从哪里获得这些数据呢&#xff1f;现在大家所看到的大部分研究论文都用的是专有数据集&#xff0c;这…

超声波测距传感器认知

目录 一、超声波测距传感器认知 二、从零编程实现超声波测距 三、项目——感应开关盖垃圾桶 1、开发步骤 2、感应开关盖垃圾桶代码测试 一、超声波测距传感器认知 超声波测距模块是用来测量距离的一种产品&#xff0c;通过发送和接收超声波&#xff0c;利用时间差和声音…

【网络】udp_socket编程

目录 1.认识端口号 1.1 理解端口号和进程ID 1.2 理解源端口号和目的端口号 2.认识TCP协议 3.认识UDP协议 4.网络字节序 5.socket编程接口 5.1socket常见API 5.2sockaddr结构 sockaddr结构 sockaddr_in 结构 in_addr结构 6.简单的UDP网络程序 6.1创建套接字 6.2 …

【Docker】三 镜像容器常用命令

这里写目录标题1 配置镜像加速器2 Docker镜像常用命令2.1 搜索镜像2.2 下载镜像[重要]2.3 列出镜像[重要]2.3 删除本地镜像[重要]2.4 保存镜像2.5 加载镜像2.6 构建镜像[重要]3 容器常用命令3.1 新建并启动容器[重要]3.2 列出容器[重要]3.3 停止容器[重要]3.4 强制停止容器[重要…

你可以不用Git,但不能不会Git(三)基础(下)

目录 一.将文件添加至忽略列 二.日志记录操作 三.比较文件差异 四.还原文件 一.将文件添加至忽略列 一般我们总会有些文件无需纳入Git的管理&#xff0c;也不希望它们总出现在未跟踪文件列表。通常都是些自动生成的文件&#xff0c;比如日志文件&#xff0c;或者编译过程中…