手写RPC框架04-过滤器模块实现

news2025/1/11 15:48:41

源代码地址:https://github.com/lhj502819/IRpc/tree/v5

系列文章:

  • 注册中心模块实现
  • 路由模块实现
  • 序列化模块实现
  • 过滤器模块实现
  • 自定义SPI机制增加框架的扩展性的设计与实现

为什么需要过滤器?

目前整个RPC框架的功能基本已经齐全了,但是在实际的开发过程中我们可能会有如下的需求:

  • 对client的请求做鉴权
  • 对服务进行分组管理
  • 记录请求日志
  • 基于IP的请求直连

首先我们先对这几个需求进行简单解释,只有弄清楚需求的来源才能更好的去理解、设计和实现。

对client的请求做鉴权

随着业务的不断发展,服务的种类变得越来越丰富,有些重要的操作可能安全性比较高,需要进行鉴权,因此需要在RPC框架中增加对鉴权的支持。
在这里插入图片描述

对服务进行分组管理

在进行团队协作或者服务升级的时候,可能会遇到需要对Service Provider进行分组,比如分为V1、V2,方便进行流量的划分,当V2版本出现问题时,我们只需要将所有的调用调整为V1即可,同时也可以进行迭代升级,以免出现ALL IN时升级的功能出现问题导致整个系统不可用。.
在这里插入图片描述

基于IP的请求直连

在测试和联调阶段比较常见,例如在服务部署之后,发现两个provider对相同的服务,相同的参数,返回的结果却不同,此时就可以通过指定IP进行直连,方便问题定位。
在这里插入图片描述

记录请求日志

在实际的业务中我们在进行服务调用的时候需要做一些日志埋点,对调用信息进行记录,方便进行问题的排查

以上的这些处理其实就是一个链条,我们仅需要将这些功能按照顺序插入到整个链条中,并在适当的位置执行整个链条即可,而这些一个个的功能,则类似于我们常见的过滤器一样。

如何实现?

过滤器的实现一般都会基于责任链设计模式去设计,在目前比较流行的API网关SprngCloudGateway中也有类似的实现。
在这里插入图片描述

我曾经对SCG的2.2.6版本源码进行过解析,感兴趣的小伙伴可前往查看,地址:https://www.yuque.com/lihongjian/gui608
在我们的RPC框架中也采用类似的方式,只不过进行了简单的变形。我们首先定义了过滤器标记接口:IFilter

public interface IFilter {
}

由于我们的过滤器分为Client和Server两端使用的,因此分别抽象出IClinetFilterIServerFilter

public interface IServerFilter extends IFilter {

    void doFilter(RpcInvocation rpcInvocation);

}

public interface IClientFilter extends IFilter {

    void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation);

}

服务分组过滤器

在Client发起调用时,我们将分组信息存储到了attachements中,是一个Map结构。

public class ClientGroupFilterImpl implements IClientFilter{

    @Override
    public void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation) {
        String group = (String) rpcInvocation.getAttachments().get("group");
        if (StrUtil.isBlank(group)){
            return;
        }
        Iterator<ChannelFutureWrapper> iterator = src.iterator();
        while (iterator.hasNext()) {
            ChannelFutureWrapper channelFutureWrapper = iterator.next();
            if (!channelFutureWrapper.getGroup().equals(group)){
                iterator.remove();
            }
        }
        if (CollectionUtil.isEmpty(src)){
            throw new RuntimeException("no provider match for group " + group);
        }

    }
}

IP直连过滤器

与“服务分组过滤器”一样,Client会在发起调用时将请求的ip存储到attachments中。

public class DirectInvokeFilterImpl implements IClientFilter {
    @Override
    public void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation) {
        String url = (String) rpcInvocation.getAttachments().get("url");
        if (StrUtil.isBlank(url)) {
            return;
        }

        Iterator<ChannelFutureWrapper> iterator = src.iterator();
        while (iterator.hasNext()) {
            ChannelFutureWrapper channelFutureWrapper = iterator.next();
            if (!(channelFutureWrapper.getHost() + ":" + channelFutureWrapper.getPort()).equals(url)) {
                iterator.remove();
            }
            if (CollectionUtil.isEmpty(src)) {
                throw new RuntimeException("no match for url:" + url);
            }
        }
    }
}

