简易版 RPC 框架实现 2.0 -netty实现

news2024/11/24 19:08:25

这一篇理解如果有难度,可能对netty不是很理解, 可以关注我netty专栏,还有另外一篇: 用 Netty 自己实现简单的RPC, 这一篇是学习netty的时候写的,更倾向于分析netty相关的知识, 今天我是学习dubbo,从一个rpc框架进行思考写的一篇文章

RPC 是“远程过程调用(Remote Procedure Call)”的缩写形式,比较通俗的解释是:像本地方法调用一样调用远程的服务。虽然 RPC 的定义非常简单,但是相对完整的、通用的 RPC 框架涉及很多方面的内容,例如注册发现、服务治理、负载均衡、集群容错、RPC 协议等,如下图所示:
在这里插入图片描述

简易 RPC 框架的架构图

本课时我们主要实现RPC 框架的基石部分——远程调用,简易版 RPC 框架一次远程调用的核心流程是这样的:

  1. Client 首先会调用本地的代理,也就是图中的 Proxy。
  2. Client 端 Proxy 会按照协议(Protocol),将调用中传入的数据序列化成字节流。
  3. 之后 Client 会通过网络,将字节数据发送到 Server 端。
  4. Server 端接收到字节数据之后,会按照协议进行反序列化,得到相应的请求信息。
  5. Server 端 Proxy 会根据序列化后的请求信息,调用相应的业务逻辑。
  6. Server 端业务逻辑的返回值,也会按照上述逻辑返回给 Client 端。

这个远程调用的过程,就是我们简易版本 RPC 框架的核心实现,只有理解了这个流程,才能进行后续的开发。

项目结构

了解了简易版 RPC 框架的工作流程和实现目标之后,我们再来看下项目的结构,为了方便起见,这里我们将整个项目放到了一个 Module 中了,如下图所示,你可以按照自己的需求进行模块划分。

在这里插入图片描述
那这各个包的功能是怎样的呢?我们就来一一说明。

  • protocol:简易版 RPC 框架的自定义协议。
  • serialization:提供了自定义协议对应的序列化、反序列化的相关工具类。
  • codec:提供了自定义协议对应的编码器和解码器。
  • transport:基于 Netty 提供了底层网络通信的功能,其中会使用到 codec 包中定义编码器和解码器,以及 serialization 包中的序列化器和反序列化器。
  • registry:基于 ZooKeeper 和 Curator 实现了简易版本的注册中心功能。
  • proxy:使用 JDK 动态代理实现了一层代理。

自定义协议

当前已经有很多成熟的协议了,例如 HTTP、HTTPS 等,那为什么我们还要自定义 RPC 协议呢?

从功能角度考虑,HTTP 协议在 1.X 时代,只支持半双工传输模式,虽然支持长连接,但是不支持服务端主动推送数据。从效率角度来看,在一次简单的远程调用中,只需要传递方法名和加个简单的参数,此时,HTTP 请求中大部分数据都被 HTTP Header 占据,真正的有效负载非常少,效率就比较低。

当然,HTTP 协议也有自己的优势,例如,天然穿透防火墙,大量的框架和开源软件支持 HTTP 接口,而且配合 REST 规范使用也是很便捷的,所以有很多 RPC 框架直接使用 HTTP 协议,尤其是在 HTTP 2.0 之后,如 gRPC、Spring Cloud 等。

这里我们自定义一个简易版的 Demo RPC 协议,如下图所示:

在这里插入图片描述
在 Demo RPC 的消息头中,包含了整个 RPC 消息的一些控制信息,例如,版本号、魔数、消息类型、附加信息、消息 ID 以及消息体的长度,在附加信息(extraInfo)中,按位进行划分,分别定义消息的类型、序列化方式、压缩方式以及请求类型。当然,你也可以自己扩充 Demo RPC 协议,实现更加复杂的功能。

Demo RPC 消息头对应的实体类是 Header,其定义如下:

public class Header {

    private short magic; // 魔数
    private byte version; // 版本号
    private byte extraInfo; // 附加信息
    private Long messageId; // 消息ID
    private Integer size; // 消息体长度

确定了 Demo RPC 协议消息头的结构之后,我们再来看 Demo RPC 协议消息体由哪些字段构成,这里我们通过 Request 和 Response 两个实体类来表示请求消息和响应消息的消息体:

public class Request implements Serializable {

    private String serviceName; // 请求的Service类名

    private String methodName; // 请求的方法名称

    private Class[] argTypes; // 请求方法的参数类型

    private Object[] args; // 请求方法的参数
}
public class Response implements Serializable {

    private int code = 0; // 响应的错误码,正常响应为0,非0表示异常响应

    private String errMsg; // 异常信息

