使用动态代理+Netty+Zookeeper+Protobuff手撸一个RPC框架

news2025/1/11 21:56:03

RPC是什么

RPC(Remote Procedure Call)远程过程调用,一种计算机之间的远程调用技术,客户端能够在不知道服务器底层的通信架构的情况下调用服务器端的方法,就像调用自身的方法一样。

举个例子:

老婆自己去超市买瓶酱油,这是本地调用

老婆发微信要我去买瓶酱油回来,不管我是开车、打车、骑车、坐地铁去超市,这就是远程过程调用

RPC可以做什么

现在的软件系统规模越来越大,很多采用了微服务架构,就是将系统拆分成了一个个独立的服务,部署在不同的服务器上,如电商系统一般会有商品服务、库存服务、订单服务、支付服务、物流服务、优惠券服务、售后服务等等。

服务和服务之间需要相互通信,如订单服务就需要商品服务提供订单商品的数据,这时就可以通过RPC进行网络通信。

RPC如何实现

进行RPC通信的两台计算机,提供服务的称为服务提供者,调用服务的称为服务消费者,消费者需要知道服务提供者的地址(IP和端口)才能进行调用,这时我们需要一台服务器来保存服务提供者的地址提供给服务消费者调用,这台服务器就是服务注册中心Registry。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t0cxgqcN-1670831166759)(RPC.assets/image-20221212101459851.png)]

这就像你要跟一家公司打电话,但是不知道对方的号码,可以去114查询,因为这些公司都在114上注册了自己的电话号码,114告诉你,就可以联系对方了。

使用Dubbo实现RPC

Dubbo简介

Dubbo是阿里的一款高性能、轻量级的开源Java RPC框架

它提供了三大核心能力:

  1. 透明化的远程方法调用,就像调用本地方法一样调用远程方法,只需简单配置,没有任何API侵入。

  2. 软负载均衡及容错机制,可在内网替代F5等硬件负载均衡器,降低成本,减少单点。

  3. 服务自动注册与发现,注册中心基于接口名查询服务提供者的IP地址,并且能够平滑添加或删除服务提供者。

Dubbo的架构

img

  • Provider: 暴露服务的服务提供方。
  • Consumer: 调用远程服务的服务消费方。
  • Registry: 服务注册与发现的注册中心。
  • Monitor: 统计服务的调用次调和调用时间的监控中心。
  • Container: 服务运行容器。

Dubbo简单案例

此案例中的注册中心是Zookeeper,首先需要安装和启动Zookeeper

创建父项目,下面三个子项目:

  • common_api 通用的接口
  • consumer_service 服务消费者
  • provider_service 服务提供者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OFe5WCww-1670831166761)(RPC.assets/image-20221212102941354.png)]

父项目的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>zookeeper_dubbo_demo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>common_api</module>
    </modules>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.14</version>
        </dependency>

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>
</project>

通用项目添加接口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HXumczBt-1670831166761)(RPC.assets/image-20221212103328960.png)]

消费者和服务者都继承父项目并引入通用项目

<parent>
    <groupId>org.example</groupId>
    <artifactId>zookeeper_dubbo_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
</parent>

<dependency>
    <groupId>com.blb</groupId>
    <artifactId>common_api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

服务提供者配置文件

server.port=6606
spring.application.name=provider-service
#Dubbo
dubbo.application.name=provider-service
# dubbo的注册中心协议类型
dubbo.registry.protocol=zookeeper
# zookeeper的地址
dubbo.registry.address=zookeeper://192.168.223.223:2181
# dubbo的协议配置
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
# dubbo扫描接口的包
dubbo.scan.base-packages=com.blb.provider_service.service

服务提供者实现接口

package com.blb.provider_service.service;

import com.blb.api.HelloService;
import org.apache.dubbo.config.annotation.Service;

/**
 * 接口实现 
 * @Service是Dubbo的注解
 */
@Service(version = "1.0.0",interfaceClass = HelloService.class)
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "Hello,我是服务提供者!" + name;
    }
}