Token校验过滤器

Client发起调用时会将token放入attachments中,Server端在过滤器中会拿到本次请求的token和内存中对应服务的token进行比对。

public class ServerTokenFilterImpl implements IServerFilter {
    @Override
    public void doFilter(RpcInvocation rpcInvocation) {
        String token = (String) rpcInvocation.getAttachments().get("token");
        ServiceWrapper serviceWrapper = PROVIDER_SERVICE_WRAPPER_MAP.get(rpcInvocation.getTargetServiceName());
        if (serviceWrapper == null) {
            return;
        }
        String matchToken = serviceWrapper.getServiceToken();
        if (StrUtil.isBlank(matchToken)) {
            return;
        }
        if (StrUtil.isNotBlank(token) && matchToken.equals(token)) {
            return;
        }

        throw new RuntimeException("token is " + token + " verify result is false!");

    }
}

过滤器链FilterChain

过滤器链主要是负责将所有的过滤器按照一定顺序串起来。与过滤器类似,过滤器链也需要分为Client和Server端。

ServerFilterChain

public class ServerFilterChain {

    private static List<IServerFilter> iServerFilters = new ArrayList<>();

    public void addServerFilter(IServerFilter serverFilter){
        iServerFilters.add(serverFilter);
    }

    public void doFilter(RpcInvocation rpcInvocation){
        for (IServerFilter iServerFilter : iServerFilters) {
            iServerFilter.doFilter(rpcInvocation);
        }
    }

}

ClientFilterChain

public class ClientFilterChain {

    private static List<IClientFilter> iClientFilters = new ArrayList<>();

    public void addServerFilter(IClientFilter clientFilter) {
        iClientFilters.add(clientFilter);
    }

    public void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation) {
        for (IClientFilter iClientFilter : iClientFilters) {
            iClientFilter.doFilter(src, rpcInvocation);
        }
    }

}

RPC框架接入

Client端接入

由于像服务分组过滤器、IP直连过滤器都需要根据指定的规则选择出对应的Provider,因此我们将执行过滤链条的逻辑插入在cn.onenine.irpc.framework.core.client.ConnectionHandler#getChannelFuture中。

/**
 * 默认走随机策略获取ChannelFuture
 */
public static ChannelFuture getChannelFuture(RpcInvocation rpcInvocation) {
    ChannelFutureWrapper[] channelFutureWrappers = SERVICE_ROUTER_MAP.get(rpcInvocation.getTargetServiceName());
    if (channelFutureWrappers == null || channelFutureWrappers.length == 0) {
        throw new RuntimeException("no provider exist for " + rpcInvocation.getTargetServiceName());
    }
    //doFilter
    CLIENT_FILTER_CHAIN.doFilter(Lists.newArrayList(channelFutureWrappers), rpcInvocation);
    Selector selector = new Selector();
    selector.setProviderServiceName(rpcInvocation.getTargetServiceName());
    selector.setChannelFutureWrappers(channelFutureWrappers);
    //通过指定的路由算法选择一个Provider ChannelFuture
    return IROUTER.select(selector).getChannelFuture();
}

Server端接入

在初始化时按照指定顺序将所有的Filter插入到FilterChain中,在接收到请求后执行整个链条即可,也就是ChannelInboundHandlerAdapter#channelRead

public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //服务端接收数据的时候以RpcProtocol协议的格式接收
        RpcProtocol rpcProtocol = (RpcProtocol) msg;
        RpcInvocation rpcInvocation = SERVER_SERIALIZE_FACTORY.deserialize(rpcProtocol.getContent(),RpcInvocation.class);

        //doFilter
        SERVER_FILTER_CHAIN.doFilter(rpcInvocation);
        
    	//省略部分代码......
        rpcInvocation.setResponse(result);
        RpcProtocol respRpcProtocol = new RpcProtocol(SERVER_SERIALIZE_FACTORY.serialize(rpcInvocation));
        ctx.writeAndFlush(respRpcProtocol);
    }

}