    private Object result; // 响应结果
}

注意,Request 和 Response 对象是要进行序列化的,需要实现 Serializable 接口。为了让这两个类的对象能够在 Client 和 Server 之间跨进程传输,需要进行序列化和反序列化操作,这里定义一个 Serialization 接口,统一完成序列化相关的操作:

public interface Serialization {
    <T> byte[] serialize(T obj) throws IOException;

    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}

在 Demo RPC 中默认使用 Hessian 序列化方式,下面的 HessianSerialization 就是基于 Hessian 序列化方式对 Serialization 接口的实现:

public class HessianSerialization implements Serialization {
    @Override
    public byte[] serialize(Object data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Hessian2Output out = new Hessian2Output(bos);
        out.writeObject(data);
        out.flush();
        return bos.toByteArray();
    }

    public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
        return (T) input.readObject(clz);
    }
}

在有的场景中,请求或响应传输的数据比较大,直接传输比较消耗带宽,所以一般会采用压缩后再发送的方式。在前面介绍的 Demo RPC 消息头中的 extraInfo 字段中,就包含了标识消息体压缩方式的 bit 位。这里我们定义一个 Compressor 接口抽象所有压缩算法:

public interface Compressor {
  byte[] compress(byte[] array) throws IOException;
  byte[] unCompress(byte[] array) throws IOException;
}

同时提供了一个基于 Snappy 压缩算法的实现,作为 Demo RPC 的默认压缩算法:

public class SnappyCompressor implements Compressor {
  public byte[] compress(byte[] array) throws IOException {
      if (array == null) { return null; }
      return Snappy.compress(array);
  }
  public byte[] unCompress(byte[] array) throws IOException {
      if (array == null) { return null; }
      return Snappy.uncompress(array);
  }
}

编解码实现

了解了自定义协议的结构之后,我们再来解决协议的编解码问题。

前面课时介绍 Netty 核心概念的时候我们提到过,Netty 每个 Channel 绑定一个 ChannelPipeline,并依赖 ChannelPipeline 中添加的 ChannelHandler 处理接收到(或要发送)的数据,其中就包括字节到消息(以及消息到字节)的转换。Netty 中提供了 ByteToMessageDecoder、 MessageToByteEncoder、MessageToMessageEncoder、MessageToMessageDecoder 等抽象类来实现 Message 与 ByteBuf 之间的转换以及 Message 之间的转换,如下图所示:

在这里插入图片描述
Netty 提供的 Decoder 和 Encoder 实现

在 Netty 的源码中,我们可以看到对很多已有协议的序列化和反序列化都是基于上述抽象类实现的,例如,HttpServerCodec 中通过依赖 HttpServerRequestDecoder 和 HttpServerResponseEncoder 来实现 HTTP 请求的解码和 HTTP 响应的编码。如下图所示,HttpServerRequestDecoder 继承自 ByteToMessageDecoder,实现了 ByteBuf 到 HTTP 请求之间的转换;HttpServerResponseEncoder 继承自 MessageToMessageEncoder,实现 HTTP 响应到其他消息的转换(其中包括转换成 ByteBuf 的能力)。

在这里插入图片描述
Netty 中 HTTP 协议的 Decoder 和 Encoder 实现

在简易版 RPC 框架中,我们的自定义请求暂时没有 HTTP 协议那么复杂,只要简单继承 ByteToMessageDecoder 和 MessageToMessageEncoder 即可。

首先来看 DemoRpcDecoder,它实现了 ByteBuf 到 Demo RPC Message 的转换,具体实现如下:

