Simple RPC - 07 从零开始设计一个服务端(下)_RPC服务的实现

news2024/9/24 19:10:57

文章目录

  • Pre
  • RPC服务实现
    • 服务注册
    • 请求处理
  • 设计: 请求分发机制

在这里插入图片描述

Pre

Simple RPC - 01 框架原理及总体架构初探

Simple RPC - 02 通用高性能序列化和反序列化设计与实现

Simple RPC - 03 借助Netty实现异步网络通信

Simple RPC - 04 从零开始设计一个客户端(上)

Simple RPC - 05 从零开始设计一个客户端(下)_ 依赖倒置和SPI


RPC服务实现

服务端的RPC服务主要包含两个核心功能:

  1. 服务注册:将服务的实现类注册到RPC框架中。

  2. 请求处理:接收并处理来自客户端的RPC请求。


服务注册

服务注册通过一个Map<String, Object>结构来实现,其中Key为服务名,Value为服务实现类的实例。

把服务的实现类注册到 RPC 框架中,这个逻辑的实现很简单,我们只要使用一个合适的数据结构,记录下所有注册的实例就可以了,后面在处理客户端请求的时候,会用到这个数据结构来查找服务实例

@Singleton
public class RpcRequestHandler implements RequestHandler, ServiceProviderRegistry {
    private final Map<String, Object> serviceProviders = new ConcurrentHashMap<>();

    @Override
    public synchronized <T> void addServiceProvider(Class<? extends T> serviceClass, T serviceProvider) {
        serviceProviders.put(serviceClass.getCanonicalName(), serviceProvider);
    }
}

请求处理

首先来看服务端中,使用 Netty 接收所有请求数据的处理类 RequestInvocationchannelRead0 方法

/**
     * 处理通道读取事件
     *
     * @param channelHandlerContext 上下文处理器,用于管理通道事件
     * @param request 入站请求命令
     * @throws Exception 如果没有找到处理请求的处理器或者其他异常
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Command request) throws Exception {
        // 根据请求头的类型获取对应的请求处理器
        RequestHandler handler = requestHandlerRegistry.get(request.getHeader().getType());
        if(null != handler) {
            // 处理请求并获取响应
            Command response = handler.handle(request);
            if(null != response) {
                // 将响应写入并刷新通道上下文,并添加监听器处理写入结果
                channelHandlerContext.writeAndFlush(response).addListener((ChannelFutureListener) channelFuture -> {
                    if (!channelFuture.isSuccess()) {
                        // 如果写入响应失败,记录日志并关闭通道
                        logger.warn("Write response failed!", channelFuture.cause());
                        channelHandlerContext.channel().close();
                    }
                });
            } else {
                // 如果响应为空,记录警告日志
                logger.warn("Response is null!");
            }
        } else {
            // 如果没有找到处理请求的处理器,抛出异常
            throw new Exception(String.format("No handler for request with type: %d!", request.getHeader().getType()));
        }
    }

根据请求命令的 Hdader 中的请求类型 type,去 requestHandlerRegistry 中查找对应的请求处理器 RequestHandler,然后调用请求处理器去处理请求,最后把结果发送给客户端

这种通过“请求中的类型”,把请求分发到对应的处理类或者处理方法的设计,在服务端处理请求的场景中,这是一个很常用的方法。我们这里使用的也是同样的设计,不同的是,我们使用了一个命令注册机制,让这个路由分发的过程省略了大量的 if-else 或者是 switch 代码。这样做的好处是,可以很方便地扩展命令处理器,而不用修改路由分发的方法,并且代码看起来更加优雅。



/**
 * 请求处理器注册类,用于管理和获取不同的请求处理器
 * 该类通过 Singleton 模式确保只有一个实例,
 * 并在类加载时初始化所有已知的请求处理器
 * @author artisan
 */
