仿写Dubbo-MyRpc

news2024/11/28 0:45:50

基础

在仿写Dubbo之前,需要了解一些技术,像Java反射,Java代理,Java Socket以及Dubbo相关概念。

项目结构

项目gitee地址:https://gitee.com/AGi_R/framework 

my-common

整个项目的公共资源库。存放一些公共的注解,类,接口,工具类等,供其他模块使用。

my-container

仿照tomcat的一些功能编写的项目运行容器。Sheep抽象类负责接收请求并分发,使用ServerSocket来接收请求,分发逻辑由my-framework编写。接收到一个请求之后创建一个新的线程来处理请求。 在UrlMethodMapping中存放访问路径以及对应的方法相关信息,方法信息封装成MethodObject类。

my-framework

仿照spring boot编写的开发框架。编写了@MyService,@MyController,@MyRequestMapping,@MyAutowired和@MyApplication等注解。 实现了IOC自动装配bean,MyRequestMapping请求处理,MyAutowired自动注入(只能注入类,无法注入接口)等功能。继承了Sheep抽象类,实现了请求分发功能。

my-registry

根据本项目需求编写的注册中心。实现了服务提供者注册,消费者订阅和消息推送等功能。每隔60秒对还在运行的消费者推送提供者列表。

⭐my-rpc

仿照dubbo编写的rpc框架。编写了@RpcReference和@RpcService注解,实现了其基本功能。实现了远程调用,负载均衡,服务提供者注册,服务消费者订阅等功能。

rpc-demo

my-framework的简单使用。

1.执行DemoApp的main方法启动项目

2.浏览器访问localhost:9527/进行测试

⭐rpc-api,rpc-consumer,rpc-provider

仿照dubbo项目结构搭建的微服务项目。rpc-api存放接口,rpc-consumer服务消费者,rpc-provider服务提供者。

1. 启动my-registry注册中心

2. 启动rpc-provider服务提供者

3. 启动rpc-consumer服务消费者

4. 浏览器访问localhost/demo/getUUID测试

my-rpc

annotation

存放的是框架使用的注解。@Rpc是修饰微服务项目启动类,有一个RpcType类型的必填参数type。@RpcService用来修饰服务提供者端的接口实现类。@RpcReference修饰服务消费者端需要远程调用的成员变量,value参数是被@RpcService修饰的类的全路径名。

@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rpc {
    RpcType type();
}

@Documented
@Component    //存放在my-common里的公共注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {
}

@Documented
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcReference {
    /**
     * 被@RpcService修饰的类的全路径名
     * 例:com.agi.rpc.annotation.RpcReference
     * @return String
     *
     * @author aji
     * @date 2023/1/25 11:47
     */
    String value();
}

enums

RpcType枚举类,是@Rpc的参数类型,表明当前项目是CONSUMER或者PROVIDER。

public enum RpcType {
    PROVIDER, CONSUMER;
}

handler

是存放处理类的包。RpcReferenceHandler是给被@RpcReference修饰的成员变量赋值用的处理类。