public class DemoRpcDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        if (byteBuf.readableBytes() < Constants.HEADER_SIZE) {
            return; // 不到16字节的话无法解析消息头,暂不读取
        }
        // 记录当前readIndex指针的位置,方便重置
        byteBuf.markReaderIndex();
        // 尝试读取消息头的魔数部分
        short magic = byteBuf.readShort();
        if (magic != Constants.MAGIC) { // 魔数不匹配会抛出异常
            byteBuf.resetReaderIndex(); // 重置readIndex指针
            throw new RuntimeException("magic number error:" + magic);
        }
        // 依次读取消息版本、附加信息、消息ID以及消息体长度四部分
        byte version = byteBuf.readByte();
        byte extraInfo = byteBuf.readByte();
        long messageId = byteBuf.readLong();
        int size = byteBuf.readInt();
        Object body = null;
        // 心跳消息是没有消息体的,无需读取
        if (!Constants.isHeartBeat(extraInfo)) {
            // 对于非心跳消息,没有积累到足够的数据是无法进行反序列化的
            if (byteBuf.readableBytes() < size) {
                byteBuf.resetReaderIndex();
                return;
            }
            // 读取消息体并进行反序列化
            byte[] payload = new byte[size];
            byteBuf.readBytes(payload);
            // 这里根据消息头中的extraInfo部分选择相应的序列化和压缩方式
            Serialization serialization = SerializationFactory.get(extraInfo);
            Compressor compressor = CompressorFactory.get(extraInfo);
            if (Constants.isRequest(extraInfo)) {
                // 得到消息体
                body = serialization.deserialize(compressor.unCompress(payload),
                        Request.class);
            } else {
                // 得到消息体
                body = serialization.deserialize(compressor.unCompress(payload),
                        Response.class);
            }
        }
        // 将上面读取到的消息头和消息体拼装成完整的Message并向后传递
        Header header = new Header(magic, version, extraInfo, messageId, size);
        Message message = new Message(header, body);
        out.add(message);
    }
}
public class DemoRpcEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx,
                          Message message, ByteBuf byteBuf) throws Exception {
        Header header = message.getHeader();
        // 依次序列化消息头中的魔数、版本、附加信息以及消息ID
        byteBuf.writeShort(header.getMagic());
        byteBuf.writeByte(header.getVersion());
        byteBuf.writeByte(header.getExtraInfo());
        byteBuf.writeLong(header.getMessageId());
        Object content = message.getContent();
        if (Constants.isHeartBeat(header.getExtraInfo())) {
            byteBuf.writeInt(0); // 心跳消息,没有消息体,这里写入0
            return;
        }
        // 按照extraInfo部分指定的序列化方式和压缩方式进行处理
        Serialization serialization = SerializationFactory.get(header.getExtraInfo());
        Compressor compressor = CompressorFactory.get(header.getExtraInfo());
        byte[] payload = compressor.compress(serialization.serialize(content));
        byteBuf.writeInt(payload.length); // 写入消息体长度
        byteBuf.writeBytes(payload); // 写入消息体
    }
}

transport 相关实现

正如前文介绍 Netty 线程模型的时候提到,我们不能在 Netty 的 I/O 线程中执行耗时的业务逻辑。在 Demo RPC 框架的 Server 端接收到请求时,首先会通过上面介绍的 DemoRpcDecoder 反序列化得到请求消息,之后我们会通过一个自定义的 ChannelHandler(DemoRpcServerHandler)将请求提交给业务线程池进行处理。

在 Demo RPC 框架的 Client 端接收到响应消息的时候,也是先通过 DemoRpcDecoder 反序列化得到响应消息,之后通过一个自定义的 ChannelHandler(DemoRpcClientHandler)将响应返回给上层业务。

DemoRpcServerHandler 和 DemoRpcClientHandler 都继承自 SimpleChannelInboundHandler,如下图所示:
在这里插入图片描述
下面我们就来看一下这两个自定义的 ChannelHandler 实现:

public class DemoRpcServerHandler extends SimpleChannelInboundHandler<Message<Request>> {

    // 业务线程池
    private static Executor executor = Executors.newCachedThreadPool();

    @Override
    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, Message<Request> message) throws Exception {
        byte extraInfo = message.getHeader().getExtraInfo();
        if (Constants.isHeartBeat(extraInfo)) { // 心跳消息,直接返回即可
            channelHandlerContext.writeAndFlush(message);
            return;
        }
        // 非心跳消息,直接封装成Runnable提交到业务线程池
        executor.execute(new InvokeRunnable(message, channelHandlerContext));
    }
}
public class DemoRpcClientHandler extends SimpleChannelInboundHandler<Message<Response>> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message<Response> message) throws Exception {
        NettyResponseFuture responseFuture =
                Connection.IN_FLIGHT_REQUEST_MAP.remove(message.getHeader().getMessageId());
        Response response = message.getContent();
        // 心跳消息特殊处理
        if (response == null && Constants.isHeartBeat(message.getHeader().getExtraInfo())) {
            response = new Response();
            response.setCode(Constants.HEARTBEAT_CODE);
        }
        responseFuture.getPromise().setSuccess(response.getResult());
    }
}

注意,这里有两个点需要特别说明一下。一个点是 Server 端的 InvokeRunnable,在这个 Runnable 任务中会根据请求的 serviceName、methodName 以及参数信息,调用相应的方法:

class InvokeRunnable implements Runnable {

    private ChannelHandlerContext ctx;
    private Message<Request> message;

    public InvokeRunnable(Message<Request> message, ChannelHandlerContext ctx) {
        this.message = message;
        this.ctx = ctx;
    }