public class RequestHandlerRegistry {
    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerRegistry.class);
    /**
     * 存储请求处理器的映射,键为处理器类型,值为处理器实例
     */
    private Map<Integer, RequestHandler> handlerMap = new HashMap<>();
    /**
     * Singleton 实例
     */
    private static RequestHandlerRegistry instance = null;

    /**
     * 获取 RequestHandlerRegistry 的单例实例
     * 如果实例不存在,则创建一个新的实例
     * 
     * @return RequestHandlerRegistry 的单例实例
     */
    public static RequestHandlerRegistry getInstance() {
        if (null == instance) {
            instance = new RequestHandlerRegistry();
        }
        return instance;
    }

    /**
     * 私有构造方法,用于在实例创建时加载所有请求处理器
     * 通过 ServiceSupport 加载所有实现 RequestHandler 接口的实例,
     * 并将它们添加到 handlerMap 中
     */
    private RequestHandlerRegistry() {
        Collection<RequestHandler> requestHandlers = ServiceSupport.loadAll(RequestHandler.class);
        for (RequestHandler requestHandler : requestHandlers) {
            handlerMap.put(requestHandler.type(), requestHandler);
            logger.info("Load request handler, type: {}, class: {}.", requestHandler.type(), requestHandler.getClass().getCanonicalName());
        }
    }

    /**
     * 根据请求类型获取对应的请求处理器
     * 
     * @param type 请求处理器的类型
     * @return 对应的请求处理器实例,如果不存在则返回 null
     */
    public RequestHandler get(int type) {
        return handlerMap.get(type);
    }
}


接下来,我们看下通过RpcRequestHandler处理具体的客户端RPC请求

 /**
     * 处理请求命令的方法
     * 该方法的主要职责是解析请求命令,查找并调用相应服务,然后返回处理结果
     * 
     * @param requestCommand 请求命令,包含服务调用信息和参数
     * @return 返回命令,包含调用结果或错误信息
     */
    @Override
    public Command handle(Command requestCommand) {
        Header header = requestCommand.getHeader();
        // 从payload中反序列化RpcRequest,获取服务调用请求的具体信息
        RpcRequest rpcRequest = SerializeSupport.parse(requestCommand.getPayload());
        try {
            // 根据rpcRequest中的服务名,查找已注册的服务提供方
            Object serviceProvider = serviceProviders.get(rpcRequest.getInterfaceName());
            if(serviceProvider != null) {
                // 服务提供者存在,反序列化参数并获取具体方法进行调用
                String arg = SerializeSupport.parse(rpcRequest.getSerializedArguments());
                // 通过反射调用服务方法
                Method method = serviceProvider.getClass().getMethod(rpcRequest.getMethodName(), String.class);
                String result = (String ) method.invoke(serviceProvider, arg);
                // 将调用结果序列化并封装成响应命令返回
                return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId()), SerializeSupport.serialize(result));
            }
            // 未找到对应的服务提供方,返回错误响应
            logger.warn("No service Provider of {}#{}(String)!", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
            return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId(), Code.NO_PROVIDER.getCode(), "No provider!"), new byte[0]);
        } catch (Throwable t) {
            // 处理过程中发生异常,记录并返回错误响应
            logger.warn("Exception: ", t);
            return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId(), Code.UNKNOWN_ERROR.getCode(), t.getMessage()), new byte[0]);
        }
    }

核心流程如下:

  1. requestCommandpayload 属性反序列化成为 RpcRequest
  2. 根据 rpcRequest 中的服务名,去成员变量 serviceProviders 中查找已注册服务实现类的实例;
  3. 找到服务提供者之后,利用 Java 反射机制调用服务的对应方法;
  4. 把结果封装成响应命令并返回,在 RequestInvocation 中,它会把这个响应命令发送给客户端。

再来看成员变量 serviceProviders,它的定义是:Map<String/service name/, Object/service provider/> serviceProviders

它实际上就是一个 Map,Key 就是服务名,Value 就是服务提供方,也就是服务实现类的实例。这个 Map 的数据从哪儿来的呢?

我们来看一下 RpcRequestHandler 这个类的定义

