异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty

news2025/1/8 4:38:54

文章目录

  • Netty概述
  • Netty中的一些概念
  • Netty的线程模型
    • Netty Server端
    • Netty Netty 端
  • TCP半包与粘包问题
  • 基于Netty与CompletableFuture实现RPC异步调用

在这里插入图片描述


Netty概述

Netty是一个异步、基于事件驱动的网络应用程序框架,其对Java NIO进行了封装,大大简化了TCP或者UDP服务器的网络编程开发。

Netty框架将网络编程逻辑与业务逻辑处理分离开来,其内部会自动处理好网络与异步处理逻辑,让我们专心写自己的业务处理逻辑。同时,Netty的异步非阻塞能力与CompletableFuture结合可以让我们轻松实现网络请求的异步调用。

Netty的应用还是比较广泛的,Apache Dubbo、Apache RocketMq、Zuul 2.0服务网关、Spring WebFlux、Sofa-Bolt底层网络通信等都是基于Netty来实现的。

Netty中的一些概念

  • Channel:是通道的意思。这是在JDK NIO类库里面提供的一个概念,JDK里面的通道是java.nio.channels.Channel,JDK中的实现类有客户端套接字通道java.nio.channels.SocketChannel和服务端监听套接字通道java.nio.channels.ServerSocketChannel。Channel的出现是为了支持异步IO操作。io.netty.channel.Channel是Netty框架自己定义的一个通道接口。Netty实现的客户端NIO套接字通道是io.netty.channel.socket.nio.NioSocketChannel,提供的服务器端NIO套接字通道是io.netty.channel.socket.nio.NioServerSocketChannel

  • NioSocketChannel:Netty中客户端套接字通道。内部管理了一个Java NIO中的java.nio.channels.SocketChannel实例,其被用来创建java.nio.channels.SocketChannel的实例和设置该实例的属性,并调用其connect方法向服务端发起TCP链接。

  • NioServerSocketChannel:服务器端监听套接字通道。内部管理了一个Java NIO中的java.nio.channels.ServerSocketChannel实例,用来创建ServerSocketChannel实例和设置该实例属性,并调用该实例的bind方法在指定端口监听客户端的链接。

  • EventLoopGroup:Netty之所以能提供高性能网络通信,其中一个原因是它使用Reactor线程模型。在Netty中,每个EventLoopGroup本身都是一个线程池,其中包含了自定义个数的NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop里面持有自己的NIO Selector选择器。在Netty中,客户端持有一个EventLoopGroup用来处理网络IO操作;在服务器端持有两个EventLoopGroup,其中boss组是专门用来接收客户端发来的TCP链接请求的,worker组是专门用来处理完成三次握手的链接套接字的网络IO请求的。

  • Channel与EventLoop的关系:在Netty中,NioEventLoop是EventLoop的一个实现,每个NioEventLoop中会管理自己的一个selector选择器和监控选择器就绪事件的线程;每个Channel在整个生命周期中固定关联到某一个NioEventLoop;但是,每个NioEventLoop中可以关联多个Channel。

  • ChannelPipeline:Netty中的ChannelPipeline类似于Tomcat容器中的Filter链,属于设计模式中的责任链模式,其中链上的每个节点就是一个ChannelHandler。在Netty中,每个Channel有属于自己的ChannelPipeline,管线中的处理器会对从Channel中读取或者要写入Channel中的数据进行依次处理。下图是Netty源码里面的一个图。
    在这里插入图片描述

【Netty框架数据流图】

如图所示,当有数据从连接套接字被读取后,数据会被依次传递到Channel Pipeline中的每个ChannelHandler进行处理;当通过Channel或者ChannelHandlerContext向连接套接字写入数据时,数据会先依次被ChannelPipeline中的每个Channel Handler处理,处理完毕后才会最终通过原生连接套接字写入TCP发送缓存。

需要注意的是,虽然每个Channel(更底层说是每个Socket)有自己的Channel Pipeline,但是每个ChannelPipeline里面可以复用同一个ChannelHandler的实例(当ChannelHandler使用@shared注解修饰时)。


Netty的线程模型

Netty的线程模型,因为其模型实现与Netty的异步处理能力紧密相关,其线程模型如下图所示

在这里插入图片描述
【Netty线程模型】

Netty Server端

图下侧所示为Netty Server端,

  • 当NettyServer启动时会创建两个NioEventLoop Group线程池组,其中boss组用来接收客户端发来的连接,worker组则负责对完成TCP三次握手的连接进行处理

  • 图中每个NioEventLoopGroup里面包含了多个Nio EventLoop,每个NioEventLoop中包含了一个NIO Selector、一个队列、一个线程;其中线程用来做轮询注册到Selector上的Channel的读写事件和对投递到队列里面的事件进行处理。