总结

本版本我们基于责任链模式完成了对RPC框架中流程化功能的整合,这些零零散散的功能我们通过过滤器的方式进行了实现,比如服务的分组选择、IP直连、Token统一校验等,减少了各个模块间的耦合性,如果需要扩展新的过滤器,只需要实现Client或者Server对应的接口即可。

问题

到目前版本,我们的RPC框架实现了注册中心模块、路由选择模块、序列化模块、过滤器模块,我们基本都提供了对应的抽象接口,使用者可以进行扩展,但目前想要扩展的话,比如自定义一个过滤器接入到框架中,还需要对源码进行调整,重新编译才可以,不太方便,后续我们会通过SPI的方式来解决这个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/143665.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ArcGIS基础实验操作100例--实验54 Shapfile与Graphic转换

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 高级编辑篇--实验54 Shapfile与Graphic转换 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff…

2022天翼数字科技生态大会 五大亮点看永不落幕的“5G物联生态城”

2022天翼数字科技生态大会&#xff0c;近日于线上隆重开幕。大会首次“云”上绽放&#xff0c;将给我们带来一场怎样的科技盛宴&#xff1f; 本次大会&#xff0c;中国电信天翼物联以“智启物联未来”为主题&#xff0c;运用“元宇宙”VR技术&#xff0c;打造虚拟世界中的“5G物…

Netty 创建高性能聊天室 单聊 群聊 websocket

目录 一、简单实现Netty发送消息的案例 二、websocket连接注册用户 三、实现单聊 四、群聊功能 五、案例代码 一、简单实现Netty发送消息的案例 案例一的依赖有&#xff1a;若没springboot项目有自动对应版本&#xff0c;其他版本可以使用maven仓库的最新版本。 <depe…

img的应用

我的目的是&#xff0c;因为图片足够的大&#xff0c;我想让它在一个小盒子里居中显示&#xff0c;所以我这样做了&#xff1a;<style>.text{width: 375px;height: 100px;} </style> <body><div class"text"><img src"./img/5.png&q…

企业为什么要利用数据中台进行数字化转型?_光点科技

近两年“数字化”已经悄悄的替代了“信息化”。那么什么是“企业的数字化转型”&#xff1f;数字化转型是企业战略层面的概念&#xff0c;它并不是追求眼前效益的机灵战术&#xff0c;其本质&#xff0c;是用数字化技术对业务的重构、流程的重构和组织的重构&#xff0c;目的是…

云呐|什么是固定资产?什么是流动资产

什么是固定资产&#xff1f;什么是流动资产&#xff0c;  1、固定资产  属于产品生产过程中用来改变或者影响劳动对象的劳动资料&#xff0c;是固定资本的实物形态固定资产在生产过程中可以长期发挥作用&#xff0c;长期保持原有的实物形态&#xff0c;但其价值则随着企业生…

自己centos7系统制作iso镜像,并新建虚拟机

一、自己centos7系统制作iso镜像 1. 前置工作 将系统安全配置 SELINUX 改为 disabled&#xff0c;否则制作好的镜像无法登陆&#xff01;&#xff01;&#xff01; vim /etc/selinux/config # 将其从 enforcing 改为 disabled SELINUXdisabled2.安装 mondo rescue cd /etc…

正则表达式的使用

什么是正则表达式 正则表达式&#xff0c;又称规则表达式,&#xff08;Regular Expression&#xff0c;在代码中常简写为regex、regexp或RE&#xff09;&#xff0c;是一种文本模式&#xff0c;包括普通字符&#xff08;例如&#xff0c;a 到 z 之间的字母&#xff09;和特殊字…

使用VC时一些容易犯的错误

本文迁移自本人网易博客&#xff0c;写于2011年1月13日&#xff0c;使用VC时一些容易犯的错误 - lysygyy的日志 - 网易博客 (163.com)1、在调用其他类中的函数时&#xff0c;需要在当前类中声明一个类对象&#xff0c;但是调用的时候&#xff0c;编译会出错。出现很多符号&…

2023年SQL大厂高频实战面试题(详细解析)

