Netty学习(一)——基础组件

news2025/1/14 1:10:09

根据黑马程序员netty视频教程学习所做笔记。

笔记demo:https://gitee.com/jobim/netty_learn_demo.git

参考博客:https://blog.csdn.net/cl939974883/article/details/122550345

一、概述

1.1 什么是Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.Copy

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

  • 异步是一种独特的网络模型,在这里的指的是调用时的异步与异步IO不同(netty使用多线程来完成方法的调用和处理结果相分离),是指的方法调用和处理结果交由多个线程来进行处理的方式(调用方法的线程可以腾出手来做其他的事情),依旧是基于多路复用。
  • 事件驱动指的是底层采用的是多路复用技术,也就是selector,当发生响应请求时才会被处理

1.2 Netty的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra - nosql 数据库
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • RocketMQ - ali 开源的消息队列
  • ElasticSearch - 搜索引擎
  • gRPC - rpc 框架
  • Dubbo - rpc 框架
  • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
  • Zookeeper - 分布式协调框架

1.3 Netty的优势

  • Netty vs NIO
    • 传统NIO,工作量大,bug 多。需要自己构建协议
    • epoll 空轮询导致 CPU 100%。(netty通过一些方式解决了这个bug)
    • Netty,解决 TCP 传输问题,如粘包、半包
    • Netty对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
  • Netty vs 其它网络应用框架
    • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
    • 久经考验,16年,Netty 版本
      • 2.x 2004
      • 3.x 2008
      • 4.x 2013
      • 5.x 已废弃(没有明显的性能提升,维护成本高)

二、netty入门程序Hello World

2.1 代码示例

目标:客户端向服务端发送一个"helloworld",服务器进行接收打印!

加入依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

服务器端:

public class HelloServer {
    public static void main(String[] args) {
        // 1、启动器,负责装配netty组件,启动服务器
        new ServerBootstrap()
                // 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
                .group(new NioEventLoopGroup())
                // 3、选择服务器的ServerSocketChannel实现。其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有
                .channel(NioServerSocketChannel.class)
                // 4、child 负责处理读写,该方法决定了 child 执行哪些操作
          		// 是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel
            	// ChannelInitializer 处理器(仅执行一次)
            	// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        System.out.println("initChannel");
                        // 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 6、SocketChannel的业务处理,使用上一个处理器的处理结果
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                    // 7、ServerSocketChannel绑定8080端口
                }).bind(8080);
    }
}

客户端代码:

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
          			// 创建 NioEventLoopGroup,同 Server
                .group(new NioEventLoopGroup())
                // 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
                .channel(NioSocketChannel.class)
                // ChannelInitializer 处理器(仅执行一次)
                // 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        System.out.println("initChannel");
                        // 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
                        channel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 指定要连接的服务器和端口
                .connect(new InetSocketAddress("localhost", 8080))
                // Netty 中很多方法都是异步的,如 connect
                // 这时需要使用 sync 方法等待 connect 建立连接完毕
                .sync()
                // 获取 channel 对象,它即为通道抽象,可以进行数据读写操作
                .channel()
                // 写入消息并清空缓冲区
                .writeAndFlush("hello world");
    }
}

执行结果:

image-20240619224824799

2.2 流程梳理

step1:服务端server启动:

  • 首先会创建group组
  • 接着指定channel实现类(这里是serversocketchannel,其中会处理accept()事件),并且来添加一些handler处理器。这里的添加的是初始化handler,该handler会在客户端发起连接时执行初始化操作也就是方法内内容。
  • 监听端口。

step2:客户端client启动

  • 同样创建group组。
  • 指定连接的channel。同样也添加了一个初始化处理器,该处理器同样也在连接建立之后会被执行init方法。
  • 执行connect(),发起连接(下面经过debug测试)
    • 首先触发自己客户端的initChannel()事件执行初始化,这里添加了一个编码器(用于将发送的字符串=>ByteBuf传输出去)
    • 接着触发server的initchannel来为pipeline(流水线)添加一些必要工序操作,这里添加了一个字符串解码器(用于接收客户端数据后将ByteBuf=>String);还有一个是InBound适配器,可进行一系列事件的自定义重写,这里的话重写了read()事件,之后客户端发送数据就会执行我们自定义的内容。
  • 紧接着连接完毕之后sync()取到连接对象也就是之前定义的NioSocketChannel,取到之后向服务器发送一个字符串
    • 发送过程中会先走StringEncoder中的编码方法,将String=>ByteBuf之后发送出去
    • 接着服务端的read()事件接收好之后,同样也会走StringDecoder中的解码方法,将ByteBuf=>String,接着会执行channelRead()方法,其中的msg就是转换之后的字符串,我们这里仅仅只是打印即可!

