Netty基础—6.Netty实现RPC服务三

news2025/3/17 13:17:55

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

5.Netty服务端之RPC服务提供端的处理

(1)RPC服务提供端NettyServer

(2)基于反射调用请求对象的目标方法

(1)RPC服务提供端NettyRpcServer

public class ServiceConfig {
    private String serviceName;//调用方的服务名称
    private Class serviceInterfaceClass;//服务接口类型
    private Class serviceClass;
    ...
}

public class NettyRpcServer {
    private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);
    private static final int DEFAULT_PORT = 8998;
    private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();
    private int port;
    
    public NettyRpcServer(int port) {
        this.port = port;
    }
    
    public void start() {
        logger.info("Netty RPC Server Starting...");
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
            .group(bossEventLoopGroup, workerEventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                    .addLast(new RpcDecoder(RpcRequest.class))
                    .addLast(new RpcEncoder(RpcResponse.class))
                    .addLast(new NettyRpcServerHandler(serviceConfigs));
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true);

            //到这一步为止,server启动了而且监听指定的端口号了
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            logger.info("Netty RPC Server started successfully, listened[" + port + "]");
            //进入一个阻塞的状态,同步一直等待到你的server端要关闭掉
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("Netty RPC Server failed to start, listened[" + port + "]");
        } finally {
            bossEventLoopGroup.shutdownGracefully();
            workerEventLoopGroup.shutdownGracefully();
        }
    }
    
    //可以代理多个服务
    public void addServiceConfig(ServiceConfig serviceConfig) {
        this.serviceConfigs.add(serviceConfig);
    }
    
    public static void main(String[] args) {
        ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);
        NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);
        nettyRpcServer.addServiceConfig(serviceConfig);
        nettyRpcServer.start();
    }
}

(2)基于反射调用请求对象的目标方法

//RpcRequest类需要修改字段调整为如下所示
public class RpcRequest implements Serializable {
    private String requestId;
    private String className;
    private String methodName;
    private Class[] parameterTypes;//参数类型
    private Object[] args;//参数值
    private String invokerApplicationName;//调用方的服务名称
    private String invokerIp;//调用方的IP地址
    ...
}

public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);
    private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();
    
    public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {
        for (ServiceConfig serviceConfig : serviceConfigs) {
            String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();
            serviceConfigMap.put(serviceInterfaceClass, serviceConfig);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest)msg;
        logger.info("Netty RPC Server receives the request: " + rpcRequest);
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            //此时我们要实现什么呢?
            //我们需要根据RpcRequest指定的class,获取到这个class
            //然后通过反射构建这个class对象实例
            //接着通过反射获取到这个RpcRequest指定方法和入参类型的method
            //最后通过反射调用,传入方法,拿到返回值

            //根据接口名字拿到接口实现类的名字后再获取类
            ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());
            Class clazz = serviceConfig.getServiceClass();
            Object instance = clazz.newInstance();
            Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
            Object result = method.invoke(instance, rpcRequest.getArgs());

            //把rpc调用结果封装到响应里去
            rpcResponse.setResult(result);
            rpcResponse.setSuccess(RpcResponse.SUCCESS);
        } catch(Exception e) {
            logger.error("Netty RPC Server failed to response the request.", e);
            rpcResponse.setSuccess(RpcResponse.FAILURE);
            rpcResponse.setException(e);
        }
        ctx.write(rpcResponse);
        ctx.flush();
        logger.info("send RPC response to client: " + rpcResponse);
    }
}

6.RPC服务调用端实现超时功能

public class ReferenceConfig {
    private static final long DEFAULT_TIMEOUT = 5000;
    private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVICE_PORT = 8998;

    private Class serviceInterfaceClass;
    private String serviceHost;
    private int servicePort;
    private long timeout;
    ...
}

public class NettyRpcClient {
    private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);

    private ReferenceConfig referenceConfig;
    private ChannelFuture channelFuture;
    private NettyRpcClientHandler nettyRpcClientHandler;
    
    public NettyRpcClient(ReferenceConfig referenceConfig) {
        this.referenceConfig = referenceConfig;
        this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());
    }

    public void connect() {
        logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
        .group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                .addLast(new RpcEncoder(RpcRequest.class))
                .addLast(new RpcDecoder(RpcResponse.class))
                .addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout()))
                .addLast(nettyRpcClientHandler);
            }
        });       

        try {
            if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {
                channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();
                logger.info("successfully connected.");
            }
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }

    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        //标记一下请求发起的时间
        NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());
        logger.info("receives response from netty rpc server.");
        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}

