基于netty实现简易版rpc服务-代码实现

news2024/10/20 0:21:42

1 公共部分

1.1 请求、响应对象

@Data
public class RpcRequest {

    private String serviceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
}
@Data
public class RpcResponse {

    private int code;
    private String msg;
    private Object data;
    private String ex;
}

1.2 rpc协议

@Data
public class RpcProtocol {

    private int length;
    private byte[] content;

}

1.3 简易注册中心,保存服务名和地址的映射

public class ServiceRegister {

    private Map<String, List<String>> register = new HashMap<>();

    public ServiceRegister() {
        register.put(RpcService.class.getName(), 
        new ArrayList<>(List.of("localhost:1733")));
    }

    public List<String> findService(String serviceName) {
        return register.get(serviceName);
    }

}

1.4 rpc上下文,用来获取单例的ServiceRegister

public class RpcContext {

    public static ServiceRegister register() {
        return RpcRegisterHodler.REGISTER;
    }

    private static class RpcRegisterHodler {
        private static final ServiceRegister REGISTER = new ServiceRegister();
    }

}

1.7 帧解码器

// 帧解码器,要配置在ChannelPipeline的第一个,这样才能解决入站数据的粘包和半包
public class RpcFrameDecoder extends LengthFieldBasedFrameDecoder {

    public RpcFrameDecoder() {
        super(1024, 0, 4);
    }
}
// rpc协议的编解码器
public class RpcProtocolCodec extends ByteToMessageCodec<RpcProtocol> {
	// 将rpc协议对象编码成字节流
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcProtocol msg, 
    ByteBuf out) throws Exception {
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
	// 将字节流解码成rpc协议对象
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
     List<Object> out) throws Exception {
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        RpcProtocol protocol = new RpcProtocol();
        protocol.setLength(length);
        protocol.setContent(content);
        out.add(protocol);
    }
}
// rpc请求对象的编解码器
public class RpcRequestCodec extends MessageToMessageCodec<RpcProtocol, 
RpcRequest> {
	// 将请求对象编码成rpc协议对象
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcRequest msg, 
    List<Object> out) throws Exception {
        byte[] content = JSON.toJSONBytes(msg);
        int length = content.length;
        RpcProtocol rpcProtocol = new RpcProtocol();
        rpcProtocol.setLength(length);
        rpcProtocol.setContent(content);
        out.add(rpcProtocol);
    }
	// 将rpc协议对象解码成请求对象
    @Override
    protected void decode(ChannelHandlerContext ctx, RpcProtocol msg, 
    List<Object> out) throws Exception {
        RpcRequest request = JSON.parseObject(msg.getData(),
                RpcRequest.class,
                JSONReader.Feature.SupportClassForName);
        out.add(request);
    }
}
// rpc响应对象的编解码器
public class RpcResponseCodec extends MessageToMessageCodec<RpcProtocol, 
RpcResponse> {
	// 将响应对象编码成rpc协议对象
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcResponse msg, 
    List<Object> out) throws Exception {
        byte[] content = JSON.toJSONBytes(msg);
        int length = content.length;
        RpcProtocol rpcProtocol = new RpcProtocol();
        rpcProtocol.setLength(length);
        rpcProtocol.setContent(content);
        out.add(rpcProtocol);
    }
	// 将rpc协议对象解码成响应对象
    @Override
    protected void decode(ChannelHandlerContext ctx, RpcProtocol msg, 
    List<Object> out) throws Exception {
        RpcResponse response = JSON.parseObject(msg.getContent(), RpcResponse.class);
        out.add(response);
    }
}

1.6 服务接口

public interface RpcService {

    String hello(String name);

}

2 服务端

2.1 接口实现类

@Slf4j
public class RpcServiceImpl implements RpcService {
    @Override
    public String hello(String name) {

        log.info("service received: {} ", name);

        return "hello " + name;
    }
}

2.2 接口名和实现类的对象映射,通过接口名查找对应的实现类对象

public class ServiceMapping {

    private Map<String, RpcService> mappings = new HashMap<>();

    public ServiceMapping() {
        mappings.put(RpcService.class.getName(), new RpcServiceImpl());
    }


    public void registerMapping(String serviceName, RpcService service) {
        mappings.put(serviceName, service);
    }

    public RpcService findMapping(String serviceName) {
        return mappings.get(serviceName);
    }

}