组件介绍:

  • 把 channel 理解为数据的通道
  • 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • 把 handler 理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handler 分 Inbound 和 Outbound 两类
  • 把 eventLoop 理解为处理数据的工人
    • 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
    • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

三、组件

3.1 EventLoop

3.1.1、认识EventLoop和EventLoopGroup

事件循环对象 EventLoop

  • EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

  • image-20240620231402480

  • 它的继承关系如下

    • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法

    • 继承自 netty 自己的 OrderedEventExecutor

      • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
        • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

事件循环组 EventLoopGroup

  • EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
3.1.2、执行普通、定时任务

目的:通过NioEventLoopGroup事件循环组来去执行普通和定时任务。

public class TestEventLoop {
    public static void main(String[] args) {
        // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通过next方法可以获得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());
        // 第三次调用的和第一次一样(初始化两个的情况)
        System.out.println(group.next());

        // 通过EventLoop执行普通任务
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });

        // 通过EventLoop执行定时任务
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 优雅地关闭
        // group.shutdownGracefully();
    }
}

结果:

image-20240620232314965

对于demo中主线程结束了还能运行的原因是,线程中开辟的用户线程依旧在运行中。

  • 分析:ThreadPoolExecutor中的runWorker方法里有一个getTask()方法,该方法不断从队列中拿任务执行,没有就阻塞,这也就是为什么主线程结束了,程序依旧在运行中的原因。

关闭 EventLoopGroup:

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

3.1.3、执行IO任务(含2点细化)

服务端代码:

public class MyIOServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端代码:

public class MyIOClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan3".getBytes()));
        Thread.sleep(2000);
        channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("lisi3".getBytes()));
        // 此处打断点调试,调用 channel.writeAndFlush(...);
        System.in.read();
    }
}

多个客户端执行:分别修改发送字符串为 zhangsan1(第一次),zhangsan2(第二次),zhangsan3(第三次)

image-20240620233455102

每当来临一个连接,此时就会将该channel去绑定到指定的一个EventLoop中的selector中,每个NioEventLoop都是一个线程,之后该channel的其他事件都由这个EventLoop来去处理执行,这就与我们之前手写多线程NIO多路复用的思路完全一致

image-20240620233545686

分工细化

  1. Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。

    • Boos、worker各指定一个组,Boos只负责serversocketchannel的accept监听,worker负责建立连接后得到的channel均衡绑定到各个eventloop的selector上。
  2. 若是执行handler中间有一些较耗时的操作,那么可以添加一个新的handler并交由一个处理普通事件的eventloop来进行异步处理!

/**
 * 1:细化工作组。
 * 2:耗时较长的任务交给指定组进行异步执行
 */
public class MyServerV2 {
    public static void main(String[] args) {
        // 增加自定义的非NioEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        
        new ServerBootstrap()
                // 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
                //1. Boss对应一个组(不用传递参数也没事),负责NioServerSocketChannel的accept监听;
                //          worker对应一个组,之后来临连接的channel都会绑定其某个EventLoop
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
                        socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                // 调用下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 2.该handler绑定自定义的Group
                        .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

执行结果:分别修改发送字符串为 zhangsan1(第一次),zhangsan2(第二次),zhangsan3(第三次)

image-20240620235326283

可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理

image-20240620235343399

3.1.4、源码分析(不同eventLoop,线程如何切换)

关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();// 返回下一个handler的eventLoop
    
    // 当前handler中的线程,是否和eventLoop是同一个线程
    if (executor.inEventLoop()) {
      	// 是,直接调用
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

3.2 Channel

Channel 的常用方法:

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)

3.2.1 ChannelFuture连接建立(同步、异步)

ChannelFuture作用:专门用于记录异步方法状态的返回结果。

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
            	// NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 该方法用于等待连接真正建立
        channelFuture.sync();
        
        // 获取客户端-服务器之间的Channel对象
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();
    }
}

如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world

这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程(调用线程),等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,还未真正与服务器建立好连接,也就没法将信息正确的传输给服务器端