    @Override
    public void run() {
        Response response = new Response();
        Object result = null;
        try {
            Request request = message.getContent();
            String serviceName = request.getServiceName();
            // 这里提供BeanManager对所有业务Bean进行管理,其底层在内存中维护了
            // 一个业务Bean实例的集合。感兴趣的同学可以尝试接入Spring等容器管
            // 理业务Bean
            Object bean = BeanManager.getBean(serviceName);
            // 下面通过反射调用Bean中的相应方法
            Method method = bean.getClass().getMethod(request.getMethodName(), request.getArgTypes());
            result = method.invoke(bean, request.getArgs());
        } catch (Exception e) {
            // 省略异常处理
        } finally {
        }
        Header header = message.getHeader();
        header.setExtraInfo((byte) 1);
        response.setResult(result); // 设置响应结果
        // 将响应消息返回给客户端
        ctx.writeAndFlush(new Message(header, response));
    }

}

另一个点是 Client 端的 Connection,它是用来暂存已发送出去但未得到响应的请求,这样,在响应返回时,就可以查找到相应的请求以及 Future,从而将响应结果返回给上层业务逻辑,具体实现如下:

public class Connection implements Closeable {
  private static AtomicLong ID_GENERATOR = new AtomicLong(0);
  public static Map<Long, NettyResponseFuture<Response>> 
      IN_FLIGHT_REQUEST_MAP = new ConcurrentHashMap<>();
  private ChannelFuture future;
  private AtomicBoolean isConnected = new AtomicBoolean();
  public Connection(ChannelFuture future, boolean isConnected) {
      this.future = future;
      this.isConnected.set(isConnected);
  }
  public NettyResponseFuture<Response> request(Message<Request> message, long timeOut) {
      // 生成并设置消息ID
      long messageId = ID_GENERATOR.incrementAndGet();
      message.getHeader().setMessageId(messageId);
      // 创建消息关联的Future
      NettyResponseFuture responseFuture = new NettyResponseFuture(System.currentTimeMillis(),
              timeOut, message, future.channel(), new DefaultPromise(new DefaultEventLoop()));
      // 将消息ID和关联的Future记录到IN_FLIGHT_REQUEST_MAP集合中
      IN_FLIGHT_REQUEST_MAP.put(messageId, responseFuture);
      try {
          future.channel().writeAndFlush(message); // 发送请求
      } catch (Exception e) {
          // 发送请求异常时,删除对应的Future
          IN_FLIGHT_REQUEST_MAP.remove(messageId);
          throw e;
      }
      return responseFuture;
  }
  // 省略getter/setter以及close()方法
}

我们可以看到,Connection 中没有定时清理 IN_FLIGHT_REQUEST_MAP 集合的操作,在无法正常获取响应的时候,就会导致 IN_FLIGHT_REQUEST_MAP 不断膨胀,最终 OOM。你也可以添加一个时间轮定时器,定时清理过期的请求消息,这里我们就不再展开讲述了。

完成自定义 ChannelHandler 的编写之后,我们需要再定义两个类—— DemoRpcClient 和 DemoRpcServer,分别作为 Client 和 Server 的启动入口。DemoRpcClient 的实现如下:

public class DemoRpcClient implements Closeable {

    protected Bootstrap clientBootstrap;
    protected EventLoopGroup group;
    private String host;
    private int port;

    public DemoRpcClient(String host, int port) {
        this.host = host;
        this.port = port;
        // 创建并配置客户端Bootstrap
        clientBootstrap = new Bootstrap();
        group = NettyEventLoopFactory.eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");
        clientBootstrap.group(group)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class) // 创建的Channel类型
                // 指定ChannelHandler的顺序
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());
                        ch.pipeline().addLast("demo-rpc-decoder", new DemoRpcDecoder());
                        ch.pipeline().addLast("client-handler", new DemoRpcClientHandler());
                    }
                });
    }


    public ChannelFuture connect() {
        // 连接指定的地址和端口
        ChannelFuture connect = clientBootstrap.connect(host, port);
        connect.awaitUninterruptibly();
        return connect;
    }

    @Override
    public void close() {
        group.shutdownGracefully();
    }
}

通过 DemoRpcClient 的代码我们可以看到其 ChannelHandler 的执行顺序如下:
在这里插入图片描述
客户端 ChannelHandler 结构图

另外,在创建EventLoopGroup时并没有直接使用NioEventLoopGroup,而是在 NettyEventLoopFactory 中根据当前操作系统进行选择,对于 Linux 系统,会使用 EpollEventLoopGroup,其他系统则使用 NioEventLoopGroup。

接下来我们再看DemoRpcServer 的具体实现:

public class DemoRpcServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap serverBootstrap;
    private Channel channel;
    protected int port;

    public DemoRpcServer(int port) throws InterruptedException {
        this.port = port;
        // 创建boss和worker两个EventLoopGroup,注意一些小细节,
        // workerGroup 是按照中的线程数是按照 CPU 核数计算得到的
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1,
                "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),
                "NettyServerWorker");
        serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // 指定每个Channel上注册的ChannelHandler以及顺序
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("demp-rpc-decoder", new DemoRpcDecoder());
                                ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());
                                ch.pipeline().addLast("server-handler", new DemoRpcServerHandler());
                            }
                        });
    }

    public ChannelFuture start() throws InterruptedException {
        // 监听指定的端口
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("监听端口 6668 成功");
                } else {
                    System.out.println("监听端口 6668 失败");
                }
            }
        });

        channel = channelFuture.channel();
        channel.closeFuture().sync();
        return channelFuture;
    }


    public void startAndWait() throws InterruptedException {
        try {
            channel.closeFuture().await();
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }


    public void shutdown() throws InterruptedException {
        channel.close().sync();
        if (bossGroup != null)
            bossGroup.shutdownGracefully().awaitUninterruptibly(15000);
        if (workerGroup != null)
            workerGroup.shutdownGracefully().awaitUninterruptibly(15000);
    }

}

通过对 DemoRpcServer 实现的分析,我们可以知道每个 Channel 上的 ChannelHandler 顺序如下:

在这里插入图片描述
服务端 ChannelHandler 结构图

registry 相关实现

介绍完客户端和服务端的通信之后,我们再来看简易 RPC 框架的另一个基础能力——服务注册与服务发现能力,对应 demo-rpc 项目源码中的 registry 包。

registry 包主要是依赖 Apache Curator 实现了一个简易版本的 ZooKeeper 客户端,并基于 ZooKeeper 实现了注册中心最基本的两个功能:Provider 注册以及 Consumer 订阅。

这里我们先定义一个 Registry 接口,其中提供了注册以及查询服务实例的方法,如下图所示:

public interface Registry<T> {

    void registerService(ServiceInstance<T> service) throws Exception;

    void unregisterService(ServiceInstance<T> service) throws Exception;

    List<ServiceInstance<T>> queryForInstances(String name) throws Exception;
}

ZooKeeperRegistry 是基于 curator-x-discovery 对 Registry 接口的实现类型,其中封装了之前课时介绍的 ServiceDiscovery,并在其上添加了 ServiceCache 缓存提高查询效率。ZooKeeperRegistry 的具体实现如下:

public class ZookeeperRegistry<T> implements Registry<T> {

    private Map<String, List<ServiceInstanceListener<T>>> listeners = Maps.newConcurrentMap();

    private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);

    private ServiceDiscovery<T> serviceDiscovery;

    private ServiceCache<T> serviceCache;

    private String address = "localhost:2181";

    public void start() throws Exception {
        String root = "/demo/rpc";
        // 初始化CuratorFramework
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
        client.start();  // 启动Curator客户端
        // client.createContainers(root);

        // 初始化ServiceDiscovery
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class)
                .client(client).basePath(root)
                .serializer(serializer)
                .build();
        serviceDiscovery.start(); // 启动ServiceDiscovery

        // 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name("/demoService")
                .build();
//        client.start(); // 启动Curator客户端
        client.blockUntilConnected();  // 阻塞当前线程,等待连接成功
        serviceDiscovery.start(); // 启动ServiceDiscovery
        serviceCache.start(); // 启动ServiceCache
    }

    @Override
    public void registerService(ServiceInstance<T> service) throws Exception {
        serviceDiscovery.registerService(service);
    }

    @Override
    public void unregisterService(ServiceInstance service) throws Exception {
        serviceDiscovery.unregisterService(service);
    }

    @Override
    public List<ServiceInstance<T>> queryForInstances(String name) throws Exception {
        // 直接根据name进行过滤ServiceCache中的缓存数据
        return serviceCache.getInstances().stream()
                .filter(s -> s.getName().equals(name))
                .collect(Collectors.toList());
    }
}

通过对 ZooKeeperRegistry的分析可以得知,它是基于 Curator 中的 ServiceDiscovery 组件与 ZooKeeper 进行交互的,并且对 Registry 接口的实现也是通过直接调用 ServiceDiscovery 的相关方法实现的。在查询时,直接读取 ServiceCache 中的缓存数据,ServiceCache 底层在本地维护了一个 ConcurrentHashMap 缓存,通过 PathChildrenCache 监听 ZooKeeper 中各个子节点的变化,同步更新本地缓存。这里我们简单看一下 ServiceCache 的核心实现:

public class ServiceCacheImpl<T> implements ServiceCache<T>, 
PathChildrenCacheListener{//实现PathChildrenCacheListener接口
  // 关联的ServiceDiscovery实例
  private final ServiceDiscoveryImpl<T>  discovery;
  // 底层的PathChildrenCache,用于监听子节点的变化
  private final PathChildrenCache cache; 
  // 本地缓存
  private final ConcurrentMap<String, ServiceInstance<T>> instances 
    = Maps.newConcurrentMap();
  public List<ServiceInstance<T>> getInstances(){ // 返回本地缓存内容
      return Lists.newArrayList(instances.values());
  }
  public void childEvent(CuratorFramework client, 
        PathChildrenCacheEvent event) throws Exception{
      switch(event.getType()){
          case CHILD_ADDED:
          case CHILD_UPDATED:{
              addInstance(event.getData(), false); // 更新本地缓存
              notifyListeners = true;
              break;
          }
          case CHILD_REMOVED:{ // 更新本地缓存
              instances.remove(instanceIdFromData(event.getData()));
              notifyListeners = true;
              break;
          }
      }
      ... // 通知ServiceCache上注册的监听器
  }
}