服务消费者的配置文件

server.port=7707
dubbo.application.name=consumer-service
# 服务消费者注册同一个Zookeeper
dubbo.registry.protocol=zookeeper
dubbo.registry.address=zookeeper://192.168.223.223:2181

服务消费者调用服务

@RestController
public class HelloController {

    //实现远程方法调用 RPC
    @Reference(version = "1.0.0")
    private HelloService helloService;

    @RequestMapping("/hello")
    public String hello(String name){
        //调用远程方法
        return helloService.hello(name);
    }
}

手撸RPC框架

案例简介

在前面的案例中使用了Dubbo实现RPC,如果想要更加深入的掌握Dubbo实现RPC底层原理,可以自己完成一个简单的RPC框架

案例使用的技术栈:

  • Netty 基于NIO的网络通信框架,Dubbo底层也是使用此框架
  • Zookeeper 服务注册和发现
  • Protobuf 轻量级的序列化框架,实现通信协议的序列化

基本流程

在这里插入图片描述

执行流程:

  1. 服务端实现被调用的接口,启动后注册地址到Zookeeper上
  2. 客户端调用接口
  3. 接下来的步骤要对用户透明,就像调用本地接口,所以要生成动态代理
  4. 将接口类型、方法、参数等封装成请求协议,使用Protobuf序列化工具序列化
  5. 使用Netty编码器对请求进行编码
  6. 从Zookeeper中查询服务端的IP和端口
  7. 通过Netty发送网络请求
  8. 服务端接收Netty的网络请求
  9. 使用Netty解码器对请求进行解码
  10. 通过Protobuf反序列化获得请求协议
  11. 读取请求协议中的接口类型、方法和参数,通过反射调用服务端接口
  12. 客户端获得返回的结果

** RPC就是将3~11步骤封装起来,对用户透明,减少网络程序调用的复杂性 **

项目结构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W0ZVKxJH-1670831166763)(RPC.assets/image-20221212145722155.png)]

依赖坐标

<!-- SLF4J -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>

<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
</dependency>

<!-- protostuff -->
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>

<!-- ZooKeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

<!-- Apache Commons Collections -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.0</version>
</dependency>

<!-- Objenesis -->
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>

<!-- CGLib -->
<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>3.1</version>
</dependency>

通用代码

common/service/IDiff

package com.xray.rpc.common.service;

/**
 * 用于测试的减法接口
 */
public interface IDiff {
    double diff(double a, double b);
}

common/service/ISum

package com.xray.rpc.common.service;

/**
 * 用于测试的加法接口
 */
public interface ISum {
    public int sum(int a, int b);
}

common/registry/Constant

package com.xray.rpc.common.registry;

/**
 * 常量接口
 */
public interface Constant {

    //Zookeeper连接超时
    int ZK_SESSION_TIMEOUT = 10000;
    //zk地址
    String ZK_CONNECT = "127.0.0.1:2181";
    //zk保存路径
    String ZK_REGISTRY_PATH = "/registry";
    //zk保存节点
    String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
    //ip和端口的分隔符号
    String ZK_IP_SPLIT = ":";
}

common/registry/ServiceDiscovery

package com.xray.rpc.common.registry;

import io.netty.util.internal.ThreadLocalRandom;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 服务发现:连接ZK,添加watch事件
 */