public class RpcReferenceHandler {
    public static void assignReferenceValue() {
        for (Map.Entry<String, Bean> entry : IOC.getBeanMap().entrySet()) {
            Bean bean = entry.getValue();
            for (Field declaredField : bean.getClazz().getDeclaredFields()) {
                if (declaredField.isAnnotationPresent(RpcReference.class)) {
                    RpcReference reference = declaredField.getDeclaredAnnotation(RpcReference.class);
                    declaredField.setAccessible(true);
                    Object value = RpcProxy.getInstance(declaredField.getType(), reference.value());
                    try {
                        declaredField.set(bean.getInstance(), value);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

RpcRegistryHandler是向注册中心注册服务或者订阅服务用的处理类。

public class RpcRegistryHandler {
    //提供者列表
    private static List<ServiceProvider> providers = new ArrayList<>();

    private static void handler(RegistryRequestType type, InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) {
        try {
            Socket socket = new Socket(registryHost, registryPort);
            OutputStream outputStream = socket.getOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
            RpcRegistryRequest request = new RpcRegistryRequest();
            request.setType(type);
            request.setHost(thisHost);
            request.setPort(thisPort);
            request.setServiceName(thisServiceName);
            objectOutputStream.writeObject(request);
            objectOutputStream.flush();

            InputStream inputStream = socket.getInputStream();
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            Object object = objectInputStream.readObject();
            if (object instanceof RpcRegistryResponse) {
                RpcRegistryResponse response = (RpcRegistryResponse) object;
                providers = response.getProviderList();
            }
            objectInputStream.close();
            inputStream.close();
            objectOutputStream.close();
            outputStream.close();
            socket.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void register(InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) {
        handler(RegistryRequestType.PUT, registryHost, registryPort, thisHost, thisPort, thisServiceName);
    }

    public static void subscribe(InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) {
        handler(RegistryRequestType.GET, registryHost, registryPort, thisHost, thisPort, thisServiceName);
        Invoker.setProviderList(providers);
    }
}

⭐proxy

存放代理,远程调用相关的类。RpcInvoker用来远程调用服务以及实现负载均衡的类。实现了随机访问和轮询两种策略。

public class RpcInvoker extends Invoker {
    //当前位置
    private static int index = 0;

    //随机访问
    private static int RandomBalance() {
        int size = providerList.size();
        Random random = new Random();
        return random.nextInt(size);
    }

    //轮询(队列询问)
    private static int QueueBalance() {
        int size = providerList.size();
        int temp = index % size;
        index++;
        return temp;
    }

    private static Socket connect() {
        Socket socket = null;
        try {
            if (providerList.size() == 0) {
                throw new RuntimeException("请启动提供者");
            }
            ServiceProvider provider = providerList.get(QueueBalance());
            socket = new Socket();
            socket.connect(new InetSocketAddress(provider.getHost(), provider.getPort()), 3000);
            if (!socket.isConnected()) {
                providerList.remove(index);
                connect();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return socket;
    }

    //远程调用
    public static Object invoke(RpcProtocol protocol) {
        Object result = null;
        try {
            Socket socket = connect();
            //告诉提供者容器,发送的是rpc请求
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write("rpc\n".getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
            InputStream inputStream = socket.getInputStream();
            Scanner scanner = new Scanner(inputStream);
            String line = scanner.hasNext() ? scanner.nextLine() : "";
            if (line.equals("rpc")) {
                //发送正式数据
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(protocol);
                objectOutputStream.flush();
                //接收响应数据
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                System.out.println(protocol);
                result = objectInputStream.readObject();
                //关闭请求
                objectInputStream.close();
                outputStream.close();
            }
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}

RpcProxy是代理类,代理的是被@Reference修饰的成员变量。

public class RpcProxy implements InvocationHandler {
    private String referenceValue;

    private RpcProxy(String referenceValue) {
        this.referenceValue = referenceValue;
    }

    /**
     * 远程调用逻辑
     * @param proxy
     * @param method
     * @param args
     *
     * @return Object
     *
     * @author aji
     * @date 2023/1/25 10:56
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //封装请求
        RpcProtocol protocol = new RpcProtocol();
        protocol.setClassName(referenceValue);
        protocol.setMethodName(method.getName());
        protocol.setParamTypes(method.getParameterTypes());
        protocol.setParams(args);
        //远程调用
        Object invoke = RpcInvoker.invoke(protocol);
        return invoke;
    }

    public static Object getInstance(Class clazz, String referenceValue) {
        RpcProxy handler = new RpcProxy(referenceValue);
        Object instance = Proxy.newProxyInstance(
                clazz.getClassLoader(),
                new Class[]{clazz},
                handler
        );
        return instance;
    }
}

RpcApplication类

是微服务项目启动开关,根据项目启动类上的@Rpc注解的type参数判断当前项目是消费者或者提供者并调用相关逻辑。

public class RpcApplication {
    public static void run(Class mainClass, InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) {
        if (mainClass.isAnnotationPresent(Rpc.class)) {
            Rpc rpc = (Rpc) mainClass.getDeclaredAnnotation(Rpc.class);
            if (rpc.type() == RpcType.CONSUMER) {
                //订阅
                RpcRegistryHandler.subscribe(registryHost, registryPort, thisHost, thisPort, thisServiceName);
                RpcReferenceHandler.assignReferenceValue();
            } else {
                //注册
                RpcRegistryHandler.register(registryHost, registryPort, thisHost, thisPort, thisServiceName);
            }
        } else {
            throw new RuntimeException("请添加@Rpc注解");
        }
    }
}

rpc-api,rpc-consumer,rpc-provider

 是仿照Dubbo项目结构编写的微服务项目。实现了接口,消费者,提供者模块。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/180408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

拦截器、过滤器、监听器

目录一、拦截器1. 拦截器是什么?2. 设置拦截器a. 定义拦截器b. 配置加载拦截器c. 新建页面二、过滤器1. 使用原因2. Filter概念图3. Filter编程三、监听器一、拦截器 拦截器&#xff1a;必须保证页面有访问controller的操作&#xff0c;否则拦截不了 1. 拦截器是什么? 概念…

OpenWrt软路由空间扩容

文章目录预备知识OpenWrt系统固件分类EXT4固件扩容方式新建分区扩容操作步骤直接扩容操作步骤SQUASHFS固件扩容方式新建分区扩容直接扩容EFI引导固件的额外操作参考预备知识 OpenWrt系统固件分类 EXT4固件 固件包名称中包含有ext4关键字&#xff0c;可以参考固件分类关键字示意…

设计模式 - 创建型模式_建造者模式

文章目录创建型模式概述Case模拟工程Bad ImplBetter Impl &#xff08;建造者模式重构代码&#xff09;小结创建型模式 创建型模式提供创建对象的机制&#xff0c; 能够提升已有代码的灵活性和可复⽤性。 类型实现要点工厂方法定义⼀个创建对象的接⼝&#xff0c;让其⼦类⾃⼰…

编写用户注册用表单

<!-- 需求&#xff1a; 用户注册&#xff1a;用户名、密码、确认密码、性别、兴趣爱好、学历、简介 --> <!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>编写用户注册用表单<…

【Linux】Linux和Window下\r与\n的区别、git命令行的使用

作者&#xff1a;小卢 专栏&#xff1a;《Linux》、《Git》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 1. 回车换行符在Window下和在Linux下的区别&#xff1a; 1.1回车换行符&#xff1a;…

用友U8和旺店通企业版淘宝奇门单据接口集成

用友U8和旺店通企业奇门单据接口集成对接系统&#xff1a;旺店通企业奇门慧策最先以旺店通ERP切入商家核心管理痛点——订单管理&#xff0c;之后围绕电商经营管理中的核心管理诉求&#xff0c;先后布局流量获取、会员管理、仓库管理等其他重要经营模块。慧策的产品线从旺店通E…

实现宏offsetof()

本期介绍&#x1f356; 主要介绍&#xff1a;什么是offsetof()&#xff0c;offsetof()的用法&#xff0c;如何自己实现这个宏&#x1f440;。 offsetof其实是一个宏&#xff0c;作用是&#xff1a;能够求出指定成员相对于结构体起始地址的偏移量&#xff08;单位&#xff1a;字…

(考研湖科大教书匠计算机网络)第三章数据链路层-第一节:数据链路层概述

文章目录一&#xff1a;数据链路层概述&#xff08;1&#xff09;为什么要有数据链路层&#xff08;2&#xff09;数据链路层定义&#xff08;3&#xff09;点对点信道和广播信道二&#xff1a;数据链路层需要解决的一些问题&#xff08;1&#xff09;三个最基本问题①&#xf…

深入理解Promise之一步步教你手写Promise构造函数

目录前言一&#xff0c;手写教学1.1 基本结构1.2 resolve与reject结构搭建1.3 resolve与reject代码实现1.4 throw抛出异常改变状态1.5 promise对象状态只能转换一次1.6 then方法进行回调1.7 异步任务的回调执行1.8 执行多个回调的实现1.9 同步修改状态then方法结果返回1.10 异步…

【手写 Promise 源码】第四篇 - 翻译并理解 Promise A+ 规范

一&#xff0c;前言 上一篇&#xff0c;根据对 Promise 的分析和了解&#xff0c;实现了一个简版 Promise&#xff0c;主要涉及以下内容&#xff1a; Promise 的实现思路&#xff1b;Promise A 规范&#xff08;简版&#xff09;&#xff1b;Promise 简版实现和功能测试&…

KVM虚拟化之小型虚拟机kvmtool的使用记录

根据 kvmtool github仓库文档的描述&#xff0c;类似于QEMU&#xff0c;kvmtool是一个承载KVM Guest OS的 host os用户态虚拟机&#xff0c;作为一个纯的完全虚拟化的工具&#xff0c;它不需要修改guest os即可运行, 不过&#xff0c;由于KVM基于CPU的硬件虚拟化支持&#xff0…

读《哲学的故事》

文章目录读《哲学的故事》&#x1f6a9; 遇见&#x1f33b; 简述&#x1f33e; 部分摘抄读《哲学的故事》 一本书读过后&#xff0c;我有种脑子里又被塞进了很多新东西的感觉&#xff0c;也有种想要自我抒发、宣泄的欲望。可真到要说的时候&#xff0c;又好像无话可说。总归勉…

Java转换流(InputStreamReader/OutputStreamWriter)

文章目录概述为什么会有转换流&#xff1f;InputStreamReaderOutputStreamWriter概述 转换流是字节流到字符流的桥梁&#xff0c;在转换的过程中&#xff0c;可以指定编码。转换流也是一种处理流&#xff0c;它提供了字节流和字符流之间的转换。 转换流的两个类 InputStreamR…

1.设计模式的前奏

哪些维度评判代码质量的好坏&#xff1f; 常用的评价标准 可维护性&#xff08;maintainability&#xff09;:维护代码的成本可读性&#xff08;readability&#xff09;可扩展性&#xff08;extensibility&#xff09;&#xff1a;码应对未来需求变化的能力灵活性&#xff0…

【keepass】密码管理软件-推荐插件和相关工具合集-keepass工作流分析(自动填充、美化界面、快速添加记录、安全增强、软件和数据库维护类)

Keepass有很多已经开源的插件&#xff0c;生态良好&#xff0c;在官网有专门的插件推荐区。安装插件的方法很简单&#xff0c;直接把下载好的插件文件放在plugins文件夹内&#xff0c;重启软件即可。下面我以几大功能推荐一些keepass插件或搭配使用的浏览器扩展&#xff0c;以求…

Coolify系列-手把手教学解决局域网局域网中的其他主机访问虚拟机以及docker服务

背景 我在windows电脑安装了一个VM&#xff0c;使用VM开启了Linux服务器&#xff0c;运行docker&#xff0c;下载服务镜像&#xff0c;然后运行服务,然后遇到了主机无法访问服务的问题。 问题排查 STEP1:首先要开启防火墙端口&#xff0c;这个我的Coolify系列其他文章有详细…

【c++】设置控制台窗口字体颜色和背景色(system和SetConsoleTextAttribute函数 )(内含超好玩的c++游戏链接)

目录 游戏推荐 研究初步 SetConsoleTextAttribute函数 原型 参数 举个栗子 最后 题外话 一篇游戏笔记。。。 游戏推荐 最近&#xff0c;在玩&#xff08;完&#xff09;一个c的控制台游戏。 啊&#xff0c;真的非常好玩。虽然是一个文字游戏&#xff0c;但有很多隐…

分享137个ASP源码,总有一款适合您

ASP源码 分享137个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 137个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/13nF0yADJhSBonIFUIoymPQ?pwdmsl8 提取码&#x…

【C++】位图、布隆过滤器概念与模拟实现

目录 一、位图 1.1 位图的概念 1.2 位图的使用 1.3 位图的实现 1.4 位图的应用 二、布隆过滤器 2.1 布隆过滤器 2.2 布隆过滤器的实现 2.3 布隆过滤器练习题 一、位图 1.1 位图的概念 所谓位图&#xff0c;就是用每一位来存放某种状态&#xff0c;适用于海量数据&am…

监控Python 内存使用情况和代码执行时间

我的代码的哪些部分运行时间最长、内存最多&#xff1f;我怎样才能找到需要改进的地方&#xff1f;” 在开发过程中&#xff0c;我很确定我们大多数人都会想知道这一点&#xff0c;而且通常情况下存在开发空间。在本文中总结了一些方法来监控 Python 代码的时间和内存使用情况…