解决方法:

  • 方法一:所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程

    //方式一:同步阻塞等待连接
    //阻塞方法,直到连接建立之后再会停止阻塞继续向下执行。
    // 若是不调用该方法,直接去获取channel来发送数据,很有可能因为没有建立好连接导致发送失败
    channelFuture.sync();//底层源码保护性暂停,主线程await(),另一个线程创建成功之后唤醒
    Channel channel = channelFuture.channel();
    log.info("channel {}",channel);
    //测试:channel.writeAndFlush("hello")
    
    
  • 方法二:用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO(eventLoop) 线程(去执行connect操作的线程)

    //方式二:添加一个监听器,来异步处理结果
    channelFuture.addListener(new ChannelFutureListener() {
        //当连接完成就会执行该回调方法:执行完成事件,其中channelFuture就是本身对象
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            Channel channel = channelFuture.channel();
            log.info("channel {}",channel);
            channel.writeAndFlush("hello!");
        }
    });
    
    

代码示例:

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 1.链接到服务器
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程(NioEventLoop 中的线程)
                .connect(new InetSocketAddress("localhost", 8080));

        // 2.1 使用sync方法同步处理结果
        // 该方法用于等待连接真正建立
        channelFuture.sync();
        // 获取客户端-服务器之间的Channel对象
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();

        // 2.2 使用addListener(回调对象)方法一步处理结果
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            // 当connect方法执行完毕后,也就是连接真正建立后,会在NIO线程中调用operationComplete方法
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                channel.writeAndFlush("hello world");
            }
        });
        System.in.read();
    }
}

3.2.2 CloseFuture连接关闭(同步、异步)

当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法,调用方法返回一个ChannelFuture。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现

  • 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作

    // 获得closeFuture对象
    ChannelFuture closeFuture = channel.closeFuture();
    
    // 同步等待NIO线程执行完close操作
    closeFuture.sync();
    // 关闭EventLoopGroup
    group.shutdownGracefully();
    
  • 调用closeFuture.addListener方法,添加close的后续操作

    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            // 等待channel关闭后才执行的操作
            System.out.println("关闭之后执行一些额外操作...");
            // 关闭EventLoopGroup
            group.shutdownGracefully();
        }
    });
    

代码示例:

public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建EventLoopGroup,使用完毕后关闭
        NioEventLoopGroup group = new NioEventLoopGroup();

        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 可以打印channel的一些运行流程、状态信息。需要配置logback
                        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();

        Channel channel = channelFuture.channel();
        Scanner scanner = new Scanner(System.in);

        // 创建一个线程用于输入并向服务器发送
        new Thread(()->{
            while (true) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    // 关闭操作是异步的,在NIO线程中执行
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "inputThread").start();

        // 获得closeFuture对象
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close...");

        // 方式一:同步等待NIO线程执行完close操作
        closeFuture.sync();
        // 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
        System.out.println("关闭之后执行一些额外操作...");
        // 关闭EventLoopGroup
        group.shutdownGracefully();

        //方式2:异步处理关闭结果
        // closeFuture.addListener(new ChannelFutureListener() {
        //     @Override
        //     public void operationComplete(ChannelFuture future) throws Exception {
        //         System.out.println("处理关闭之后的操作");
        //         // 关闭EventLoopGroup
        //         group.shutdownGracefully();
        //     }
        // });

    }
}

3.2.3 为什么netty要用异步?异步提升了什么?

结论:对每个操作步骤进行合理的拆解并且通过多线程+异步执行,在一定时间内能够提升吞吐量,但是对于总体响应时间不减反增。(这里吞吐量实际上我们可以看成来建立连接处理的个数!)

3.3 Future & Promise

在异步处理时,经常用到这两个接口。netty的future继承了JDK的future;netty的promise继承了netty的future。

3.3.1、介绍Future与Promise

说明: netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

本质都是等待唤醒机制,这个机制一个应用就是保护性暂停,另一个就是生产者消费者,都是线程通信。

额外:

  • 对于promise,netty比es6出来早
  • jdk中的future不能够区分任务是成功还是失败!
  • future就是在线程间传递一个结果或者传递一个数据的容器。
  • 该future中的数据是由执行任务的线程来进行填充进去的,我们自己没有机会去填,之后我们可以使用promise来去自己填充进去!

3.3.2、JDK的Future示例(线程间取值)

案例目的:主线程中获取线程池中某个线程处理任务的结果!

public class JdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "JdkFuture");
            }
        };
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);

        // 获得Future对象
        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });

        // 通过阻塞的方式,获得运行结果
        System.out.println("运行结果:"+future.get());
    }
}