@Singleton
public class RpcRequestHandler implements RequestHandler, ServiceProviderRegistry {

/**
     * 同步地向服务提供者集合中添加一个新的服务提供者
     * 此方法为类的公共接口一部分,提供了一种向内部服务提供者映射添加新服务提供者的方式
     *
     * @param serviceClass 服务接口类,指定了服务提供者的类型期望
     * @param serviceProvider 实际的服务提供者实例,实现了指定的服务接口
     *
     * 选择使用同步方法以确保线程安全,因为在多线程环境中,
     * 可能会有多个线程尝试同时向服务提供者集合中添加服务
     *
     * 使用服务提供者的类的规范名称(canonical name)作为键进行存储,是为了确保
     * 通过类名唯一标识服务提供者,避免了由于类加载器差异导致的潜在问题
     *
     * 记录日志是为了在系统运行时能够追踪到服务提供者的添加操作,有助于调试和系统维护
     */
    @Override
    public synchronized <T> void addServiceProvider(Class<? extends T> serviceClass, T serviceProvider) {
        serviceProviders.put(serviceClass.getCanonicalName(), serviceProvider);
        logger.info("Add service: {}, provider: {}.",
                serviceClass.getCanonicalName(),
                serviceProvider.getClass().getCanonicalName());
    }

}

这个类不仅实现了处理客户端请求的 RequestHandler 接口,同时还实现了注册 RPC 服务 ServiceProviderRegistry 接口,也就是说,RPC 框架服务端需要实现的两个功能——注册 RPC 服务和处理客户端 RPC 请求,都是在这一个类 RpcRequestHandler 中实现的,所以说,这个类是这个 RPC 框架服务端最核心的部分。

成员变量 serviceProviders 这个 Map 中的数据,也就是在 addServiceProvider 这个方法的实现中添加进去的


@Singleton

RpcRequestHandler 上增加了一个注解 @Singleton,限定这个类它是一个单例模式,这样确保在进程中任何一个地方,无论通过 ServiceSupport 获取 RequestHandler 或者 ServiceProviderRegistry 这两个接口的实现类,拿到的都是 RpcRequestHandler 这个类的唯一的一个实例。 实现如下


/**
 * 提供服务加载功能的支持类,特别是处理单例服务
 * @author artisan
 */
public class ServiceSupport {
    /**
     * 存储单例服务的映射,确保每个服务只有一个实例
     */
    private final static Map<String, Object> singletonServices = new HashMap<>();

    /**
     * 加载单例服务实例
     *
     * @param service 服务类的Class对象
     * @param <S> 服务类的类型参数
     * @return 单例服务实例
     * @throws ServiceLoadException 如果找不到服务实例
     */
    public synchronized static <S> S load(Class<S> service) {
        return StreamSupport.
                stream(ServiceLoader.load(service).spliterator(), false)
                .map(ServiceSupport::singletonFilter)
                .findFirst().orElseThrow(ServiceLoadException::new);
    }

    /**
     * 加载所有服务实例
     *
     * @param service 服务类的Class对象
     * @param <S> 服务类的类型参数
     * @return 所有服务实例的集合
     */
    public synchronized static <S> Collection<S> loadAll(Class<S> service) {
        return StreamSupport.
                stream(ServiceLoader.load(service).spliterator(), false)
                .map(ServiceSupport::singletonFilter).collect(Collectors.toList());
    }

    /**
     * 对服务实例进行单例过滤
     *
     * @param service 服务实例
     * @param <S> 服务类的类型参数
     * @return 单例过滤后的服务实例,如果该服务是单例的并且已有实例存在,则返回已存在的实例
     */
    @SuppressWarnings("unchecked")
    private static <S> S singletonFilter(S service) {

        if(service.getClass().isAnnotationPresent(Singleton.class)) {
            String className = service.getClass().getCanonicalName();
            Object singletonInstance = singletonServices.putIfAbsent(className, service);
            return singletonInstance == null ? service : (S) singletonInstance;
        } else {
            return service;
        }
    }
}


设计: 请求分发机制

请求分发分为两个层次:

  1. 网络传输层:通过RequestInvocation类,根据请求类型分发到对应的请求处理器。

    根据请求命令中的请求类型 (command.getHeader().getType()),分发到对应的请求处理器 RequestHandler 中

  2. 业务逻辑层:通过RpcRequestHandler类,根据服务名分发到具体的服务实现类。

    根据 RPC 请求中的服务名,把 RPC 请求分发到对应的服务实现类的实例中去