当NettyServer启动时会注册监听套接字通道NioServerSocketChannel到boss线程池组中的某一个NioEventLoop管理的Selector上,与其对应的线程会负责轮询该监听套接字上的连接请求;

当客户端发来一个连接请求时,boss线程池组中注册了监听套接字的NioEventLoop中的Selector会读取TCP三次握手的请求,然后创建对应的连接套接字通道NioSocketChannel,接着把其注册到worker线程池组的某一个NioEventLoop中管理的一个NIO Selector上,该连接套接字通道NioSocketChannel上的所有读写事件都由该NioEventLoop管理。

当客户端发来多个连接时,NettyServer端会创建多个NioSocketChannel,而worker线程池组中的NioEventLoop是有个数限制的,所以Netty有一定的策略把很多NioSocketChannel注册到不同的NioEventLoop上,也就是每个NioEventLoop中会管理好多客户端发来的连接,并通过循环轮询处理每个连接的读写事件。


Netty Netty 端

图上侧部分所示为Netty Client部分,当NettyClient启动时会创建一个NioEventLoopGroup,用来发起请求并对建立TCP三次连接的套接字的读写事件进行处理。当调用Bootstrap的connect方法发起连接请求后内部会创建一个NioSocketChannel用来代表该请求,并且会把该NioSocketChannel注册到NioSocketChannel管理的某个NioEventLoop的Selector上,该NioEventLoop的读写事件都由该NioEventLoop负责处理。

Netty之所以说是异步非阻塞网络框架,是因为通过NioSocketChannel的write系列方法向连接里面写入数据时是非阻塞的,是可以马上返回的(即使调用写入的线程是我们的业务线程)。这是Netty通过在ChannelPipeline中判断调用NioSocketChannel的write的调用线程是不是其对应的NioEventLoop中的线程来实现的:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ...
    //1.如果调用线程是IO线程
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {//2.如果调用线程不是IO线程
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

如上代码1所示,如果调用线程是IO线程,则会在IO线程上执行写入;

如代码2所示,如果发现调用线程不是IO线程,则会把写入请求封装为WriteTask并投递到与其对应的NioEventLoop中的队列里面,然后等其对应的NioEventLoop中的线程轮询连接套接字的读写事件时捎带从队列里面取出来并执行。

也就是说,与每个NioSocketChannel对应的读写事件都是在与其对应的NioEvent Loop管理的单线程内执行的,不存在并发,所以无须加锁处理。

另外当从NioSocketChannel中读取数据时,并不是使用业务线程来阻塞等待,而是等NioEventLoop中的IO轮询线程发现Selector上有数据就绪时,通过事件通知方式来通知我们业务数据已经就绪,可以来读取并处理了。

使用Netty框架进行网络通信时,当我们发起请求后请求会马上返回,而不会阻塞我们的业务调用线程;如果我们想要获取请求的响应结果,也不需要业务调用线程使用阻塞的方式来等待,而是当响应结果出来时使用IO线程异步通知业务,由此可知,在整个请求–响应过程中,业务线程不会由于阻塞等待而不能干其他事情。

下面我们讨论几个细节:

第一,完成TCP三次握手的套接字应该注册到worker线程池中的哪一个NioEventLoop的Selector上;

第二,如果NioEventLoop中的线程负责监听注册到Selector上的所有连接的读写事件和处理队列里面的消息,那么会不会导致由于处理队列里面任务耗时太长导致来不及处理连接的读写事件;

第三,多个套接字注册到同一个NioEventLoop的Selector上,使用单线程轮询处理每个套接字上的事件,如果某一个套接字网络请求比较频繁,轮询线程是不是会一直处理该套接字的请求,而使其他套接字请求得不到及时处理。

对于第一个问题,关于NioEventLoop的分配,Netty默认使用的是PowerOfTwoEvent ExecutorChooser,其代码如下:

private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[childIndex.getAndIncrement() & children.length - 1];
    }
}

可知是采用轮询取模的方式来进行分配。

对于第二个问题,Netty默认是采用时间均分策略来避免某一方处于饥饿状态,可以参见NioEventLoop的run方法内的代码片段:

//1.记录开始处理时间
final long ioStartTime = System.nanoTime();
try {//1.1处理连接套接字的读写事件
    processSelectedKeys();
} finally {
    // 1.2计算连接套接字处理耗时,ioRatio默认为50
    final long ioTime = System.nanoTime() - ioStartTime;
    //1.3运行队列里面任务
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

代码1.1处理所有注册到当前NioEventLoop的Selector上的所有连接套接字的读写事件,代码1.2用来统计其耗时,由于默认情况下ioRatio为50,所以代码1.3尝试使用与代码1.2执行相同的时间来运行队列里面的任务,也就是处理套接字读写事件与运行队列里面任务是使用时间片轮转方式轮询执行。

针对第三个问题,我们可以看NioEventLoop的processSelectedKeysOptimized方法,该方法内会轮询注册到自己的Selector上的所有连接套接字的读写事件:

private void processSelectedKeysOptimized() {
    //3轮询处理所有套接字的读写事件
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();
        //如果是AbstractNioChannel子类实例
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

在上述代码中,processSelectedKeysOptimized内会轮询处理所有套接字的读写事件,具体是调用processSelectedKey处理每个NioSocketChannel的读写事件,其代码如下:

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            ...
            //AbstractNioByteChannel的read方法
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

如上代码如果是读事件或者套接字接收事件则会调用AbstractNioByteChannel的read方法读取数据,这里我们只关心读事件,其代码如下:

public final void read() {
    ...
    try {
        //4循环读取套接字中的数据
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }
            //4.1增加读取的包数量
            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        //4.2判断是否继续读取
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
       ...
    }
}

代码4循环读取当前连接套接字中的数据,代码4.1表示每当从套接字读取一批数据就让读取的消息数量加一,代码如下:

public final void incMessagesRead(int amt) {
 totalMessages += amt;
}

代码4.2则判断是继续读取数据,还是退出读取循环,allocHandle的continueReading代码如下:

public boolean continueReading() {
    return continueReading(defaultMaybeMoreSupplier);
}

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           totalMessages < maxMessagePerRead &&//最大读取消息个数?
           totalBytesRead > 0;
}

默认情况下maxMessagePerRead为16,所以对应NioEventLoop管理的每个NioSocketChannel中的数据,在一次事件循环内最多连续读取16次数据,并不会一直读取,这就有效避免了其他NioSocketChannel的请求事件得不到及时处理的情况。


TCP半包与粘包问题

大家都知道在客户端与服务端进行网络通信时,客户端会通过socket把需要发送的内容序列化为二进制流后发送出去,当二进制流通过网络流向服务器端后,服务端会接收该请求并解析该请求包,然后反序列化后对请求进行处理。

这看似是一个很简单的过程,但是细细想来却发现没有那么简单。使用TCP进行通信时客户端与服务端之间持有一个长连接,客户端多次发送请求都是复用该连接的,下图展示了客户端与服务端的交互流程。

在这里插入图片描述
【客户端与服务器交互图】

如图所示,在客户端发送数据时,实际是把数据写入TCP发送缓存里面的,如果发送的包的大小比TCP发送缓存的容量大,那么这个数据包就会被分成多个包,通过socket多次发送到服务端。而服务端获取数据是从接收缓存里面获取的,假设服务端第一次从接收缓存里面获取的数据是整个包的一部分,这时候就产生了半包现象,半包不是说只收到了全包的一半,而是说只收到了全包的一部分。

服务器读取到半包数据后,会对读取的二进制流进行解析,一般会把二进制流反序列化为对象,这里由于服务器只读取了客户端序列化对象后的一部分,所以反序列会报错。

同理,如果发送的数据包大小比TCP发送缓存容量小,并且假设TCP缓存可以存放多个包,那么客户端和服务端的一次通信就可能传递了多个包,这时候服务端从接收缓存就可能一下读取了多个包,出现粘包现象,由于服务端从接收缓存获取的二进制流是多个对象转换来的,所以在后续的反序列化时肯定也会出错。

其实出现粘包和半包的原因是TCP层不知道上层业务的包的概念,它只是简单地传递流,所以需要上层应用层协议来识别读取的数据是不是一个完整的包。

一般有三种常用的解决半包与粘包问题的方案:

在这里插入图片描述

【Dubbo协议帧】

  • 比较常见的方案是应用层设计协议时,把协议包分为header和body(比如Dubbo协议帧格式),header里面记录body长度,当服务端从接收缓冲区读取数据后,如果发现数据大小小于包的长度则说明出现了半包,这时候就回退读取缓存的指针,等待下次读事件到来时再次测试。如果发现数据大小大于包长度则看大小是否是包长度的整数倍,如果是则循环读取多个包,否则就是出现了多个整包+半包(粘包)。

  • 第二种方案是在多个包之间添加分隔符,使用分隔符来判断一个包的结束。
    在这里插入图片描述
    【帧分隔符】