3.3.3、netty的Future示例(同步、异步)

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        // 获得 EventLoop 对象
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(Thread.currentThread().getName() + " 执行任务...");
                return 50;
            }
        });

        // 主线程中获取结果
        System.out.println(Thread.currentThread().getName() + " 获取结果");
        // 获取任务结果,非阻塞,还未产生结果时返回 null
        System.out.println("getNow " + future.getNow());



        // 方式一:同步取得结果(主线程阻塞获取)
        // System.out.println("get " + future.get());

        // NIO线程中异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 获取结果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}

同步方法执行:

image-20240623162005117

异步方法执行:可以看到执行任务和获取结果都是同一个线程处理

image-20240623161943016

3.3.4、netty的promise示例

描述

  • 前面的future不能主动来装数据
  • 使用promise可以准确的知道数据是处理正常还是异常!
  • 开发网络框架,例如RPC,Promise的重要性比较大
  • setSuccess()表示结果正确,setFailure(e)表示结果不正确会抛出异常!

案例目的:通过使用promise来去表示执行某个任务的结果是成功还是失败!主线程可以来进行接收。

public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 创建Promise对象,用于存放结果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
                // 自定义线程向Promise中存放结果
                promise.setSuccess(50);
            } catch (Exception e) {
                e.printStackTrace();
                promise.setFailure(e);
            }
        }).start();

        // 主线程从Promise中获取结果(如果调用setFailure设置的会抛出异常)
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}

执行结果:

image-20240623163159363

3.4 handler & pipeline

pipeline:类似于流水线,handler则是一道道工序,流动的内容就是要处理的数据。

handler:handler是最为重要的,之后编写一些业务我们都直接在handler中进行,并且在netty中包含了许多内置的handler给我们简化工作(例如netty提供的StringEncoder是OutBoundHandler,StringDecode是InBoundHandler,日志new LoggingHandler()若是使用了logback需要进行额外配置)。

3.4.1 入站、出站handler执行顺序

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

服务端代码:

public class PipeLineServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                        // 在socketChannel的pipeline中添加handler
                        // pipeline中handler是带有head与tail节点的双向链表,的实际结构为
    				 	// head <-> handler1 <-> ... <-> handler4 <->tail
                        // Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法
                        // 入站时,handler是从head向后调用的
                        socketChannel.pipeline().addLast("inboundHandler1" ,new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                                // 父类该方法内部会调用fireChannelRead
                                // 将数据传递给下一个入栈处理器handler。如果不调用,调用链会断开
                                super.channelRead(ctx, msg);// 1
                            }
                        });
                        socketChannel.pipeline().addLast("inboundHandler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
                                super.channelRead(ctx, msg);//2
                            }
                        });
                        socketChannel.pipeline().addLast("inboundHandler3", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 3");
                                // 执行write操作,使得Outbound的方法能够得到调用
                                socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));//3
                                // 会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
                                // 如果是最后一个入栈处理器可以不用执行 super.channelRead(ctx, msg)
                                // super.channelRead(ctx, msg);
                            }
                        });
                        // Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
                        // 出站时,handler的调用是从tail向前调用的
                        socketChannel.pipeline().addLast("outboundHandler4" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                                super.write(ctx, msg, promise);//4
                            }
                        });
                        socketChannel.pipeline().addLast("outboundHandler5" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                                super.write(ctx, msg, promise);//5
                            }
                        });
                        socketChannel.pipeline().addLast("outboundHandler6" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 3");
                                super.write(ctx, msg, promise);//6
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

执行结果:

image-20240623171458487

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。

  • ChannelPipeline结构是一个带有head与tail指针的双向链表,其中的节点为handler。要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止

image-20240623172053482

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6

3.4.2 InBoundHandler案例(加工数据)

  1. 若是想要InBoundHandler依次执行,那么需要调用一个super.channelRead(ctx, data);ctx.fireChannelRead(data);来进行调用下一个handler,前者源码实际就是调用的后者!
  2. handler之间可以传递数据,那么可以来使用多个handler可以进行对数据加工处理!
  3. 最后一个InBoundHandler不需要去调用super.channelRead了,因为已经是最后一个执行结果了!

案例目的:通过三个自定义InBoundHandler,来对Bytebuf 进行如Bytebuf -> String对象进行加工处理。

server:

public class InboundHandlerTest {
    public static void main(String[] args) throws InterruptedException {
        new ServerBootstrap()
                .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加入站事件
                        //第一个handler:将ByteBuf => String
                        ch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("Inbound handler 1");
                                System.out.println("解析得到的数据:" + msg);
                                ByteBuf buf = (ByteBuf)msg;
                                final String data = buf.toString(Charsets.UTF_8);
                                super.channelRead(ctx, data);//方式一:执行下一个handler
                            }
                        });
                        //第二个handler:将String封装到Result对象中
                        ch.pipeline().addLast("h2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("Inbound handler 2");
                                System.out.println("解析得到的数据:" + msg);
                                ctx.fireChannelRead(msg);
                            }
                        });
                    }
                })
                .bind(8080)
                .sync();
        System.out.println("服务器启动成功!");
    }
}