public class ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    private final CountDownLatch latch = new CountDownLatch(1);

    private volatile List<String> dataList = new ArrayList<>();

    private final String registryAddress;

    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        //连接Zookeeper
        ZooKeeper zk = connectServer();
        if (zk != null) {
            //监听registry/data节点
            watchNode(zk);
        }
    }

    /**
     * 读取Zookeeper中的服务端地址
     * @return
     */
    public String discover() {
        String data = null;
        int size = dataList.size();
        if (size > 0) {
            if (size == 1) {
                data = dataList.get(0);
                LOGGER.debug("using only data: {}", data);
            } else {
                data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random data: {}", data);
            }
        }
        return data;
    }

    /**
     * 连接Zookeeper服务
     * @return
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    /**
     * 监听节点修改
     * @param zk
     */
    private void watchNode(final ZooKeeper zk) {
        try {
            //给注册节点添加监听器
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            //一旦节点发生改变,就读取地址到dataList中
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            LOGGER.debug("node data: {}", dataList);
            this.dataList = dataList;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}

common/registry/ServiceRegistry

package com.xray.rpc.common.registry;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * 连接ZK注册中心,创建服务注册目录
 */
public class ServiceRegistry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private final CountDownLatch latch = new CountDownLatch(1);

    private ZooKeeper zk;


    public ServiceRegistry() {
    }

    /**
     * 注册服务端地址到Zookeeper中
     * @param data
     */
    public void register(String data) {
        if (data != null) {
            zk = connectServer();
            if (zk != null) {
                try {
                    Stat stat = zk.exists(Constant.ZK_REGISTRY_PATH, false);
                    if (stat == null) {
                        zk.create(Constant.ZK_REGISTRY_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    }
                    stat = zk.exists(Constant.ZK_DATA_PATH, false);
                    if (stat == null) {
                        zk.create(Constant.ZK_DATA_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    }
                    //保存服务端地址
                    zk.setData(Constant.ZK_DATA_PATH, data.getBytes(), -1);
                    LOGGER.info("create zookeeper node ({} => {})", Constant.ZK_DATA_PATH, data);
                } catch (InterruptedException | KeeperException e) {
                    LOGGER.error("Connect Zookeeper Error {}", e);
                }
            }
        }
    }

    /**
     * 连接Zookeeper
     * @return
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECT, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 判断是否已连接ZK,连接后计数器递减.
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            // 若计数器不为0,则等待.
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("Connect Zookeeper Error {}", e);
        }
        return zk;
    }
}

common/codec/RPCRequest

package com.xray.rpc.common.codec;

/**
 * 请求协议
 */
public class RPCRequest {
    //请求id
    private String requestId;
    //接口类型
    private String className;
    //方法名
    private String methodName;
    //方法参数
    private Class<?>[] parameterTypes;
    //参数值
    private Object[] parameters;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}

common/codec/RPCResponse

package com.xray.rpc.common.codec;

/**
 * 请求响应
 */
public class RPCResponse {
    //请求id
    private String requestId;
    //错误对象
    private Throwable error;
    //返回值
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Throwable getError() {
        return error;
    }

    public void setError(Throwable error) {
        this.error = error;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

common/codec/SerializationUtil

package com.xray.rpc.common.codec;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 基于Protobuff的序列化工具类
 */
public class SerializationUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static Objenesis objenesis = new ObjenesisStd(true);

    private SerializationUtil() {
    }

    /**
     * 返回协议类型
     */
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化方法
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化方法
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = (T) objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

common/codec/RPCDecoder

package com.xray.rpc.common.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 请求解码器
 */
public class RPCDecoder extends ByteToMessageDecoder {

    //泛型类型
    private Class<?> genericClass;

    public RPCDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        final int length = in.readableBytes();
        final byte[] bytes = new byte[length];
        //读取字节
        in.readBytes(bytes, 0, length);
        //反序列化读取请求协议
        Object obj = SerializationUtil.deserialize(bytes, genericClass);
        out.add(obj);
    }
}

common/codec/RPCEncoder

package com.xray.rpc.common.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 请求编码器
 */
public class RPCEncoder extends MessageToByteEncoder<Object> {

    //泛型类型
    private Class<?> genericClass;

    public RPCEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            //序列化请求协议
            byte[] data = SerializationUtil.serialize(in);
            //发送字节
            out.writeBytes(data);
        }
    }
}

服务端代码

server/impl/DiffImpl

package com.xray.rpc.server.impl;

import com.xray.rpc.common.service.IDiff;

/**
 * 接口实现 减法
 */
public class DiffImpl implements IDiff {
    @Override
    public double diff(double a, double b) {
        return a - b;
    }
}

server/impl/SumImpl

package com.xray.rpc.server.impl;

import com.xray.rpc.common.service.ISum;

/**
 * 加法实现
 */
public class SumImpl implements ISum {
    @Override
    public int sum(int a, int b) {
        return a + b;
    }
}

rpc/server/RPCServer

package com.xray.rpc.server;

/**
 * RPC服务端
 */
public class RPCServer {

    /**
     * 将可调用的接口名和实现类包装到Map集合中
     * @return
     */
    private Map<String, Object> getServices() {
        Map<String, Object> services = new HashMap<String, Object>();
        services.put(ISum.class.getName(), new SumImpl());
        services.put(IDiff.class.getName(), new DiffImpl());
        return services;
    }

    /**
     * 绑定监听某个端口
     * @param port
     */
    private void bind(int port) {
        //创建线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建启动对象
            ServerBootstrap b = new ServerBootstrap();
            //配置参数
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                    //添加RPC解码器
                                    .addLast(new RPCDecoder(RPCRequest.class))
                                    .addLast(new LengthFieldPrepender(2))
                                    //添加RPC编码器
                                    .addLast(new RPCEncoder(RPCResponse.class))
                                    //添加RPC处理器
                                    .addLast(new RPCServerHandler(getServices()));
                        }
                    });
            //绑定端口
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 获取本机ip
     * @return
     */
    public static String getAddress(){
        InetAddress host = null;
        try {
            host = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        String address = host.getHostAddress();
        return address;
    }

    /**
     * 初始化服务器
     * @param port
     */
    public void initService(int port)  {
        ServiceRegistry serviceRegistry = new ServiceRegistry();
        String ip = getAddress();
        //向zookeeper注册服务地址
        serviceRegistry.register(ip + Constant.ZK_IP_SPLIT+port);
        bind(port);
    }
}

rpc/server/RPCServerHandler

package com.xray.rpc.server;

/**
 * RPC处理器
 */
public class RPCServerHandler extends ChannelHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RPCServerHandler.class);
    //本地接口Map
    private final Map<String, Object> handlerMap;

    public RPCServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RPCResponse response = new RPCResponse();
        //解码得到RPC协议
        RPCRequest request = (RPCRequest) msg;
        response.setRequestId(request.getRequestId());
        try {
            //调用本地方法
            Object result = handle(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        //返回响应结果
        ctx.writeAndFlush(response);
    }

    /**
     * 调用本地方法
     */
    private Object handle(RPCRequest request) throws Throwable {
        //获得接口类型名
        String className = request.getClassName();
        //获得接口类型对应的实现对象
        Object serviceBean = handlerMap.get(className);
        //获得接口类型、方法名、参数、参数值
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        FastClass serviceFastClass = FastClass.create(serviceClass);
        //获得方法
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        //调用本地方法
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}

rpc/server/RPCServiceTest

package com.xray.rpc.server;

public class RPCServiceTest {
    
    public static void main(String[] args) {
        //启动服务器测试
        int port = 9090;
        new RPCServer().initService(port);
    }
}

客户端代码

rpc/client/RPCProxy

package com.xray.rpc.client;

import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import com.xray.rpc.common.registry.Constant;
import com.xray.rpc.common.registry.ServiceDiscovery;
import net.sf.cglib.proxy.InvocationHandler;
import net.sf.cglib.proxy.Proxy;

import java.lang.reflect.Method;
import java.util.UUID;

/**
 * RPC动态代理类
 */
public class RPCProxy {
    private String serverAddress;
    private ServiceDiscovery serviceDiscovery;

    public RPCProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * 创建RPC代理对象
     */
    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 创建并初始化 RPC 请求
                        RPCRequest request = new RPCRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        // 通过Zookeeper查询服务端地址
                        if (serviceDiscovery != null) {
                            serverAddress = serviceDiscovery.discover();
                        }
                        // 分割出IP和端口  "123.23.213.23:9090"
                        String[] array = serverAddress.split(Constant.ZK_IP_SPLIT);
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        // 初始化 RPC客户端
                        RPCClient client = new RPCClient(host, port);
                        // 发送网络请求
                        RPCResponse response = client.send(request);
                        // 返回结果或错误信息
                        if (response.getError() != null) {
                            throw response.getError();
                        } else {
                            return response.getResult();
                        }
                    }
                });
    }
}