这种分层设计的目的在于保持系统的松耦合高内聚,确保网络传输与业务逻辑的清晰分离。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2070842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】堆主要的应用场景

1. 堆排序 所谓堆排序&#xff0c;就是在堆的基础上进行排序。 在讲解堆排序之前&#xff0c;让我们先来回顾一下堆的概念&#xff0c; 1.1 大根堆和小根堆 堆是一种完全二叉树&#xff0c;它有两种形式&#xff0c;一种是大根堆&#xff0c;另外一种是小根堆。 大根堆&…

2023年看过的电影和电视剧

2023年看过的电影 2023年12月21日&#xff1a;三大队 评价&#xff1a;感觉结尾不太突出&#xff0c;但是值得一看。 2023年10月02日&#xff1a;志愿军&#xff1a;雄兵出击 评价&#xff1a;感觉还行&#xff0c;场面还不错。但是记不得太多情节。 2023年08月31日&#xf…

如何将 Parallels Desktop 许可证密钥移至新的 Mac?

根据 Parallels 最终用户许可协议&#xff08;EULA&#xff09;的规定&#xff0c;您最多可以在一台设备上下载、安装和使用 Parallels Desktop 的一个原始副本。但是面对更换新机的用户&#xff0c;可以通过迁移的方式把 Parallels Desktop 许可证密钥移至新的 Mac&#xff0c…

跟《经济学人》学英文:2024年08月24日这期 What to make of America’s topsy-turvy economy

What to make of America’s topsy-turvy economy Don’t panic just yet topsy-turvy&#xff1a;颠倒的&#xff1b;混乱的&#xff1b;乱七八糟的&#xff1b; make of&#xff1a;理解&#xff1b;认为&#xff1b;看待 Make of: 这里的 “make of” 意思是如何理解或解释…

自来水厂供水监控的串口服务器应用

随着城市化进程的加快和人口的不断增长&#xff0c;自来水厂作为城市供水系统的核心组成部分&#xff0c;其稳定性和安全性对于城市的正常运行和居民的生活质量至关重要。传统的供水监控系统存在数据传输效率低、维护成本高、实时性差等问题&#xff0c;难以满足现代城市对供水…

TMDOG的微服务之路_08——使用Docker部署NestJS微服务

TMDOG的微服务之路_08——使用Docker部署NestJS微服务 博客地址&#xff1a;TMDOG的博客 在上一篇博客中&#xff0c;我们探讨了如何使用 NestJS 创建一个简单的微服务架构。为了将这些微服务部署到生产环境&#xff0c;我们可以使用 Docker 来打包和管理这些服务。本篇博客将…

Docker 数据卷的使用

Docker 数据卷的使用 文章目录 Docker 数据卷的使用导引1. 创建数据卷2. 查看创建的数据卷3. 查看数据卷的详细信息 导引 在Docker中&#xff0c;我们在创建并运行容器后&#xff0c;可以通过exec命令进入容器内部进行操作&#xff0c;但会发现一些命令是无法使用的&#xff0…

NSSCTF练习记录:[SWPUCTF 2021 新生赛]pigpig

题目&#xff1a; 根据题目名字和附件名提示&#xff0c;猜测为猪圈密码&#xff0c;对应手动解码 whenthepigwanttoeat

持久层接口开发

通常一个接口定义后&#xff0c;从持久层开始开发。 对于一个新模块需要使用工具生成模型类、mapper接口、mapper映射文件等&#xff0c;下边介绍一个工具用于自动生成代码。 1. 代码生成工具 1.1 安装插件 1.2 重启IDEA&#xff0c;连接mysql 1.3 配置代码生成规则 点击“c…

C++容器之字符串的详解

每日诗词&#xff1a; 我见青山我妩媚&#xff0c;料青山见我应如是。 ——《贺新郎甚矣吾衰矣》【宋】辛弃疾 目录 补漏&#xff1a; vector在分配新内存块后如何进行元素复制 正文&#xff1a; 字符串变量和常量 字符串变量&#xff1a; 解析&#xff1a; 字符串常量…

揭露 Sapiens:未来以人为中心的视觉任务