client:

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程(NioEventLoop 中的线程)
                .connect(new InetSocketAddress("localhost", 8080));

        // 该方法用于等待连接真正建立
        channelFuture.sync();
        // 获取客户端-服务器之间的Channel对象
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
    }
}

image-20240624105951904

3.4.3 OutBoundHandler案例(不同对象发出数据效果不一致)

ctx.channel().write(msg) VS ctx.write(msg)

  1. 执行OutBoundHandler的顺序是从后往前依次执行的,对于使用channel来写或者ChannelHandlerContext来写handler的处理也有区别。
  2. 通过ChannelHandlerContext来发送数据效果,实际会从当前的handler向前开始依次执行handler来进行数据的额外处理,若是原本在该handler之后的boundhandler就不会被执行到!
  3. 通过channel来写数据,一定会从tail(最后一个handler)开始向前依次执行OutBoundHandler。
  4. 发送数据一定要发出去bytebuf,若是直接writeAndFlush(“字符串”),服务端不会接收到,除非再添加一个handler处理器也就是StringEncoder(),会将String转为ByteBuf。

Server:对比两种write()的执行顺序

public class OutBoundHandlerTest {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("inboundHandler1" ,new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                                super.channelRead(ctx, msg);
                                // 向客户端写数据
                                // 方式一:调用NioSocketChannel来进行发送数据。(从tail末尾向前依次执行outhandler)
                                // socketChannel.writeAndFlush("hello,client!");
                                // 方式二:调用ctx来进行发送数据。(从当前handler向前依次执行outhandler)
                                ctx.writeAndFlush("hello,client");
                            }
                        });
                        socketChannel.pipeline().addLast("outboundHandler4" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                                super.write(ctx, msg, promise);
                            }
                        });
                        socketChannel.pipeline().addLast("outboundHandler5" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                                super.write(ctx, msg, promise);
                            }
                        });
                        socketChannel.pipeline().addLast("outboundHandler6" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 3");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

通过channel来发送数据效果

image-20240624114715117

通过ctx,也就是ChannelHandlerContext发送数据效果:

image-20240624114852036

image-20240624115419260

3.4.4 EmbeddedChannel

EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用于测试Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        
        // 执行Inbound操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println("======");
        // 执行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

执行结果:

image-20240413181139427

3.5 ByteBuf

netty中的ByteBuf的容量可以动态扩容,相比较于在NIO中的ByteBuffer一旦指定初始容量之后就无法更改了!若是写入超过容量的数据则会出现覆盖的情况!

3.5.1 创建

public class ByteBufLearn {

    public static void main(String[] args) {
        // 创建byteBuf(直接内存,而不是堆内存),默认大小是256,通过该方式创建的ByteBuf对象会自动扩容,与nio的ByteBuffer不一样,nio达到融期限之后,会报错
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(32);
        // 使用ByteBufAllocator创建直接内容,直接内容不需要考虑GC回收,默认获取的 byteBuf 便是直接内存,需要主动释放关闭
        // ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
        // 使用ByteBufAllocator创建堆内存
        // ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();

        ByteBuffer byteBuffer = ByteBuffer.allocate(32);

        // 往byteBuf里面写入内容
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < 32; i++) {
            stringBuilder.append(i).append("-");
        }

        // 往ByteBuf对象中写入数据,当达到容量界限之后,会自动扩容
        byteBuf.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
        ByteBufUtil.log(byteBuf);

        // 往nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常
        byteBuffer.put(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));

    }
}

运行结果:

image-20240624162720705

  • ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

  • 当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作。而nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常

  • 如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

3.5.2 直接内存与堆内存

堆内存与直接内存区别:

  • 堆内存的分配效率比较高,但是读写内存的效率比较低

  • 直接内存分配效率比较低,但是读写效率高(少一次内存复制),适合配合池化功能一起用。直接内存使用的是系统内存

    • 直接内存使用的是系统内存,若是从磁盘中读取文件时会将数据直接读入到系统内存,那么系统内存呢就会用直接内存的方式映射到java内存中,java里面访问的和操作系统访问的是同一块内存,那么就可以减少一次内存的复制,所以读取效率会高于堆内存。
    • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

创建基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);

创建基于堆的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

3.5.3 池化与非池化