rpc/client/RPCClient

package com.xray.rpc.client;

/**
 * RPC客户端
 */
public class RPCClient {
    private final String host;
    private final int port;
    private final CountDownLatch latch;
    public RPCClient(String host,int port) {
        this.host = host;
        this.port = port;
        this.latch = new CountDownLatch(1);
    }

    /**
     * 发出RPC请求
     * @param request
     * @return
     */
    public RPCResponse send(RPCRequest request){
        //自定义RPC客户端处理器
        RPCClientHandler handler = new RPCClientHandler(request,latch);
        //创建netty启动对象
        EventLoopGroup group = new NioEventLoopGroup();
        RPCResponse response = null;
        try {
            Bootstrap b = new Bootstrap().group(group)
                    //配置参数
                    .channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    //设置处理器
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                    //设置RPC解码器
                                    .addLast(new RPCDecoder(RPCResponse.class))
                                    .addLast(new LengthFieldPrepender(2))
                                    //设置RPC编码器
                                    .addLast(new RPCEncoder(RPCRequest.class))
                                    //设置RPC请求处理器
                                    .addLast(handler);
                        }
                    });
            //连接服务端
            ChannelFuture f = b.connect(host, port).sync();
            //阻塞线程,直到处理器读取到服务端数据为止
            latch.await();
            //获得服务端响应
            response = handler.getResponse();
            if(response != null) {
                f.channel().close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            group.shutdownGracefully();
        }
        return response;
    }
}