如图 所示,每个包中间使用“|”作为分隔符,此时每个包的大小可以不固定,当服务器端读取时,若遇到分隔符就知道当前包结束了,但是包的消息体内不能含有分隔符,Netty中提供了DelimiterBasedFrameDecoder用来实现该功能。

  • 还有一种方案是包定长,就是每个包大小固定长度,如下图所示。
    在这里插入图片描述

【 包大小固定长度】

使用这种方案时每个包的大小必须一致,Netty中提供了FixedLengthFrameDecoder来实现该功能。

基于Netty与CompletableFuture实现RPC异步调用

我们来基于CompletableFuture与Netty来模拟下如何异步发起远程调用,为简化设计,这里我们将应用层协议帧格式定义为文本格式,如下图所示。

在这里插入图片描述

【 协议帧格式】

如图所示,

  • 帧格式的第一部分为消息体,也就是业务需要传递的内容;
  • 第二部分为“:”号;
  • 第三部分为请求id,这里使用“:”把消息体与请求id分开,以便服务端提取这两部分内容,需要注意消息体内不能含有“:”号;
  • 第四部分“|”标识一个协议帧的结束,因为本文使用Netty的DelimiterBasedFrameDecoder来解决半包粘包问题,所以需要注意消息体内不能含有“|”号。

首先我们基于Netty开发一个简单的demo来模拟RpcServer,也就是服务提供方程序,RpcServer的代码如下:

public final class RpcServer {
    public static void main(String[] args) throws Exception {
        // 0.配置创建两级线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);// boss
        EventLoopGroup workerGroup = new NioEventLoopGroup();// worker
        // 1.创建业务处理hander
        NettyServerHandler servrHandler = new NettyServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // 1.1设置帧分隔符解码器
                            ByteBuf delimiter = Unpooled.copiedBuffer("|".getBytes());
                            p.addLast(new DelimiterBasedFrameDecoder(1000, delimiter));
                            // 1.2设置消息内容自动转换为String的解码器到管线
                            p.addLast(new StringDecoder());
                            // 1.3设置字符串消息自动进行编码的编码器到管线
                            p.addLast(new StringEncoder());
                            // 1.4添加业务hander到管线
                            p.addLast(servrHandler);
                        }
                    });

            // 2.启动服务,并且在12800端口监听
            ChannelFuture f = b.bind(12800).sync();

            // 3. 等待服务监听套接字关闭
            f.channel().closeFuture().sync();
        } finally {
            // 4.优雅关闭两级线程池,以便释放线程
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

如上代码是一个典型的NettyServer启动程序,首先代码0创建了NettyServer的boss与worker线程池,然后代码1创建了业务NettyServerHandler,这个我们后面具体讲解。

代码1.1添加DelimiterBasedFrameDecoder解码器到链接channel的管道以便使用“|”分隔符来确定一个协议帧的边界(避免半包粘包问题);

代码1.2添加字符串解码器,它在服务端链接channel接收到客户端发来的消息后会自动把消息内容转换为字符串;代码1.3设置字符串编码器,它会在服务端链接channel向客户端写入数据时,对数据进行编码;代码1.3添加业务handler到管线。

代码2启动服务,并且在端口12800监听客户端发来的链接;代码3同步等待服务监听套接字关闭;代码4优雅关闭两级线程池,以便释放线程。

这里我们主要看下业务handler的实现,服务端在接收客户端消息,且消息内容经过代码1.1、代码1.2的hanlder处理后,流转到NettyServerHandler的就是一个完整的协议帧的字符串了。NettyServerHandler代码如下:

@Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //5. 根据消息内容和请求id,拼接消息帧
    public String generatorFrame(String msg, String reqId) {
        return msg + ":" + reqId + "|";
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        //6.处理请求
        try {
                System.out.println(msg);
                // 6.1.获取消息体,并且解析出请求id
                String str = (String) msg;
                String reqId = str.split(":")[1];

                // 6.2.拼接结果,请求id,协议帧分隔符(模拟服务端执行服务产生结果)
                String resp =  generatorFrame("im jiaduo ", reqId);

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                // 6.3.写回结果
                ctx.channel().writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
            } catch (Exception e) {
                e.printStackTrace();
            }
    }

   ...

}

由上述代码可知,@Sharable注解是让服务端所有接收的链接对应的channel复用同一个NettyServerHandler实例,这里可以使用@Sharable方式是因为NettyServer Handler内的处理是无状态的,不会存在线程安全问题。

当数据流程到NettyServerHandler时,会调用其channelRead方法进行处理,这里msg已经是一个完整的本文的协议帧了。