池化的最大意义在于可以重用 ByteBuf,优点:

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现

4.1 之前,池化功能还不成熟,默认是非池化实现

池化功能是否开启,可以通过下面的系统环境变量来设置

// unpooled:非池化 pooled:池化
-Dio.netty.allocator.type={unpooled|pooled}

3.5.4 组成

ByteBuf由四部分组成

image-20240624161139435

ByteBuf 是一个字节容器,容器里面的的数据分为四个部分

  • 第一个部分是已经丢弃的字节,这部分数据是无效的;(已经读过的内容)

  • 第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;(已经写入但还未读取的内容)

  • 第三部分数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段;(剩余可写入数据的空间大小)

  • 最后一部分表示的是该 ByteBuf 最多还能扩容多少容量

四个重要属性:

  • readerIndex(读指针):指示读取的起始位置
  • writerIndex(写指针):指示写入的起始位置
  • capacity(当前容量):当前容量。当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。
  • maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。

3.5.5 写入

方法列表,省略一些不重要的方法

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
  • 网络传输,默认习惯是 Big Endian

代码示例:

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);

        buffer.writeInt(5);
        ByteBufUtil.log(buffer);

        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);

        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
}

运行结果:

image-20240624162239910

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

3.5.6 扩容

从上方写入数据示例根据结果可以看到,在如下的代码中产生了动态扩容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer);

image-20240624154645292

扩容规则:

image-20240624164304928

  • 如果需要的容量等于门限阈值,则直接使用阈值作为新的缓存区容量。
  • 如果需要的容量大于阈值,则采用每次步进4MB的方式进行内存扩张,即将需要扩容值除以4MB后乘以4MB,然后将结果与最大容量进行比较,取其中的较小值作为目标容量。
  • 如果需要的容量小于阈值,则采用倍增的方式,以64字节作为基本数值,每次翻倍增长(如64,128,256…),直到倍增后的结果大于或等于所需的容量值。
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

3.5.7 读取

参数含义
buffer.readByte()每次读取一个字节
buffer.readInt()每次读取一个整数,也就是四个字节
buffer.markReaderIndex()为读指针做一个标记,配合下面的方法可以实现重复读取某个数
buffer.resetReaderIndex()将读指针跳到上一个标记过的地方实现重复读取某个数

除了上面一些了read开头的方法以外,还有一系列get开头的方法也可以读取数据,只不过get开头的方法不会改变读指针位置。相当于是按索引去获取。

示例:如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufReadTest {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);
        ByteBufUtil.log(buffer);
        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通过mark与reset实现重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);

        // 恢复到mark标记处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}

执行结果:

image-20240624165031776

3.5.8 内存回收(retain & release)

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

内存回收规则:

  • 因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release,详细分析如下

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 处理原则
    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则
    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则
    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

TailContext 释放未处理消息逻辑

// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

具体代码

// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

3.5.9 零拷贝

切片
  • 【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针,修改子分片,会修改原ByteBuf。
  • 得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

image-20240624231851082

public class TestSlice {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将buffer分成两部分。在切片过程中,没有发生数据复制
        // 注意此时切片最大容量做了限制,不需要新增数据
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要让分片的buffer引用计数加一
        // 避免原Buffer释放导致分片buffer无法使用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 释放原有byteBuf 内存
        //若是在release()之后也想正常使用,可以在此之前使用retain()进行引用+1,release()相对于会引用-1,此时就不会真正释放内存,自然也就能欧使用
        buffer.release();

        ByteBufUtil.log(slice1);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        buffer.setByte(0,5);

        System.out.println("===========打印slice1===========");
        ByteBufUtil.log(slice1);
    }
}

运行结果:

image-20240624232326066

注意:slice后的分片,不能再次写入新的数据,这会影响原ByteBuf。

duplicate:整块

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的。

代码示例:

public class ByteBufDuplicateDemo {
    public static void main(String[] args) {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
        byteBuf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,0});

        // 拷贝一块buf
        ByteBuf duplicate = byteBuf.duplicate();
        ByteBufUtil.log(duplicate);

        // 将最后一位0修改成10
        duplicate.setByte(9,10);
        // 打印byteBuf
        ByteBufUtil.log(byteBuf);

        // 写入新数据11
        duplicate.writeByte(11);
        // 打印byteBuf
        ByteBufUtil.log(byteBuf);
    }

}

运行结果:

image-20240624233218884

CompositeByteBuf:组装ByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。

CompositeByteBuf是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

代码示例:将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。注意要设置true来让其调整读,写指针。