rpc/client/RPCClientHandler

package com.xray.rpc.client;

import com.xray.rpc.common.codec.RPCRequest;
import com.xray.rpc.common.codec.RPCResponse;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.CountDownLatch;

/**
 * 客户端处理器
 */
public class RPCClientHandler extends ChannelHandlerAdapter {

    //请求对象
    private RPCRequest request;
    //响应对象
    private RPCResponse response;
    //RPCClient传来的latch
    private final CountDownLatch latch;

    public RPCClientHandler(RPCRequest request, CountDownLatch latch) {
        this.request = request;
        this.latch = latch;
    }

    public RPCResponse getResponse() {
        return response;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(request);
    }

    //读取服务端消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获得消息,并解码为响应结果
        response = (RPCResponse) msg;
        //让client阻塞的线程继续执行
        latch.countDown();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.getCause().printStackTrace();
        ctx.close();
    }
}

rpc/client/RPCTest

package com.xray.rpc.client;

import com.xray.rpc.common.service.IDiff;
import com.xray.rpc.common.service.ISum;
import com.xray.rpc.common.registry.Constant;
import com.xray.rpc.common.registry.ServiceDiscovery;

public class RPCTest {

    public static void main(String[] args) {
        //创建动态代理对象
        RPCProxy rpcProxy = new RPCProxy(new ServiceDiscovery(Constant.ZK_CONNECT));
        //创建接口的动态代理
        IDiff diff = rpcProxy.create(IDiff.class);
        //通过动态代理调用远程方法
        double result = diff.diff(1321, 32.2);
        //同上
        ISum sum = rpcProxy.create(ISum.class);
        int result2 = sum.sum(1000, 1000);
        System.out.println(result+":"+result2);
    }
}

总结

通过文章我们了解到RPC是一种计算机的远程调用技术,能够像调用本地方法那样,简单的调用远程服务器上的方法,大大减少了服务器之间数据通信的复杂性。