public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);
    private long timeout;
    public NettyRpcReadTimeoutHandler(long timeout) {
        this.timeout = timeout;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse)msg;
        long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());
        long now = new Date().getTime();
        if (now - requestTime >= timeout) {
            rpcResponse.setTimeout(true);
            logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);
        }
      
        //移除发起请求时间的标记
        NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());
        ctx.fireChannelRead(rpcResponse);
    }
}

public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);
    private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;
    private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();
    private long timeout;

    public NettyRpcClientHandler(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        if (rpcResponse.getTimeout()) {
            logger.error("Netty RPC client receives the response timeout: " + rpcResponse);
        } else {
            rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);
            logger.info("Netty RPC client receives the response: " + rpcResponse);
        }
    }

    public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {
        long waitStartTime = new Date().getTime();
        while (rpcResponses.get(requestId) == null) {
            try {
                long now = new Date().getTime();
                if (now - waitStartTime >= timeout) {
                    break;
                }
                Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);
            } catch (InterruptedException e) {
                logger.error("wait for response interrupted", e);
            }
        }
        RpcResponse rpcResponse = rpcResponses.get(requestId);
        if (rpcResponse == null) {
            logger.error("Get RPC response timeout.");
            throw new NettyRpcReadTimeoutException("Get RPC response timeout.");
        } else {
            rpcResponses.remove(requestId);
        }
        return rpcResponse;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2316636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么需要使用十堰高防服务器?

十堰高防服务器的核心价值与应用必要性 一、‌应对复杂攻击的防御能力‌ ‌T级DDoS攻击防护‌ 十堰高防服务器搭载 ‌T级清洗中心‌&#xff0c;支持智能流量调度与分层处理&#xff0c;可抵御 ‌800Gbps-1.2Tbps‌ 的大规模混合攻击&#xff08;如SYN Flood、UDP反射&#xff…

人工智能中的线性代数基础详解

‌ 线性代数是人工智能领域的重要数学基础之一,是人工智能技术的底层数学支柱,它为数据表示、模型构建和算法优化提供了核心工具。其核心概念与算法应用贯穿数据表示、模型训练及优化全过程。更多内容可看我文章:人工智能数学基础详解与拓展-CSDN博客 一、基本介绍 …

【毕业论文格式】word分页符后的标题段前间距消失

文章目录 【问题描述】 分页符之后的段落开头&#xff0c;明明设置了标题有段前段后间距&#xff0c;但是没有显示间距&#xff1a; 【解决办法】 选中标题&#xff0c;选择边框 3. 选择段前间距&#xff0c;1~31磅的一个数 结果

【蓝桥杯每日一题】3.16

&#x1f3dd;️专栏&#xff1a; 【蓝桥杯备篇】 &#x1f305;主页&#xff1a; f狐o狸x 目录 3.9 高精度算法 一、高精度加法 题目链接&#xff1a; 题目描述&#xff1a; 解题思路&#xff1a; 解题代码&#xff1a; 二、高精度减法 题目链接&#xff1a; 题目描述&…

2.7 滑动窗口专题:串联所有单词的子串

LeetCode 30. 串联所有单词的子串算法对比分析 1. 题目链接 LeetCode 30. 串联所有单词的子串 2. 题目描述 给定一个字符串 s 和一个字符串数组 words&#xff0c;words 中所有单词长度相同。要求找到 s 中所有起始索引&#xff0c;使得从该位置开始的连续子串包含 words 中所…

电脑实用小工具--VMware常用功能简介

一、创建、编辑虚拟机 1.1 创建新的虚拟机 详见文章新创建虚拟机流程 1.2 编辑虚拟机 创建完成后&#xff0c;点击编辑虚拟机设置&#xff0c;可对虚拟机内存、处理器、硬盘等各再次进行编辑设置。 二、虚拟机开关机 2.1 打开虚拟机 虚拟机创建成功后&#xff0c;点击…

为训练大模型而努力-分享2W多张卡通头像的图片

最近我一直在研究AI大模型相关的内容&#xff0c;想着从现在开始慢慢收集各种各样的图片&#xff0c;万一以后需要训练大模型的时候可以用到&#xff0c;或者自己以后也许会需要。于是决定慢慢收集这些图片&#xff0c;为未来的学习和训练大模型做一些铺垫&#xff0c;哈哈。 …

JVM 垃圾回收器的选择

一&#xff1a;jvm性能指标吞吐量以及用户停顿时间解释。 二&#xff1a;垃圾回收器的选择。 三&#xff1a;垃圾回收器在jvm中的配置。 四&#xff1a;jvm中常用的gc算法。 一&#xff1a;jvm性能指标吞吐量以及用户停顿时间解释。 在 JVM 调优和垃圾回收器选择中&#xff0…

使用GPTQ量化Llama-3-8B大模型