异步任务内代码6.1首先获取消息体的内容,然后根据协议格式,从中截取出请求id,然后调用代码6.2拼接返回给客户端的协议帧,注意这里需要把请求id带回去;然后休眠2s模拟服务端任务处理,最后代码6.3把拼接好的协议帧写回客户端。

下面我们基于Netty开发一个简单的demo来模拟RpcClient,也就是服务消费方程序,RpcClient的代码如下:

public class RpcClient {
    // 连接通道
    private volatile Channel channel;
    // 请求id生成器
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    // 启动器
    private Bootstrap b;

    public RpcClient() {
        // 1. 配置客户端.
        EventLoopGroup group = new NioEventLoopGroup();
        NettyClientHandler clientHandler = new NettyClientHandler();
        try {
            b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // 1.1设置帧分隔符解码器
                            ByteBuf delimiter = Unpooled.copiedBuffer("|".getBytes());
                            p.addLast(new DelimiterBasedFrameDecoder(1000, delimiter));
                            // 1.2设置消息内容自动转换为String的解码器到管线
                            p.addLast(new StringDecoder());
                            // 1.3设置字符串消息自动进行编码的编码器到管线
                            p.addLast(new StringEncoder());
                            // 1.4添加业务Handler到管线
                            p.addLast(clientHandler);

                        }
                    });
            // 2.发起链接请求,并同步等待链接完成
            ChannelFuture f = b.connect("127.0.0.1", 12800).sync();
            if (f.isDone() && f.isSuccess()) {
                this.channel = f.channel();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendMsg(String msg) {
        channel.writeAndFlush(msg);
    }

    public void close() {

        if (null != b) {
            b.group().shutdownGracefully();
        }
        if (null != channel) {
            channel.close();
        }
    }

    // 根据消息内容和请求id,拼接消息帧
    private String generatorFrame(String msg, String reqId) {
        return msg + ":" + reqId + "|";
    }

    public CompletableFuture rpcAsyncCall(String msg) {
        // 1. 创建future
        CompletableFuture<String> future = new CompletableFuture<>();

        // 2.创建消息id
        String reqId = INVOKE_ID.getAndIncrement() + "";

        // 3.根据消息,请求id创建协议帧
        msg = generatorFrame(msg, reqId);

        // 4.nio异步发起网络请求,马上返回
        this.sendMsg(msg);

        // 5.保存future对象
        FutureMapUtil.put(reqId, future);

        return future;
    }

    public String rpcSyncCall(String msg) throws InterruptedException, ExecutionException {
        // 1. 创建future
        CompletableFuture<String> future = new CompletableFuture<>();
        // 2.创建消息id
        String reqId = INVOKE_ID.getAndIncrement()  + "";

        // 3.消息体后追加消息id和帧分隔符
        msg = generatorFrame(msg, reqId);

        // 4.nio异步发起网络请求,马上返回
        this.sendMsg(msg);

        // 5.保存future
        FutureMapUtil.put(reqId, future);

        // 6.同步等待结果
        String result = future.get();
        return result;
    }
}

如上代码RpcClient的构造函数创建了一个NettyClient,其使用方法与NettyServer类似,这里不再赘述。需要注意的是,这里注册了业务的NettyClientHandler处理器到链接channel的管线里面,并且在与服务端完成TCP三次握手后把对应的channel对象保存了下来。

下面先来看rpcSyncCall方法,该方法意在模拟同步远程调用,其中代码1创建了一个CompletableFuture对象;代码2使用原子变量生成一个请求id,代码3则把业务传递的msg消息体和请求id组成协议帧;代码4则调用sendMsg方法通过保存的channel对象把协议帧异步发送出去,该方法是非阻塞的,会马上返回,所以不会阻塞业务线程;代码5把代码1创建的future对象保存到FutureMapUtil中管理并发缓存,其中key为请求id,value为创建的future。FutureMapUtil代码如下,可知就是管理并发缓存的一个工具类:

public class FutureMapUtil {
    // <请求id,对应的future>
    private static final ConcurrentHashMap<String, CompletableFuture> futureMap = new ConcurrentHashMap<String, CompletableFuture>();

    public static void put(String id, CompletableFuture future) {
        futureMap.put(id, future);
    }
    public static CompletableFuture remove(String id) {
        return futureMap.remove(id);
    }
}

然后代码6调用future的get()方法,同步等待future的complete()方法设置结果完成,调用get()方法会阻塞业务线程,直到future的结果被设置了。