2.2 服务端rpc上下文,用来获取单例的ServiceMapping

public class RpcServerContext {

    public static ServiceMapping mapping() {
        return RpcMappingrHodler.MAPPING;
    }

    private static class RpcMappingrHodler {
        private static final ServiceMapping MAPPING = new ServiceMapping();
    }

}

2.3 业务处理器handler

@Slf4j
public class RpcServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
    throws Exception {
        RpcRequest request = (RpcRequest) msg;
        RpcResponse response = invoke(request);
        ctx.writeAndFlush(response);
    }

    private RpcResponse invoke(RpcRequest request) {
        RpcResponse response = new RpcResponse();
        try {
            ServiceMapping register = RpcServerContext.mapping();
            RpcService rpcService = register.findMapping(
            request.getServiceName());
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();
            // invoke
            Method method = RpcService.class.getDeclaredMethod(methodName, 
            parameterTypes);
            Object result = method.invoke(rpcService, parameters);
            //
            response.setCode(200);
            response.setMsg("ok");
            response.setData(result);
        } catch (Exception e) {
            response.setCode(500);
            response.setMsg("error");
            response.setEx(e.getMessage());
        }
        return response;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive :{}", ctx.channel().remoteAddress());
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    throws Exception {
        log.error("exceptionCaught :{}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }
}

2.4 启动类

public class RpcServer {

    public static void main(String[] args) {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ChannelFuture channelFuture = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new RpcFrameDecoder());
                            ch.pipeline().addLast(new RpcProtocolCodec());
                            ch.pipeline().addLast(new RpcRequestCodec());
                            ch.pipeline().addLast(new RpcResponseCodec());
//                            ch.pipeline().addLast(new LoggingHandler());
                            ch.pipeline().addLast(new RpcServerHandler());
                        }
                    }).bind(1733);

            channelFuture.sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }


    }

}

3 客户端

3.2 客户端rpc上下文,用来处理channel的响应数据

public class RpcClientContext {

    private Map<Channel, Promise<Object>> promises = new HashMap<>();

    public Promise<Object> getPromise(Channel channel) {
        return promises.remove(channel);
    }

    public void setPromise(Channel channel, Promise<Object> promise) {
        promises.put(channel, promise);
    }

}

3.2 业务处理器handler

@Slf4j
public class RpcClientHandler extends ChannelInboundHandlerAdapter {

    private final RpcClientContext rpcClientContext;