Sapiens | Meta Meta Reality Labs 隆重推出 Sapiens&#xff0c;这是一个尖端的模型系列&#xff0c;专为四种以人为中心的基本视觉任务而设计&#xff1a;二维姿态估计、身体部位分割、深度估计和表面法线预测。 我们的 Sapiens 模型可无缝处理 1K 高分辨率推理&#xff0c…

加州大学圣地亚哥分校 沉浸式遥操作机器人系统

想象一下&#xff0c;在VR中控制游戏角色时&#xff0c;你的手部动作能够无缝转化为角色的行动。如果将这种体验应用于现实世界中的双手机器人控制&#xff0c;将带来革命性的人机交互体验。随着Apple Vision Pro的问世&#xff0c;这一设想逐渐变为现实。然而&#xff0c;将这…

Velocity模板引擎——若依代码生成器

文章目录 快速入门准备模板数据填充运行代码 基础语法简单类型的变量获取对象类型的变量获取基础语法-循环基础语法—if判断 官网 比较擅长用于邮件&#xff0c;发票&#xff0c;web内容生成、代码生成、网页静态化 模板化的东西适合使用 当然模板引擎不止这一种&#xff0c;还…

记录|C# winform——Chart控件

目录 前言一、重点关注1.1 Chart控件效果1.2 属性1.2.0 位置讲解1.2.1 Titles——标题集合TextToolTip 1.2 .2 Series——图表序列ChartTypeLegends——图例集合 二、数据传入Chart控件2.1 如何传入数据&#xff1f;2.2 如果想限定每次展现的数据量怎么办&#xff1f; 三、标注…

主机安全-网络攻击监测

目录 概述暴力破解&#xff08;SSH爆破为例&#xff09;原理规则攻击模拟告警 端口扫描原理规则攻击模拟告警 流量劫持原理规则攻击模拟告警 参考 概述 本文介绍主机网络层面上的攻击场景&#xff0c;每种攻击场景举一个例子。监测方面以字节跳动的开源HIDS elkeid举例。 针对…

【2】搭建雅特力AT32F437ZMT OpenHarmony轻量系统开发环境

本文用于阐述如何搭建AT32F437ZMT OpenHarmony轻量系统开发环境开源组织地址&#xff1a;https://gitee.com/AT32437_OpenHarmony 1.AT-START-F437雅特力官方开发板相关资料 移植基于at32f437雅特力官方开发板AT-START-F437 AT-START-F437雅特力官方开发板相关资料 2.AT32F43…

采用不高于3次的勒让德多项式拟合原函数

利用勒让德多项式进行拟合的区域是[-1,1]&#xff0c;如果不是这个区域&#xff0c;比如是[a,b]&#xff0c;利用转化到[-1,1]。 参考以下例题计算系数 C语言代码如下 //用三阶的勒让德多项式进行拟合 #include<math.h> #include<stdio.h> #include "main.c…

智能控制,高效节能。ZLG致远电子能源智慧管理解决方案

面对楼宇及建筑群能源管理与设备控制的复杂需求&#xff0c;ZLG致远电子推出了一套能源智慧管理解决方案。该方案集设备管理、任务调度和数据可视化于一体&#xff0c;不仅实现数据的实时监控与分析&#xff0c;还助力系统节能降耗。 ZLG致远电子能源智慧管理解决方案 在ZLG致…

shallowReactive 与 shallowRef

除了之前的 ref与reactive 之外&#xff0c;Vue3 还准备了另外两个API&#xff0c;也是用来对响应式数据做处理&#xff0c;那就是 shallowReactive 与 shallowRef shallowReactive 文档解释&#xff1a;reactive() 的浅层作用形式&#xff0c;只能定义对象类型的数据。和 r…

pytorh基础知识和函数的学习:图像文件的Tensor

在深度学习和计算机视觉中&#xff0c;将图像文件转换为张量&#xff08;Tensor&#xff09;是数据预处理的重要步骤。Tensor 是一种多维数组&#xff0c;在 PyTorch 中&#xff0c;用于表示和处理数据。 首先&#xff0c;创建一个3*3的图像文件&#xff0c;放大之后&#xff…