public class TestCompositeByteBuf {

    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
        buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

        ByteBufUtil.log(buf1);
        ByteBufUtil.log(buf2);

        // 效率较低方案:直接通过writeBytes()写入字节方式写入
        // ByteBufUtil.log(ByteBufAllocator.DEFAULT.buffer(20).writeBytes(buf1).writeBytes(buf2));

        // 零拷贝:合并两个Buffer到一个Buffer中,使用的共享内存
        CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
        // true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
        buf3.addComponents(true, buf1, buf2);
        System.out.println("结果:");
        ByteBufUtil.log(buf3);
    }
}

执行结果:

image-20240624233933557

Unpooled
  • Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
  • 这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf。

代码示例:

public class UnpooledTest {

    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
        buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

        // 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
        ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
        buf3.setByte(0,6);
        ByteBufUtil.log(buf3);
    }
}

执行结果:

image-20240624234350352

3.5.10 copy:深度拷贝

ByteBuf提供了copy方法,这一类方法是真正的拷贝原ByteBuf到新的内存,返回一个新的ByteBuf,与原ByteBuf没有关系。

提供两个拷贝:

  • 一个是全量。public abstract ByteBuf copy();
  • 一个指定位置和长度。public abstract ByteBuf copy(int index, int length);

代码示例:

public class ByteBufCopyDemo {
    public static void main(String[] args) {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
        byteBuf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,0});

        ByteBuf copy1 = byteBuf.copy();
        ByteBufUtil.log(copy1);

        ByteBuf copy2 = byteBuf.copy(5, 5);
        ByteBufUtil.log(copy2);
    }
}

结果:

image-20240624235030168

3.5.11 ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1862340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于改进YOLOv5的安全帽检测算法 | 引入Ghost卷积 + 添加CA注意力机制 + 更换Neck网络之BiFPN + 更换损失函数之WIoU

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。为了解决建筑工地、隧道、煤矿等施工场景中现有安全帽检测算法对于小目标、密集目标以及复杂环境下的检测精度低的问题&#xff0c;设计实现了一种基于YOLOv5的改进目标检测算法&#xff0c;记为YOLOv5-GBCW。首先使用Ghos…

【昇思初学入门】第七天打卡-模型训练

训练模型 学习心得 构建数据集。这通常包括训练集、验证集&#xff08;可选&#xff09;和测试集。训练集用于训练模型&#xff0c;验证集用于调整超参数和监控过拟合&#xff0c;测试集用于评估模型的泛化能力。 &#xff08;mindspore提供数据集https://www.mindspore.cn/d…

Fusion WAN:企业出海与全球组网的数字网络底座

众多中国企业与品牌正将目光投向海外市场&#xff0c;积极寻求发展新机遇&#xff0c;并且在这一过程中取得了显著的成果。"出海"战略已经成为一些企业转型升级的关键选择。 随着国内市场的竞争日益激烈&#xff0c;越来越多的企业开始寻求海外市场的拓展&#xff0c…

压电风扇的显著特点及其在电子系统中的应用

压电已经存在了一个多世纪&#xff0c;人们发现某些晶体结构在受到机械应力时产生表面电荷。 这种形式的压电传感器是压电传感器的工作方式。与压电传感器&#xff08;或发电机&#xff09;类似&#xff0c;压电致动器&#xff08;或电机&#xff09;使用补丁[1,3]形式的压电陶…

探索PHP中的魔术常量

PHP中的魔术常量&#xff08;Magic Constants&#xff09;是一些特殊的预定义常量&#xff0c;它们在不同的上下文中具有不同的值。这些常量可以帮助开发者获取文件路径、行号、函数名等信息&#xff0c;从而方便调试和日志记录。本文将详细介绍PHP中的魔术常量&#xff0c;帮助…

2024地理信息相关专业大学排名

在开始之前&#xff0c;不得不提一下今年福耀科技大学不能招生的遗憾&#xff0c;不知道明年是否能一切准备就绪开始招生呢&#xff1f; 如果这所大学能招生了&#xff0c;不知道它有没有地理信息相关专业呢&#xff1f; 言归正转&#xff0c;我们现在就基于公开资料&#xf…

力扣随机一题 哈希表 排序 数组

博客主页&#xff1a;誓则盟约系列专栏&#xff1a;IT竞赛 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 2491.划分技能点相等的团队【中等】 题目&#xff1a; 给你一个正整数数组…

Qt添加Dialog对话框