使用GPTQ量化8B生成式语言模型 服务器配置&#xff1a;4*3090 描述&#xff1a;使用四张3090&#xff0c;分别进行单卡量化&#xff0c;多卡量化。并使用SGLang部署量化后的模型&#xff0c;使用GPTQ量化 原来的模型精度为FP16&#xff0c;量化为4bit 首先下载gptqmodel量化…

2025-03-16 学习记录--C/C++-PTA 习题4-2 求幂级数展开的部分和

合抱之木&#xff0c;生于毫末&#xff1b;九层之台&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; 一、题目描述 ⭐️ 习题4-2 求幂级数展开的部分和 已知函数e^x可以展开为幂级数1xx^2/2!x^3/3!⋯x^k/k!⋯。现给定一个实数x&a…

【C#】Http请求设置接收不安全的证书

在进行HTTP请求时&#xff0c;出现以下报错&#xff0c;可设置接收不安全证书跳过证书验证&#xff0c;建议仅测试环境设置&#xff0c;生产环境可能会造成系统漏洞 /// <summary> /// HttpGet请求方法 /// </summary> /// <param name"requestUrl"&…

AP AR

混淆矩阵 真实值正例真实值负例预测值正例TPFP预测值负例FNTN &#xff08;根据阈值预测&#xff09; P精确度计算&#xff1a;TP/(TPFP) R召回率计算&#xff1a;TP/(TPFN) AP 综合考虑P R 根据不同的阈值计算出不同的PR组合&#xff0c; 画出PR曲线&#xff0c;计算曲线…

Leetcode-1278.Palindrome Partitioning III [C++][Java]

目录 一、题目描述 二、解题思路 【C】 【Java】 Leetcode-1278.Palindrome Partitioning IIIhttps://leetcode.com/problems/palindrome-partitioning-iii/description/1278. 分割回文串 III - 力扣&#xff08;LeetCode&#xff09;1278. 分割回文串 III - 给你一个由小写…

C++特性——智能指针

为什么需要智能指针 对于定义的局部变量&#xff0c;当作用域结束之后&#xff0c;就会自动回收&#xff0c;这没有什么问题。 当时用new delete的时候&#xff0c;就是动态分配对象的时候&#xff0c;如果new了一个变量&#xff0c;但却没有delete&#xff0c;这会造成内存泄…

ctf web入门知识合集

文章目录 01做题思路02信息泄露及利用robots.txt.git文件泄露dirsearch ctfshow做题记录信息搜集web1web2web3web4web5web6web7web8SVN泄露与 Git泄露的区别web9web10 php的基础概念php的基础语法1. PHP 基本语法结构2. PHP 变量3.输出数据4.数组5.超全局变量6.文件操作 php的命…

MySQL-存储过程和自定义函数

存储过程 存储过程&#xff0c;一组预编译的 SQL 语句和流程控制语句&#xff0c;被命名并存储在数据库中。存储过程可以用来封装复杂的数据库操作逻辑&#xff0c;并在需要时进行调用。 使用存储过程 创建存储过程 create procedure 存储过程名() begin存储过程的逻辑代码&…

图——表示与遍历

图的两种主要表示方法 图有两种常用的表示方法&#xff0c;一种是邻接表法&#xff08;adjacency-list&#xff09;&#xff0c;另一种是邻接矩阵法&#xff08;adjacency-matrix&#xff09;。 邻接表法储存数据更紧凑&#xff0c;适合稀疏的图&#xff08;sparse graphs&am…

新手村:数据预处理-异常值检测方法

机器学习中异常值检测方法 一、前置条件 知识领域要求编程基础Python基础&#xff08;变量、循环、函数&#xff09;、Jupyter Notebook或PyCharm使用。统计学基础理解均值、中位数、标准差、四分位数、正态分布、Z-score等概念。机器学习基础熟悉监督/无监督学习、分类、聚类…

ChatGPT-4

第一章&#xff1a;ChatGPT-4的技术背景与核心架构 1.1 生成式AI的发展脉络 生成式人工智能&#xff08;Generative AI&#xff09;的演进历程可追溯至20世纪50年代的早期自然语言处理研究。从基于规则的ELIZA系统到统计语言模型&#xff0c;再到深度学习的革命性突破&#x…

C语言_数据结构总结9:树的基础知识介绍

1. 树的基本术语 - 祖先&#xff1a;考虑结点K&#xff0c;从根A到结点K的唯一路径上的所有其它结点&#xff0c;称为结点K的祖先。 - 子孙&#xff1a;结点B是结点K的祖先&#xff0c;结点K是B的子孙。结点B的子孙包括&#xff1a;E,F,K,L。 - 双亲&#xff1a;路径上…