现在我们再来看rpcAsyncCall异步调用,其代码实现与同步的rpcSyncCall类似,只不过其没有同步等待future有结果值,而是直接将future返回给调用方,然后就直接返回了,该方法不会阻塞业务线程。

到这里我们讲解了业务调用时发起远程调用,接下来我们看服务端写回结果到客户端后。关于客户端是如何把接入写回对应的future的,这里我们需要看注册的NettyClientHandler,其代码如下:

@Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
   ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 1.根据请求id,获取对应future
        CompletableFuture future = FutureMapUtil.remove(((String) msg).split(":")[1]);
        // 2.如果存在,则设置future结果
        if (null != future) {
            future.complete(((String) msg).split(":")[0]);
        }
    }
...
}

如上代码所示,当NettyClientHandler的channelRead方法被调用时,其中msg已经是一个完整的本文的协议帧了(因为DelimiterBasedFrameDecoder与StringDecoder已经做过解析)。

异步任务内代码1首先根据协议帧格式,从消息msg内获取到请求id,然后从FutureMapUtil管理的缓存内获取请求id对应的future对象,并移除;如果存在,代码2则从协议帧内获取服务端写回的数据,并调用future的complete方法把结果设置到future,这时候由于调用future的get()方法而被阻塞的线程就返回结果了。

上面我们讲解了RpcClient与RpcServer的实现,下面我们从两个例子看如何使用,首先看TestModelAsyncRpc的代码:

public class TestModelAsyncRpc {

    private static final RpcClient rpcClient = new RpcClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // 1.同步调用
        System.out.println(rpcClient.rpcSyncCall("who are you"));

        // 2.发起远程调用异步,并注册回调,马上返回
        CompletableFuture<String> future = rpcClient.rpcAsyncCall("who are you");
        future.whenComplete((v, t) -> {
            if (t != null) {
                t.printStackTrace();
            } else {
                System.out.println(v);
            }

        });

        System.out.println("---async rpc call over");
    }
}

如上main函数内首先创建了一个rpcClient对象,然后代码1同步调用了其rpcSyncCall方法,由于是同步调用,所以在服务端执行返回结果前,当前调用线程会被阻塞,直到服务端把结果写回客户端,并且客户端把结果写回到对应的future对象后才会返回。

代码2调用了异步方法rpcAsyncCall,其不会阻塞业务调用线程,而是马上返回一个CompletableFuture对象,然后我们在其上设置了一个回调函数,意在等future对象的结果被设置后进行回调,这个实现了真正意义上的异步。

我们再看一个使用实例,演示如何基于CompletableFuture的能力,并发发起多次调用,然后对返回的多个CompletableFuture进行运算,首先看TestModelAsyncRpc2类:

public class TestModelAsyncRpc2 {

    private static final RpcClient rpcClient = new RpcClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        // 1.发起远程调用异步,马上返回
        CompletableFuture<String> future1 = rpcClient.rpcAsyncCall("who are you");
        // 2.发起远程调用异步,马上返回
        CompletableFuture<String> future2 = rpcClient.rpcAsyncCall("who are you");

        // 3.等两个请求都返回结果时候,使用结果做些事情
        CompletableFuture<String> future = future1.thenCombine(future2, (u, v) -> {

            return u + v;
        });

        // 4.等待最终结果
        future.whenComplete((v, t) -> {
            if (t != null) {
                t.printStackTrace();
            } else {
                System.out.println(v);
            }

        });
        System.out.println("---async rpc call over---");
        // rpcClient.close();

    }

}

代码1首先发起一次远程调用,该调用马上返回future1;然后代码2又发起一次远程调用,该调用也马上返回future2对象;代码3则基于CompletableFuture的能力,意在让future1和future2都有结果后再基于两者的结果做一件事情(这里是拼接两者结果返回),并返回一个获取回调结果的新的future。

代码4基于新的future,等其结果产生后,执行新的回调函数,进行结果打印或者异常打印。

最后我们看如何把异步调用改造为Reactive编程风格,这里基于RxJava让异步调用返回结果为Flowable,其实我们只需要把返回的CompletableFuture转换为Flowable即可,可以在RpcClient里面新增一个方法:

// 异步转反应式
public Flowable<String> rpcAsyncCallFlowable(String msg) {
    // 1.1 使用defer操作,当订阅时候在执行rpc调用
    return Flowable.defer(() -> {
        // 1.2创建含有一个元素的流
        final ReplayProcessor<String> flowable = ReplayProcessor.createWithSize(1);
        // 1.3具体执行RPC调用
        CompletableFuture<String> future = rpcAsyncCall(msg);
        // 1.4等rpc结果返回后设置结果到流对象
        future.whenComplete((v, t) -> {
            if (t != null) {// 1.4.1结果异常则发射错误信息
                flowable.onError(t);
            } else {// 1.4.2结果OK,则发射出rpc返回结果
                flowable.onNext(v);
                // 1.4.3结束流
                flowable.onComplete();
            }
        });
        return flowable;
    });
}