但是RPC的内部实现是比较复杂的,需要掌握Netty、Zookeeper、Protobuff、动态代理等技术,总而言之是“把简单留给用户,把复杂留给自己”。通过本文最后的案例,我们也大致了解了RPC框架底层的总体逻辑,不过该案例只实现了基本的RPC功能,如果需要像Dubbo那样通过配置注解实现,还需要在后面的版本中进行完善。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/83719.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Uni-app 实现md5加密

写下这篇文章&#xff0c;记录自己走过的坑 第一次尝试&#xff1a;参照博客uniapp使用md5_清雨小竹的博客-CSDN博客_uniapp md5 引入md5.js后&#xff0c;在main.js中import后&#xff0c;无法使用md5.hex_md5("需要加密的字符串")&#xff0c;vue页面无法打开&…

【捕风捉影】Vue项目报错,点击浏览器报错信息定位不到报错代码,该如何优雅地调试代码?

【捕风捉影】Vue项目如何优雅地调试代码一、背景二、调试时开启productionSourceMap三、devtool几种模式一、背景 通过vue-cli服务运行项目&#xff0c;项目运行一切正常。但打包后&#xff0c;通过nginx部署运行&#xff0c;大屏展示模块报echarts typeError 错误。但是点击浏…

如何使用Docker创建自定义网络

目录 网络模式 1.bridge模式(默认模式--桥接模式) 初识网络模式 查看桥接模式的特点 2.host模式&#xff08;仅主机模式&#xff09; 使用守护进程的方式创建并启动且进入容器 查看仅主机模式下的网络配置 端口映射 &#xff1a;​ 3.如何创建自定义网络 网络模式 Docker…

启发式算法 之 模拟退火原理及实践

一、初窥其貌 1.1 启发式算法和元启发式算法 启发式算法是求解优化问题的一类方法&#xff0c;因为经典优化方法存在局限性&#xff0c;有时无法得到最优解&#xff0c;只能得到一个可以接受的近似最优解&#xff0c;启发式算法就适合求解这类问题。启发式算法就是专家的推测…

Redis框架(七):大众点评项目 缓存穿透、缓存击穿、缓存雪崩

大众点评项目 缓存穿透、缓存击穿、缓存雪崩需求&#xff1a;缓存穿透、缓存击穿、缓存雪崩处理策略缓存穿透处理缓存雪崩缓存击穿总结SpringCloud章节复习已经过去&#xff0c;新的章节Redis开始了&#xff0c;这个章节中将会回顾Redis实战项目 大众点评 主要依照以下几个原则…

吉时利Keithley静电计程控上位机软件-摩擦纳米发电机测试软件NS-EM

1、产品简介 NS-EM 静电计程控系统可实现对吉时利静电计的程控&#xff0c;通过此系统软件您可以单独程控静电计进行数据的采集的同时还可以利用告诉信号采集卡对测试获取的电压、电流等信号进行高频率采样并实时显示采集信号的波形图。 2、产品特点 ◆可远程进行仪器控制&am…

QF state machine 介绍

转型Qt小半年了&#xff0c;看到项目组用的Qt state machine signal和匿名函数满天飞&#xff0c;就想之前用的C#里的QF state machine 能不能做转到Qt平台。这样可以省去使用Qt状态机的信号&#xff0c;在这过程中学习借鉴了QF state machine 的鼻祖 QP框架&#xff0c;不知道…

编译原理笔记

第一课&#xff1a; 《编译原理求语法树的短语和直接短语等等》 二义性是什么&#xff1f; 如果最左推导和最右推导的结果不一致&#xff0c;那么说明文法有二义性 短语是什么&#xff1f; 找短语就是找能长叶子的结点&#xff0c;有五个如图圆圈标号1 2 3 4 5 直接短语&#x…

c语言:联合体—union

