基础
在仿写Dubbo之前,需要了解一些技术,像Java反射,Java代理,Java Socket以及Dubbo相关概念。
项目结构
项目gitee地址:https://gitee.com/AGi_R/framework
my-common
整个项目的公共资源库。存放一些公共的注解,类,接口,工具类等,供其他模块使用。
my-container
仿照tomcat的一些功能编写的项目运行容器。Sheep抽象类负责接收请求并分发,使用ServerSocket来接收请求,分发逻辑由my-framework编写。接收到一个请求之后创建一个新的线程来处理请求。 在UrlMethodMapping中存放访问路径以及对应的方法相关信息,方法信息封装成MethodObject类。
my-framework
仿照spring boot编写的开发框架。编写了@MyService,@MyController,@MyRequestMapping,@MyAutowired和@MyApplication等注解。 实现了IOC自动装配bean,MyRequestMapping请求处理,MyAutowired自动注入(只能注入类,无法注入接口)等功能。继承了Sheep抽象类,实现了请求分发功能。
my-registry
根据本项目需求编写的注册中心。实现了服务提供者注册,消费者订阅和消息推送等功能。每隔60秒对还在运行的消费者推送提供者列表。
⭐my-rpc
仿照dubbo编写的rpc框架。编写了@RpcReference和@RpcService注解,实现了其基本功能。实现了远程调用,负载均衡,服务提供者注册,服务消费者订阅等功能。
rpc-demo
my-framework的简单使用。
1.执行DemoApp的main方法启动项目
2.浏览器访问localhost:9527/进行测试
⭐rpc-api,rpc-consumer,rpc-provider
仿照dubbo项目结构搭建的微服务项目。rpc-api存放接口,rpc-consumer服务消费者,rpc-provider服务提供者。
1. 启动my-registry注册中心
2. 启动rpc-provider服务提供者
3. 启动rpc-consumer服务消费者
4. 浏览器访问localhost/demo/getUUID测试
my-rpc
annotation
存放的是框架使用的注解。@Rpc是修饰微服务项目启动类,有一个RpcType类型的必填参数type。@RpcService用来修饰服务提供者端的接口实现类。@RpcReference修饰服务消费者端需要远程调用的成员变量,value参数是被@RpcService修饰的类的全路径名。
@Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface Rpc { RpcType type(); } @Documented @Component //存放在my-common里的公共注解 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RpcService { } @Documented @Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) public @interface RpcReference { /** * 被@RpcService修饰的类的全路径名 * 例:com.agi.rpc.annotation.RpcReference * @return String * * @author aji * @date 2023/1/25 11:47 */ String value(); }
enums
RpcType枚举类,是@Rpc的参数类型,表明当前项目是CONSUMER或者PROVIDER。
public enum RpcType { PROVIDER, CONSUMER; }
handler
是存放处理类的包。RpcReferenceHandler是给被@RpcReference修饰的成员变量赋值用的处理类。
public class RpcReferenceHandler { public static void assignReferenceValue() { for (Map.Entry<String, Bean> entry : IOC.getBeanMap().entrySet()) { Bean bean = entry.getValue(); for (Field declaredField : bean.getClazz().getDeclaredFields()) { if (declaredField.isAnnotationPresent(RpcReference.class)) { RpcReference reference = declaredField.getDeclaredAnnotation(RpcReference.class); declaredField.setAccessible(true); Object value = RpcProxy.getInstance(declaredField.getType(), reference.value()); try { declaredField.set(bean.getInstance(), value); } catch (Exception e) { e.printStackTrace(); } } } } } }
RpcRegistryHandler是向注册中心注册服务或者订阅服务用的处理类。
public class RpcRegistryHandler { //提供者列表 private static List<ServiceProvider> providers = new ArrayList<>(); private static void handler(RegistryRequestType type, InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) { try { Socket socket = new Socket(registryHost, registryPort); OutputStream outputStream = socket.getOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream); RpcRegistryRequest request = new RpcRegistryRequest(); request.setType(type); request.setHost(thisHost); request.setPort(thisPort); request.setServiceName(thisServiceName); objectOutputStream.writeObject(request); objectOutputStream.flush(); InputStream inputStream = socket.getInputStream(); ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); Object object = objectInputStream.readObject(); if (object instanceof RpcRegistryResponse) { RpcRegistryResponse response = (RpcRegistryResponse) object; providers = response.getProviderList(); } objectInputStream.close(); inputStream.close(); objectOutputStream.close(); outputStream.close(); socket.close(); } catch (Exception e) { e.printStackTrace(); } } public static void register(InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) { handler(RegistryRequestType.PUT, registryHost, registryPort, thisHost, thisPort, thisServiceName); } public static void subscribe(InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) { handler(RegistryRequestType.GET, registryHost, registryPort, thisHost, thisPort, thisServiceName); Invoker.setProviderList(providers); } }
⭐proxy
存放代理,远程调用相关的类。RpcInvoker用来远程调用服务以及实现负载均衡的类。实现了随机访问和轮询两种策略。
public class RpcInvoker extends Invoker { //当前位置 private static int index = 0; //随机访问 private static int RandomBalance() { int size = providerList.size(); Random random = new Random(); return random.nextInt(size); } //轮询(队列询问) private static int QueueBalance() { int size = providerList.size(); int temp = index % size; index++; return temp; } private static Socket connect() { Socket socket = null; try { if (providerList.size() == 0) { throw new RuntimeException("请启动提供者"); } ServiceProvider provider = providerList.get(QueueBalance()); socket = new Socket(); socket.connect(new InetSocketAddress(provider.getHost(), provider.getPort()), 3000); if (!socket.isConnected()) { providerList.remove(index); connect(); } } catch (Exception e) { e.printStackTrace(); } return socket; } //远程调用 public static Object invoke(RpcProtocol protocol) { Object result = null; try { Socket socket = connect(); //告诉提供者容器,发送的是rpc请求 OutputStream outputStream = socket.getOutputStream(); outputStream.write("rpc\n".getBytes(StandardCharsets.UTF_8)); outputStream.flush(); InputStream inputStream = socket.getInputStream(); Scanner scanner = new Scanner(inputStream); String line = scanner.hasNext() ? scanner.nextLine() : ""; if (line.equals("rpc")) { //发送正式数据 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(protocol); objectOutputStream.flush(); //接收响应数据 ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); System.out.println(protocol); result = objectInputStream.readObject(); //关闭请求 objectInputStream.close(); outputStream.close(); } socket.close(); } catch (Exception e) { e.printStackTrace(); } return result; } }
RpcProxy是代理类,代理的是被@Reference修饰的成员变量。
public class RpcProxy implements InvocationHandler { private String referenceValue; private RpcProxy(String referenceValue) { this.referenceValue = referenceValue; } /** * 远程调用逻辑 * @param proxy * @param method * @param args * * @return Object * * @author aji * @date 2023/1/25 10:56 */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //封装请求 RpcProtocol protocol = new RpcProtocol(); protocol.setClassName(referenceValue); protocol.setMethodName(method.getName()); protocol.setParamTypes(method.getParameterTypes()); protocol.setParams(args); //远程调用 Object invoke = RpcInvoker.invoke(protocol); return invoke; } public static Object getInstance(Class clazz, String referenceValue) { RpcProxy handler = new RpcProxy(referenceValue); Object instance = Proxy.newProxyInstance( clazz.getClassLoader(), new Class[]{clazz}, handler ); return instance; } }
RpcApplication类
是微服务项目启动开关,根据项目启动类上的@Rpc注解的type参数判断当前项目是消费者或者提供者并调用相关逻辑。
public class RpcApplication { public static void run(Class mainClass, InetAddress registryHost, int registryPort, InetAddress thisHost, int thisPort, String thisServiceName) { if (mainClass.isAnnotationPresent(Rpc.class)) { Rpc rpc = (Rpc) mainClass.getDeclaredAnnotation(Rpc.class); if (rpc.type() == RpcType.CONSUMER) { //订阅 RpcRegistryHandler.subscribe(registryHost, registryPort, thisHost, thisPort, thisServiceName); RpcReferenceHandler.assignReferenceValue(); } else { //注册 RpcRegistryHandler.register(registryHost, registryPort, thisHost, thisPort, thisServiceName); } } else { throw new RuntimeException("请添加@Rpc注解"); } } }
rpc-api,rpc-consumer,rpc-provider
是仿照Dubbo项目结构编写的微服务项目。实现了接口,消费者,提供者模块。