proxy 相关实现

在简易版 Demo RPC 框架中,Proxy 主要是为 Client 端创建一个代理,帮助客户端程序屏蔽底层的网络操作以及与注册中心之间的交互。

简易版 Demo RPC 使用 JDK 动态代理的方式生成代理,这里需要编写一个 InvocationHandler 接口的实现,即下面的 DemoRpcProxy。其中有两个核心方法:一个是 newInstance() 方法,用于生成代理对象;另一个是 invoke() 方法,当调用目标对象的时候,会执行 invoke() 方法中的代理逻辑。

下面是 DemoRpcProxy 的具体实现:

public class DemoRpcProxy implements InvocationHandler {

    private String serviceName; // 需要代理的服务(接口)名称

    public Map<Method, Header> headerCache = new ConcurrentHashMap<>();

    // 用于与Zookeeper交互,其中自带缓存
    private Registry<ServerInfo> registry;

    public DemoRpcProxy(String serviceName,
                        Registry<ServerInfo> registry) throws Exception {
        this.serviceName = serviceName;
        this.registry = registry;
    }

    public static <T> T newInstance(Class<T> clazz, Registry<ServerInfo> registry) throws Exception {
        // 创建代理对象
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{clazz},
                new DemoRpcProxy("demoService", registry));
    }


    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 从Zookeeper缓存中获取可用的Server地址,并随机从中选择一个
        List<ServiceInstance<ServerInfo>> serviceInstances =
                registry.queryForInstances(serviceName);
        ServiceInstance<ServerInfo> serviceInstance =
                serviceInstances.get(ThreadLocalRandom.current().nextInt(serviceInstances.size()));
        // 创建请求消息,然后调用remoteCall()方法请求上面选定的Server端
        String methodName = method.getName();
        Header header = headerCache.computeIfAbsent(method, h -> new Header(MAGIC, VERSION_1));
        Message<Request> message = new Message(header, new Request(serviceName, methodName, args));
        return remoteCall(serviceInstance.getPayload(), message);
    }

    protected Object remoteCall(ServerInfo serverInfo, Message message) throws Exception {
        if (serverInfo == null) {
            throw new RuntimeException("get available server error");
        }
        Object result;
        try {
            // 创建DemoRpcClient连接指定的Server端
            DemoRpcClient demoRpcClient = new DemoRpcClient(serverInfo.getHost(), serverInfo.getPort());
            ChannelFuture channelFuture = demoRpcClient.connect().awaitUninterruptibly();
            // 创建对应的Connection对象,并发送请求
            Connection connection = new Connection(channelFuture, true);
            NettyResponseFuture responseFuture = connection.request(message, Constants.DEFAULT_TIMEOUT);
            // 等待请求对应的响应
            result = responseFuture.getPromise().get(Constants.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw e;
        }
        return result;
    }
}

从 DemoRpcProxy 的实现中我们可以看到,它依赖了 ServiceInstanceCache 获取ZooKeeper 中注册的 Server 端地址,同时依赖了 DemoRpcClient 与Server 端进行通信,上层调用方拿到这个代理对象后,就可以像调用本地方法一样进行调用,而不再关心底层网络通信和服务发现的细节。当然,这个简易版 DemoRpcProxy 的实现还有很多可以优化的地方,例如:

  • 缓存 DemoRpcClient 客户端对象以及相应的 Connection 对象,不必每次进行创建。
  • 可以添加失败重试机制,在请求出现超时的时候,进行重试。
  • 可以添加更加复杂和灵活的负载均衡机制,例如,根据 Hash 值散列进行负载均衡、根据节点 load 情况进行负载均衡等。

你若感兴趣的话可以尝试进行扩展,以实现一个更加完善的代理层。

使用方接入

介绍完 Demo RPC 的核心实现之后,下面我们讲解下Demo RPC 框架的使用方式。这里涉及Consumer、DemoServiceImp、Provider三个类以及 DemoService 业务接口。

相应的业务接口和实现比较简单,我们再来看Provider的实现,它的角色类似于 Dubbo 中的 Provider,其会创建 DemoServiceImpl 这个业务 Bean 并将自身的地址信息暴露出去,如下所示:

public class Provider {
    public static void main(String[] args) throws Exception {
        // 创建DemoServiceImpl,并注册到BeanManager中
        BeanManager.registerBean("demoService", new DemoServiceImpl());
        // 创建ZookeeperRegistry,并将Provider的地址信息封装成ServerInfo
        // 对象注册到Zookeeper
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();
        ServerInfo serverInfo = new ServerInfo("127.0.0.1", 6666);
        discovery.registerService(
                ServiceInstance.<ServerInfo>builder().name("demoService").payload(serverInfo).build());
        // 启动DemoRpcServer,等待Client的请求
        DemoRpcServer rpcServer = new DemoRpcServer(6666);
        rpcServer.start();
        Thread.sleep(100000000L);
    }
}

最后是Consumer,它类似于 Dubbo 中的 Consumer,其会订阅 Provider 地址信息,然后根据这些信息选择一个 Provider 建立连接,发送请求并得到响应,这些过程在 Proxy 中都予以了封装,那Consumer 的实现就很简单了,可参考如下示例代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建ZookeeperRegistr对象
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();

        // 创建代理对象,通过代理调用远端Server
        DemoService demoService = DemoRpcProxy.newInstance(DemoService.class, discovery);
        // 调用sayHello()方法,并输出结果
        String result = demoService.sayHello("hello");
        System.out.println(result);
        // Thread.sleep(10000000L);
    }
}

总结

本课时我们首先介绍了简易 RPC 框架中的transport 包,它在上一课时介绍的编解码器基础之上,实现了服务端和客户端的通信能力。之后讲解了registry 包如何实现与 ZooKeeper 的交互,完善了简易 RPC 框架的服务注册与服务发现的能力。接下来又分析了proxy 包的实现,其中通过 JDK 动态代理的方式,帮接入方屏蔽了底层网络通信的复杂性。最后,我们编写了一个简单的 DemoService 业务接口,以及相应的 Provider 和 Consumer 接入简易 RPC 框架。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1522431.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

win10 + cpu + pycharm + mindspore

MindSpore是华为公司自研的最佳匹配昇腾AI处理器算力的全场景深度学习框架。 1、打开官网&#xff1a; MindSpore官网 2、选择以下选项&#xff1a; 3、创建conda 环境&#xff0c;这里python 选择3.9.0&#xff0c;也可以选择其他版本&#xff1a; conda create -c conda-…

[VulnHub靶机渗透] BNE0x03 Simple

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收…

智慧交通:构建智慧城市的重要一环

随着信息技术的飞速发展&#xff0c;智慧城市已成为现代城市发展的重要方向。作为智慧城市的重要组成部分&#xff0c;智慧交通以其高效、便捷、环保的特性&#xff0c;成为推动城市现代化进程的关键力量。本文将从智慧交通的概念、发展现状、面临挑战以及未来趋势等方面&#…

以题为例浅谈文件包含

什么叫做文件包含 文件包含函数加载的参数没有经过过滤或严格定义&#xff0c;可以被用户控制&#xff0c; 包含其他恶意文件&#xff0c;导致了执行非预期代码。 文件包含漏洞&#xff08;File Inclusion Vulnerability&#xff09;是一种常见的网络安全漏洞&#xff0c;它允…

HTML5CSS3提高导读

HTML5CSS3提高导读 2024/2/20 HTML5 的新增特性主要是针对于以前的不足&#xff0c;增加了一些新的标签、新的表单和新的表单属性等。 这些新特性都有兼容性问题&#xff0c;基本是 IE9 以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量使用这 …

瑞熙贝通实验室安全培训考试系统

一、系统概述 瑞熙贝通实验室安全培训考试系统是一种基于互联网和人工智能技术的在线考试平台&#xff0c;旨在旨在提供实验室安全教育和考核的全面解决方案。该系统可以帮助实现实验室安全培训考试的在线化、智能化和规范化&#xff0c;提高实验室安全意识和能力&#xff0c;…

计算机网络——物理层(编码与调制)

计算机网络——编码与调制 基带信号和宽带信号编码与调制数字数据编码为数字信号非归零编码归零编码反向不归零编码曼彻斯特编码差分曼彻斯特编码4B/5B编码 数字数据调制为模拟信号模拟数据编码为数字信号模拟数据调制为模拟信号 我们之前讲了物理层的一些基础知识和两个准则&a…

vite打包时发布时,放在服务器的二级目录中

方式一 hash模式 如果我们的站点根目录为 public , 我们访问的时候使用的是 http://www.abc.com/ 访问到了站点的根目当&#xff0c;现在我们要访问 http://www.abc.com/mysite/#/ 配置如下 修改 vite.config.js base:“/mysite/” 修改 router中的配置 上面的步骤完成&…

Luckysheet + Exceljs:H5实现Excel在线编辑、导入、导出及上传服务器的示例代码(完整版demo)