大家好&#xff0c;我是宁一。 已经连续四个周没有休息了&#xff0c;最近主业、副业都是忙碌的巅峰期&#xff0c;晚上11点下班回家&#xff0c;再写课写到凌晨两点。 连续一个多月连轴转&#xff0c;每天最大的愿望&#xff0c;就是睡足觉。 这一阶段终于忙完了~继续来更新SQ…

LongAdder源码【原创+图解+视频讲解】

目录 AtomicLong用法 源码分析 问题 解决 LongAdder用法 高并发下效率测试 原理 源码 add(long x) Striped64的longAccumulate 伪共享 总结 视频讲解&#xff1a; AtomicLong用法 public static void main(String[] args) {AtomicLong i new AtomicLong(0); ​S…

SQL UPDATE 语句

UPDATE 语句用于更新表中的记录。 SQL UPDATE 语句 UPDATE 语句用于更新表中已存在的记录。 SQL UPDATE 语法 UPDATE table_name SET column1 value1, column2 value2, ... WHERE condition; 参数说明&#xff1a; table_name&#xff1a;要修改的表名称。column1, colu…

C++:std::thread:线程用法

1&#xff1a;std::thread的基本用法 最简单的 std::thread用法如下&#xff0c;调用 thread将立即同时开始执行这个新建立的线程&#xff0c;新线程的任务执行完毕之后&#xff0c; main()的主线程也会继续执行。 #include<iostream> #include<thread> #include&l…

一致性hash算法和hash算法的区别和使用场景

1、hash算法使用场景 一般情况下hash算法主要用于&#xff1a;负载均衡&#xff08;nginx 请求转发&#xff0c;scg路由等&#xff09;&#xff0c;分布式缓存分区&#xff0c;数据库分库分表&#xff08;mycat&#xff0c;shardingSphere&#xff09;等。 2、hash算法大致实…

网络编程套接字——udp网络编程

目录 一、预备知识 1.端口 2.TCP协议和UDP协议 3.socket编程接口 ①socket 常见API ②sockaddr结构 二、网络编程 1.UDP网络程序 1.1服务器 ①打印 ②socket​编辑 ③bind ④recvfrom ​编辑 1.2客户端 ①sendto 1.3提升通信的花样性 ①将字符串返还 …

Individual Tree Segmentation Method Based on Mobile Backpack LiDAR Point Clouds

Abstract 单棵树 (IT) 分割对于森林管理、支持森林清查、生物量监测或树木竞争分析至关重要。光探测和测距 (LiDAR) 是这方面的一项突出技术&#xff0c;优于竞争技术。航空激光扫描 (ALS) 经常用于森林记录&#xff0c;在树顶表面显示良好的点密度。尽管使用多回波 ALS 可以收…

【虹科云展厅专题】虹科赋能汽车智能化云展厅——车载以太网/TSN专题

虹科2023年开年福利来了&#xff01; 聚焦前沿技术&#xff0c;【虹科赋能汽车智能化云展厅】正式上线&#xff0c;本次云展厅围绕“汽车以太网/TSN、汽车总线、智能网联、电子测试与验证、自动驾驶”等核心话题&#xff0c;为您带来如临展会现场般的讲演与介绍&#xff0c;更…

Unity入门基础语法

物体的坐标 transform.position 世界坐标 transform.localPosition 相对坐标 设置物体的坐标&#xff1a; this.transform.localPosition new Vector3(1.5f, 0, 2.0f); 帧更新 Update()&#xff0c;称为帧更新 此方法会被游戏引擎定时调用&#xff0c;已更新游戏的状态 …

基于Java+SpringBoot+vue+element实现物流管理系统

基于JavaSpringBootvueelement实现物流管理系统 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码联系…

SQL Studio:一款纯Web化SQL开发工具,关键是免安装还免费!

经常使用SQL工具的开发者对Navicat一定都不陌生。这款软件作为一款全球化的多数据库管理工具&#xff0c;这些年逐步得到全国各地SQLer&#xff08;SQL开发者&#xff09;的关注。 与其他很多外来的软件产品一样&#xff0c;由于价格原因&#xff0c;很多SQLer感觉不太适合适应…