联合体一.基本认识1.一个联合体的基本样式2.内部成员的访问3.具体的内存分配二.大小端对联合体的影响三.一个问题一.基本认识 1.一个联合体的基本样式 看得出来其实跟我们定义结构体是一样的&#xff08;如果还不大了解结构体的可以看看这篇博客什么是结构体&#xff09;&…

[附源码]计算机毕业设计港口集团仓库管理系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis MavenVue等等组成&#xff0c;B/S模式…

MoveIT1 Assistant 总结

文章目录环境步骤备注故障问题解决参考接上一篇&#xff0c;生成URDF后&#xff0c;在MoveIT Assistant生成配置用于运动规划。https://blog.csdn.net/woshigaowei5146/article/details/128237105?spm1001.2014.3001.5501 环境 Ubuntu20.04&#xff1b;ROS1 Noetic;VMware 步…

生成模型(一):GAN

生成对抗网络 (GAN)在许多生成任务中显示出很好的结果&#xff0c;以复制真实世界的丰富内容&#xff0c;例如图像、文字和语音。它受到博弈论的启发&#xff1a;一个生成器和一个判别器&#xff0c;在互相竞争的同时让彼此变得更强大。然而&#xff0c;训练 GAN 模型相当具有挑…

一篇解析Linux paging_init

说明&#xff1a; Kernel版本&#xff1a;4.14ARM64处理器&#xff0c;Contex-A53&#xff0c;双核使用工具&#xff1a;Source Insight 3.5&#xff0c; Visio 1. 介绍 从详细讲解Linux物理内存初始化中&#xff0c;可知在paging_init调用之前&#xff0c;存放Kernel Image和…

java计算机毕业设计ssm幼儿英语学习平台的设计与实现yofnu(附源码、数据库)

java计算机毕业设计ssm幼儿英语学习平台的设计与实现yofnu&#xff08;附源码、数据库&#xff09; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支…

注册微信小程序

文章目录1. 项目结构2. 页面组成3. json配置文件4. 认识页面5. WXML6. WXSS7. js文件8. 宿主环境9. 组件10. API11. 协同工作与发布跟公众号平台不共用一个账号&#xff0c;需要用其它邮箱另行注册&#xff0c;填写身份证信息&#xff08;姓名、身份证号码&#xff09;&#xf…

[附源码]Node.js计算机毕业设计电商后台管理系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

我妈眼中我的房间

ONE This is Me in My room as a teenager according to My Mother. 在我老妈眼里&#xff0c;这就是十几岁待在房间里的我。 ✨ 评论区 1️⃣ It’s all true. 这都是真的。 2️⃣ As a father - yep that’s you. 作为一个父亲&#xff0c;没错就是你。 3️⃣ Looking…

基于C语言开发(控制台)通讯录管理程序【100010030】

通讯录程序设计 一、课程设计题目与要求 题目 &#xff1a;通讯录管理程序 1. 问题描述 ​ 编写一个简单的通讯录管理程序。通讯录记录有姓名&#xff0c;地址(省、市(县)、街道)&#xff0c;电话号码&#xff0c;邮政编码等四项。 2. 基本要求 程序应提供的基本基本管理…

Biotin-PEG-Pyrene,Pyrene-PEG-Biotin,芘丁酸-PEG-生物素peg化芘衍生物

聚乙二醇化芘衍生物之Pyrene-PEG-Biotin&#xff08;Biotin-PEG-Pyrene&#xff09;&#xff0c;其化学试剂的中文名为芘丁酸-聚乙二醇-生物素&#xff0c;此试剂可用于碳纳米管和石墨烯表面功能化。它所属分类为Biotin PEG Pyrene PEG。 peg试剂的分子量均可定制&#xff0c;…

机器学习实战教程(二):决策树基础篇

一、决策树 决策树是什么&#xff1f;决策树(decision tree)是一种基本的分类与回归方法。举个通俗易懂的例子&#xff0c;如下图所示的流程图就是一个决策树&#xff0c;长方形代表判断模块(decision block)&#xff0c;椭圆形成代表终止模块(terminating block)&#xff0c;表…