创建xeditor.html <!DOCTYPE html> <html><head><meta charset"UTF-8" /><title>Hello World!</title><!-- <link relstylesheet href./luckysheet/plugins/css/pluginsCss.css /><link relstylesheet href./luck…

[游戏开发][UE5.3]GAS学习心得

GAS(GameplayAbilitySystem) UE提供的一套技能框架&#xff0c;这个框架也不是万能的&#xff0c;甚至各个部件你要进行封装开发&#xff0c;但这也比你从头写一套技能框架要容易很多。 GAS功能极其强大&#xff0c;所以它是一个庞大的系统&#xff0c;如果想运用得当&#x…

【刷题训练】牛客:JZ31 栈的压入、弹出序列

文章目录 一、题目要求二、解题思路三、C代码四、注意点五、运行成功 一、题目要求 二、解题思路 题意解读。本道题给定了两个序列pushV和popV&#xff0c;其中序列pushV是入栈顺序&#xff0c;popV是出栈顺序。问题就是让我们去判断这个popV的顺序是否可能是pushV的弹出顺序。…

瑞_Redis_短信登录(二)

文章目录 项目介绍1.1 项目准备1.2 基于Session实现登录流程1.2.1 发送短信验证码1.2.2 短信验证码登录、注册1.2.3 校验登录状态 1.3 实现发送短信验证码功能1.3.1 页面流程1.3.2 代码实现 1.41.51.6 &#x1f64a; 前言&#xff1a;本文章为瑞_系列专栏之《Redis》的实战篇的…

【LeetCode热题100】148. 排序链表(链表)

一.题目要求 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 二.题目难度 中等 三.输入样例 示例 1&#xff1a; 输入&#xff1a;head [4,2,1,3] 输出&#xff1a;[1,2,3,4] 示例 2&#xff1a; 输入&#xff1a;head [-1,5,3,4,0] 输…

知名Web3投资基金a16z合伙人Jane Lippencott确认出席Hack.Summit() 2024区块链开发者大会

在区块链技术的风起云涌和Web3生态的蓬勃发展中&#xff0c;知名a16z Crypto的合伙人Jane Lippencott已确认出席即将于2024年4月9日至10日在香港数码港举行的Hack.Summit() 2024区块链开发者大会。作为亚洲首次举办的Hack.Summit()&#xff0c;此次大会将为全球区块链开发者及业…

DFS的一些题目

题目1&#xff1a;奶牛选美 这道题其实就是把两个连通块合成一个&#xff0c;可以用dfs、bfs和并查集。因为最近在dfs专题训练&#xff0c;这里我只写了dfs。 首先我们用dfs的方式遍历两个连通块&#xff0c;将两个连通块中点的坐标分别存入两个数组中&#xff0c;将这两个数组…

openssl3.2 - note - Writing OpenSSL Provider Skeleton

文章目录 openssl3.2 - note - Writing OpenSSL Provider Skeleton概述笔记测试工程的建立复现的provider工程总结Provider包含的头文件openssl/core.h中的数据结构实现 OSSL_provider_init()看一下openssl自带的提供者provider的openssl命令行测试provider的本质是hook了opens…

pytorch 入门基础知识一(Pytorch 01)

一 深度学习基础相关 深度学习三个主要的方向&#xff1a;计算机视觉&#xff0c;自然语言&#xff0c;语音识别。 机器学习核心组件&#xff1a;1 数据集(data)&#xff0c;2 前向传播的model(net)&#xff0c;3 目标函数(loss)&#xff0c; 4 调整模型参数和优化函数的算法…

【研发管理】产品经理-基础认知

导读&#xff1a;产品经理&#xff08;Product Manager&#xff09;是一个负责产品的全周期管理的职位&#xff0c;他们不仅参与产品的设计、开发、推广和销售&#xff0c;还涉及到产品的市场调研、用户需求分析、竞争分析、产品规划、产品测试以及后续的产品迭代等多个环节。产…

安装snap再安装flutter再安装localsend@Ubuntu(FreeBSD下未成功)

Localsend介绍 localsend是一个跨平台的文件传送软件&#xff0c;可以在Windows、MacOS、Linux、Android和IOS下互相传送文件&#xff0c;只要在同一个局域网即可。 localsend官网&#xff1a;LocalSend 尝试安装localsend&#xff0c;发现需要使用flutter&#xff0c; 安装f…

【AI】Ubuntu系统深度学习框架的神经网络图绘制

一、Graphviz 在Ubuntu上安装Graphviz&#xff0c;可以使用命令行工具apt进行安装。 安装Graphviz的步骤相对简单。打开终端&#xff0c;输入以下命令更新软件包列表&#xff1a;sudo apt update。之后&#xff0c;使用命令sudo apt install graphviz来安装Graphviz软件包。为…