    public RpcClientHandler(RpcClientContext rpcClientContext) {
        this.rpcClientContext = rpcClientContext;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
    throws Exception {
        log.info("rpc invoke response: {}", msg);
        RpcResponse response = (RpcResponse) msg;
        //
        Promise<Object> promise = rpcClientContext.getPromise(ctx.channel());
        //
        if (response.getEx() != null)
            promise.setFailure(new RuntimeException(response.getEx()));
        else
            promise.setSuccess(response.getData());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive :{}", ctx.channel().remoteAddress());
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    throws Exception {
        log.error("exceptionCaught :{}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }
}

3.3 启动类

@Slf4j
public class RpcClient {

    private final Map<String, NioSocketChannel> nioSocketChannels = 
    new HashMap<>();
    private final RpcClientContext rpcClientContext = new RpcClientContext();

    public RpcService rpcService() {
        String serviceName = RpcService.class.getName();
        List<String> services = RpcContext.register().findService(serviceName);
        String url = services.get(0);
        if (!nioSocketChannels.containsKey(url)) {
            NioSocketChannel nioSocketChannel = createNioSocketChannel(url);
            nioSocketChannels.put(url, nioSocketChannel);
            log.info("create a new channel: {}", nioSocketChannel);
        }
        final NioSocketChannel nioSocketChannel = nioSocketChannels.get(url);
        return (RpcService) Proxy.newProxyInstance(RpcClient.class
        .getClassLoader(), new Class[]{RpcService.class},
                (proxy, method, args) -> {
                    RpcRequest request = new RpcRequest();
                    request.setServiceName(RpcService.class.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(args);
                    nioSocketChannel.writeAndFlush(request);
                    // wait response
                    DefaultPromise<Object> promise = 
                    new DefaultPromise<>(nioSocketChannel.eventLoop());
                    rpcClientContext.setPromise(nioSocketChannel, promise);
                    promise.await();
                    if (!promise.isSuccess())
                        throw new RuntimeException(promise.cause());
                    return promise.getNow();
                });
    }

    private NioSocketChannel createNioSocketChannel(String url) {
        //
        String host = url.substring(0, url.indexOf(":"));
        int port = Integer.parseInt(url.substring(url.indexOf(":") + 1));
        //
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new RpcFrameDecoder());
                            ch.pipeline().addLast(new RpcProtocolCodec());
                            ch.pipeline().addLast(new RpcResponseCodec());
                            ch.pipeline().addLast(new RpcRequestCodec());
//                            ch.pipeline().addLast(new LoggingHandler());
                            ch.pipeline().addLast(new 
                            RpcClientHandler(rpcClientContext));
                        }
                    }).connect(host, port);

            channelFuture.sync();
            channelFuture.channel().closeFuture().addListener(future -> {
                nioSocketChannels.remove(RpcService.class.getName());
                group.shutdownGracefully();
            });
            //
            return (NioSocketChannel) channelFuture.channel();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void close() {
        nioSocketChannels.values().forEach(NioSocketChannel::close);
    }


    public static void main(String[] args) {

        RpcClient rpcClient = new RpcClient();
        RpcService rpcService = rpcClient.rpcService();
        String netty = rpcService.hello("netty");
        System.out.println(netty);
        String world = rpcService.hello("world");
        System.out.println(world);
        String java = rpcService.hello("java");
        System.out.println(java);

        rpcClient.close();

    }

}

4 总结

这样就实现了简单的rpc服务,通过公共部分的接口、注册中心、编解码器、服务端的服务映射,客户端就能进行远程过程调用了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2218855.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

31. 问题 - GPIO调试

1. 概述 gpio调试过程中,个别gpio出现,波形干扰 2. gpio配置表 GPIO 功能 备注

AP上线的那些事儿(1)capwap建立过程、设备初始化以及二层上线

1、了解FITAP与AC的建立过程 之前我们已经知道了FATAP与FIT是一对双胞胎一样的兄弟&#xff0c;FAT哥哥能够直接独立使用当AP桥接、路由器等&#xff0c;而弟弟FIT则比较薄弱&#xff0c;独自发挥不出功效&#xff0c;需要一位师傅&#xff08;AC&#xff09;来带领&#xff0c…

Java21虚拟线程:我的锁去哪儿了?

0 前言 最近的文章中&#xff0c;我们详细介绍了当我们迁移到 Java 21 并将代际 ZGC 作为默认垃圾收集器时&#xff0c;我们的工作负载是如何受益的。虚拟线程是我们在这次迁移中兴奋采用的另一个特性。 对虚拟线程新手&#xff0c;它们被描述为“轻量级线程&#xff0c;大大…

word建立目录以及修改页码

1、为word建立新的目录 &#xff08;1&#xff09;选中word中的标题设置为第几级标题&#xff0c;将所有的标题均设置完成。最后可以鼠标右击标题&#xff0c;对不同的标题字体大小等进行设置。右击-->修改-->格式-->段落 &#xff08;2&#xff09;在word中插入新的…

springboot039基于Web足球青训俱乐部管理后台系统开发(论文+源码)_kaic

毕业设计(论文) 基于Web的足球青训俱乐部管理后台系统的设计与开发 学生姓名 XXX 学 号 XXXXXXXX 分院名称 XXXXXXXX 专业班级 XXXXX 指导教师 XXXX …

12.个人博客系统(Java项目基于spring和vue)

目录 1.系统的受众说明 2.相关技术介绍 2.1 B/S 简介 2.2 JAVA 简介 2.3 vue简介 2.4 SSM和Springboot简介 3.可行性分析 3.1 技术可行性分析 3.2 经济可行性分析 3.3 操作可行性 4.系统设计 4.1 系统总流程 4.2 博主用例 4.3 游客用例 4.4 系统类 4.…

llm 论文淘金,大模型精选论文解读,让你从大模型零基础到进阶

技术报告 没啥说的&#xff0c;当下最最最有含金量的论文&#xff0c;值得反复阅读。重点留意&#xff1a;数据清洗方法、pretrain 数据配比、pretrain 超参数、退火阶段、sft 的 task 种类、sft 的数据量级、dpo / ppo 训练技巧&#xff0c;合成数据方法等。 我个人觉着&…

Java项目实战II基于Spring Boot的毕业就业信息管理系统设计与实现(源码+数据库+文档)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 随着高校扩…

014_django基于大数据运城市二手房价数据可视化系统的设计与实现2024_3ahrxq75

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍&#xff1a;CodeMentor毕业设计领航者、全网关注者30W群落&#xff0c;InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者&#xff0c;博客领航之星、开发者头条/腾讯云/AW…

【C++】编码最全详解

✨ Blog’s 主页: 白乐天_ξ( ✿&#xff1e;◡❛) &#x1f308; 个人Motto&#xff1a;他强任他强&#xff0c;清风拂山冈&#xff01; &#x1f525; 所属专栏&#xff1a;C深入学习笔记 &#x1f4ab; 欢迎来到我的学习笔记&#xff01; 一、什么编码&#xff1f; 编码&am…

DDR Study - LPDDR Initial

参考来源&#xff1a;JESD209-4B 在之前的DDR Study - Basic Understanding中介绍了DDR的基础概念&#xff0c;从这篇文章开始&#xff0c;会基于LPDDR4依次按照如下顺序对LPDDR内容进行简单分析&#xff1a; LPDDR Initial → LPDDR Write Leveling and DQ Training → LPDDR …

【Jenkins】windows安装步骤

【Jenkins】windows安装步骤 官网使用WAR包方式运行浏览器访问Jenkinswindows-installer安装安装过程问题解决This account either does not hava the privilege to logon as a service or the account was unable to be verified 安装成功修改jenkins.xml启动jenkins访问jenki…

如何测试IP速度?

了解代理的连接速度是否快速是确保网络使用效率和体验的关键因素之一。本文来为大家如何有效地评估和测试代理IP的连接速度&#xff0c;以及一些实用的方法和工具&#xff0c;帮助用户做出明智的选择和决策。 一、如何评估代理IP的连接速度 1. 使用在线速度测试工具 为了快速…

阿里云云盘在卸载时关联到PHP进程,如何在不影响PHP进程情况下卸载磁盘

1.问题&#xff1a; 在使用umount /dev/vdc1 卸载磁盘时&#xff0c;提示如下&#xff0c;导致无法在Linux系统下卸载磁盘 umount /dev/vdc1 umount: /var/www/html/*/eshop/IFile3: target is busy.(In some cases useful info about processes that usethe device is found…

鸿蒙Next设备上的ProxyMan、Charles网络抓包配置教程

一、Proxyman配置 1. 导出证书 ProxyMan菜单栏依次点击 证书—>导出—>根证书为PEM 然后保存.pem文件传送(如hdc命令<下文会有介绍>)至鸿蒙Next设备存储任意位置 2. 安装证书 系统设置搜索“证书”&#xff0c;结果列表中点击“证书与凭据” 点击“从存储设备…

AI周报(10.13-10.19)

AI应用-清华校友用AI破解162个高数定理 加州理工、斯坦福和威大的研究人员提出了LeanAgent——一个终身学习&#xff0c;并能证明定理的AI智能体。LeanAgent会根据数学难度优化的学习轨迹课程&#xff0c;来提高学习策略。并且&#xff0c;它还有一个动态数据库&#xff0c;有效…

数据结构练习题4(链表)

1两两交换链表中的节点 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换&#xff09;。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4]…

Sqli-labs less-27

Sqli-labs less-27 过滤union\select绕过方式 ### 1. 逻辑绕过 例&#xff1a; 过滤代码 union select user,password from users 绕过方式 1 && (select user from users where userid1)‘admin’### 2.十六进制字符绕过 select ——> selec\x74 union——>un…

AutoFixture:.NET 的假数据生成工具

上次推荐过《Bogus&#xff1a;.NET的假数据生成利器》方便我们制造假数据测试。今天继续推荐另外一个也是非常流行的工具。 01 项目简介 AutoFixture 是一个用于 .NET 的测试工具&#xff0c;它允许开发者在单元测试中自动生成随机的测试数据。它支持广泛的数据类型&#xf…

充电桩高压快充发展趋势

一、为什么要升级充电电压 1、新能源发展的困境 随着电动汽车加快发展&#xff0c;用户对电动汽车接受度不断提高&#xff0c;充电问题是影响电动车普及的重要因素&#xff0c;用户快速补能的需求强烈&#xff0c;例如节假日经常会遇到&#xff0c;高速充电1小时&#xff0c;…