Qt版本&#xff1a;5.12.12 1.添加【模块】 Base class&#xff1a;可以选择QDialog、QWidget、QMainWindow 会自动生成MyDialog.h和MyDialog.cpp文件以及MyDialog.ui文件&#xff0c; 2.添加代码&#xff1a; &#xff08;1&#xff09;TestDialog.h #pragma once#include…

三分之一的违规行为未被发现

Gigamon 调查显示&#xff0c;随着漏洞的针对性越来越强、越来越复杂&#xff0c;企业在检测漏洞方面也面临越来越大的困难&#xff0c;超过三分之一的企业表示&#xff0c;现有的安全工具无法在漏洞发生时检测到它们。 随着混合云环境变得越来越复杂&#xff0c;以及恶意行为…

Docker 查看源地址/仓库地址,更改

一、源地址文件配置路径。若有docker文件夹&#xff0c;没有json&#xff0c;可以新增&#xff0c;复制进去内容 /etc/docker/daemon.json {"registry-mirrors": ["https://dockerhub.azk8s.cn","https://hub-mirror.c.163.com"&#xff0c;&q…

conda如何修改虚拟环境的python版本

有时候安装虚拟环境的时候&#xff0c;忘记指定python的版本&#xff0c;本文介绍一下如何在虚拟环境创建之后&#xff0c;修改python的版本。 1 如果安装了Anaconda Navigator。 2 终端 参考&#xff1a;conda修改当前环境中的python版本_conda更换python版本-CSDN博客

电机故障检测系统的通用性限制分析

电机故障检测系统因应用环境、功能需求、经济性等多方面差异而难以实现通用。工厂与实验室在环境条件、使用频率、功能需求、成本、维护及数据处理方面有显著不同&#xff0c;此外&#xff0c;LabVIEW软件在两者中的应用和数据处理也存在差异&#xff0c;这进一步限制了系统的通…

初探海龟绘图

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 海龟绘图是Python内置的模块&#xff0c;在使用前需要导入该模块&#xff0c;可以使用以下几种方法导入&#xff1a; l 直接使用import语句导入海龟…

深度学习21-30

1.池化层作用&#xff08;筛选、过滤、压缩&#xff09; h和w变为原来的1/2&#xff0c;64是特征图个数保持不变。 每个位置把最大的数字取出来 用滑动窗口把最大的数值拿出来&#xff0c;把44变成22 2.卷积神经网络 &#xff08;1&#xff09;conv&#xff1a;卷积进行特征…

JS(JavaScript)的复合类型详解

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

vue3前后端开发:响应式对象不能直接成为前后端数据传输的对象

如图所示&#xff1a;前端控制台打印显示数据是没问题的&#xff0c;后端却显示没有接收到相应数据&#xff0c;但是后端的确接收到了一组空数据 直接说原因&#xff1a;这种情况唯一的原因是没有按正确格式传递参数。每个人写错的格式各有不同&#xff0c;我只是说明一下我在…

大模型应用研发基础环境配置(Miniconda、Python、Jupyter Lab、Ollama等)

老牛同学之前使用的MacBook Pro电脑配置有点旧&#xff08;2015 年生产&#xff09;&#xff0c;跑大模型感觉有点吃力&#xff0c;操作起来有点卡顿&#xff0c;因此不得已捡起了尘封了快两年的MateBook Pro电脑&#xff08;老牛同学其实不太喜欢用 Windows 电脑做研发工作&am…

第三方软件连接虚拟机

第三方软件连接虚拟机 1 查看本机VM&#xff08;VMware&#xff09;虚拟机网段2 开启虚拟机系统&#xff0c;修改网卡配置3 重新打开网络并测试连通性4 打开VM虚拟机网络开关5 通过第三方软件建立连接6 可能遇到的问题 1 查看本机VM&#xff08;VMware&#xff09;虚拟机网段 子…

vite+vue3+ts项目搭建流程 (pnpm, eslint, prettier, stylint, husky,commitlint )

vitevue3ts项目搭建 项目搭建项目目录结构 项目配置自动打开项目eslint①vue3环境代码校验插件②修改.eslintrc.cjs配置文件③.eslintignore忽略文件④运行脚本 prettier①安装依赖包②.prettierrc添加规则③.prettierignore忽略文件④运行脚本 stylint①.stylelintrc.cjs配置文…

EfficientNet-V2论文阅读笔记

目录 EfficientNetV2: Smaller Models and Faster Training摘要Introduction—简介Related work—相关工作EfficientNetV2 Architecture Design—高效EfficientNetV2架构设计Understanding Training Efficiency—了解训练效率Training-Aware NAS and Scaling—训练感知NAS和缩放…