如上代码由于CompletableFuture是可以设置回调函数的,所以把其转换为Reactive风格编程很容易。

然后我们可以使用下面代码进行测试:

public class TestModelAsyncRpcReactive {
    // 1.创建rpc客户端
    private static final RpcClient rpcClient = new RpcClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 2.发起远程调用异步,并注册回调,马上返回
        Flowable<String> result = rpcClient.rpcAsyncCallFlowable("who are you");
        //3.订阅流对象
        result.subscribe(/* onNext */r -> {
            System.out.println(Thread.currentThread().getName() + ":" + r);
        }, /* onError */error -> {
            System.out.println(Thread.currentThread().getName() + "error:" + error.getLocalizedMessage());
        });

        System.out.println("---async rpc call over");
    }
}

如上代码,发起rpc调用后马上返回了一个Flowable流对象,但这时真正的rpc调用还没有发出去,等代码3订阅了流对象时才真正发起rpc调用。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/989213.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Milvus Cloud扩展变更:为向量数据库注入前沿增强功能

在向量数据库的不断变化中,Milvus Cloud已成为一个改变游戏规则的先锋,革新了我们存储、搜索和分析复杂向量数据的方式。通过最新版本的Milvus Cloud2.3.0,引入了一系列重要的增强和修改,为更强大、更高效的向量数据库解决方案铺平了道路。在本文中,我们将深入探讨Milvus …

自然语言处理-词向量模型-Word2Vec

通常数据的维度越高&#xff0c;能提供的信息也就越多&#xff0c;从而计算结果的可靠性就更值得信赖 如何来描述语言的特征呢&#xff0c;通常都在词的层面上构建特征&#xff0c;Word2Vec就是要把词转换成向量 假设现在已经拿到一份训练好的词向量&#xff0c;其中每一个词都…

Layui自定义列表多选

需求&#xff1a;已回访不需要分配&#xff0c;未回访的可多选分配 效果图&#xff1a; 实现方式 1&#xff0c;自定义复选 2&#xff0c;已回访的框去掉 3&#xff0c;自己写全选方法 注意&#xff1a;要想进方法一定要写lay-filter&#xff0c;才能触发方法&#xff0…

盘点使用代理IP时常会遇到的HTTP代理错误代码

如今&#xff0c;随着全球化的深入发展&#xff0c;越来越多的企业开始向海外拓展业务。跨境电商、海外营销等业务的兴起&#xff0c;使人们对HTTP代理的需求量越来越大。然而&#xff0c;在使用HTTP代理的过程中&#xff0c;常常会遇到各种错误代码&#xff0c;这些错误代码产…

Linux:进程(概念)

学习目标 1.认识冯诺依曼系统 2.认识操作系统概念与定位 (系统调用接口) 3.理解进程的概念&#xff08;PCB&#xff09; 4.理解进程的状态&#xff08;fork创建进程&#xff0c;僵尸进程及孤儿进程&#xff09; 5.了解进程的调度&#xff08;优先级&#xff0c;竞争性&#xff…

2023-2024 人工智能专业毕设如何选题

文章目录 0 简介1 如何选题2 最新毕设选题3 最后 0 简介 学长搜集分享最新的人工智能专业毕设选题&#xff0c;难度适中&#xff0c;适合作为毕业设计&#xff0c;大家参考。 学长整理的题目标准&#xff1a; 相对容易工作量达标题目新颖 1 如何选题 最近非常多的学弟学妹问…

基于单片机压力传感器MPX4115检测-报警系统-proteus仿真-源程序

一、系统方案 本设计采用52单片机作为主控器&#xff0c;液晶1602显示&#xff0c;MPX4115检测压力&#xff0c;按键设置报警&#xff0c;LED报警。 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 /***************************************…

类,这一篇文章你就懂了!

提示&#xff1a;本文主要介绍C中类相关知识及基础概念总结 渺渺何所似&#xff0c;天地一沙鸥 文章目录 一、面向对象与面向过程二、类的框架知识2.1 类的定义2.2 类的封装性2.2.1 访问限定符2.2.2 封装的概念以及实现 2.3 类的作用域及实例化2.4 类中this指针 三、六大默认成…

使用feign时提供者从request中获取不到参数值解决方法

使用feign时提供者从request中获取不到参数值解决方法 问题&#xff1a;分析&#xff1a;解决办法 问题&#xff1a; 在开发中&#xff0c;A项目需要在后端调用B项目的feign接口。B项目原来的参数接收都是在httpServletRequest里面&#xff0c;如下图: 我们知道feign接口是不…

6.Xaml CheckBox控件

1.运行图片 2.运行源码 a.xaml 源码 <Grid Name="Grid1"><!--IsThreeState="True" 这个为true的时候,有三个状态 第三个状态为什么也没有--><CheckBox Name=

Redis原理:动态字符串SDS

&#xff08;课程总结自b站黑马程序员课程&#xff09; 一、引言 Redis中保存的Key是字符串&#xff0c;value往往是字符串或者字符串的集合。可见字符串是Redis中最常用的一种数据结构。 不过Redis没有直接使用C语言中的字符串&#xff0c;因为C语言字符串存在很多问题&…

FD2257H 带有嵌入式霍尔传感器的智能电机驱动器芯片

FD2257H 带有嵌入式霍尔传感器的智能电机驱动器芯片 特征 电机驱动器与高灵敏度霍尔效应传感器 h桥MOS驱动器 锁关闭保护和自动重启功能 “软开关“相切换技术减少振动和声噪音 热停机保护&#xff08;TSD&#xff09; 可在SIP-4L包 12V或24V DC电机/风扇系统 一般说明 FD2257H…

Day8:浅谈useMemo

「目标」: 持续输出&#xff01;每日分享关于web前端常见知识、面试题、性能优化、新技术等方面的内容。 Day8-今日话题 useMemo 是 React 中一个有力的性能优化Hook。可从「用法」、「工作原理」、「作用」、「优缺点」、「使用场景」、「使用注意点」进行学习、复习。 拿vue作…

【LangChain系列 5】Prompt模版——特征库

原文地址&#xff1a;【LangChain系列 5】Prompt模版——特征库 本文速读&#xff1a; Feast Featureform 特征库(feature stores)是传统机器学习中的一个概念&#xff0c;本质上是数据&#xff0c;不同维度/属性的数据&#xff0c;而且可以保持同步更新&#xff0c;所以可以…

React中函数式组件与类组件有何不同?

Function Component 与 Class Component 有何不同 目录 Function Component 与 Class Component 有何不同 文章核心观点&#xff1a; 解释一下&#xff1a; 总结&#xff1a; 文章核心观点&#xff1a; Function components capture the rendered values.函数式组件捕获…

【PyQT5教程】-02-UI组件

1.按钮 QtWidgets模块提供了多种按钮类&#xff0c;让你可以轻松地创建各种类型的按钮 1.1 QPushButton&#xff08;普通按钮&#xff09; QPushButton是PyQt5中最常见的按钮类型之一&#xff0c;用于触发动作或执行操作。通过信号与槽机制&#xff0c;你可以将按钮的点击事…

性能测试 —— 生成html测试报告、参数化、jvm监控

1.生成HTML的测试报告 1.1配置 (1)找到jmeter 的安装目录&#xff0c;下的bin中的jmeter.properties&#xff08;jmeter配置文件&#xff09; (2) ctrl f &#xff0c;搜索jmeter.save.saveservice.output_format&#xff0c;取消井号 并且 把等号后的xml改为csv&#xff0c;…

基于Intel优化的淡水养殖水质溯源方案

基于AI的淡水养殖水质溯源、优化系统方案 前言一、核心痛点及关键需求1.政策引导及产业升级2.紧跟时事及供给变化3.品牌打造&#xff0c;重拾消费者信赖4.特色生态新模式 二、方案设计1.水质溯源档案2.数字孪生系统3.基于intel AI 水质预测算法 三、实践案例1、方案简述2、数据…

聊聊低代码的全栈开发能力

一、前言 低代码的热度持续提升&#xff0c;最明显的举动就是资本真金白银的投资。 阿里推出“云钉一体”战略&#xff0c;为企业提供全生命周期的IT解决文案&#xff1b;腾讯将各个事业部的低代码平台进行整合&#xff0c;推出了OTeam平台。网易有数帆轻舟低代码平台&#xff…

CentOS7无法连接网络 右上角网络图标消失

在使用 linux 的过程中&#xff0c;有时会出现网络图标消失的问题&#xff0c;这时系统会没有网络。 有些 linux 的网络连接由 NetworkManager 管理&#xff0c; 问题应由它解决。 先执行一下 systemctl restart NetworkManager 看有没有效果。 原因一 &#xff1a;NetworkMan…