Netty深入浅出Java网络编程学习笔记(一) Netty入门篇

news2024/9/24 13:21:09

目录

一、概述

1、什么是Netty

2、Netty的优势

二、入门案例

1、服务器端代码

2、客户端代码

3、运行流程

组件解释

三、组件

1、EventLoop

处理普通与定时任务

关闭 EventLoopGroup

处理IO任务

服务器代码

客户端代码

分工细化

划分Boss 和Work 

增加自定义EventLoopGroup

切换的实现

2、Channel

ChannelFuture

连接问题

处理关闭 

为什么Netty要将多个API调用NIO线程来异步实现?

3、Future与Promise

概念

JDK Future

Netty Future

Netty Promise

4、Handler与Pipeline

Pipeline

OutboundHandler

socketChannel.writeAndFlush()

ctx.writeAndFlush()

EmbeddedChannel

5、ByteBuf

创建

直接内存与堆内存

池化与非池化

组成

写入

扩容

读取

释放

释放规则

切片


一、概述

1、什么是Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

注意:netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO

2、Netty的优势

如果使用传统NIO,其工作量大,bug 多

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • 因为bug的存在,epoll 空轮询导致 CPU 100%

Netty 对 API 进行增强,使之更易用,如

  • FastThreadLocal => ThreadLocal
  • ByteBuf => ByteBuffer

二、入门案例

1、服务器端代码

public class HelloServer {
    public static void main(String[] args) {
        // 1、启动器,负责装配netty组件,启动服务器
        new ServerBootstrap()
                // 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
                .group(new NioEventLoopGroup())
                // 3、选择服务器的 ServerSocketChannel 实现
                .channel(NioServerSocketChannel.class)
                // 4、child 负责处理读写,该方法决定了 child 执行哪些操作
            	// ChannelInitializer 处理器(仅执行一次)
            	// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 6、SocketChannel的业务处理,使用上一个处理器的处理结果
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                    // 7、ServerSocketChannel绑定8080端口
                }).bind(8080);
    }
}

2、客户端代码

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                // 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
                .channel(NioSocketChannel.class)
                // ChannelInitializer 处理器(仅执行一次)
                // 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        // 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
                        channel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 指定要连接的服务器和端口
                .connect(new InetSocketAddress("localhost", 8080))
                // Netty 中很多方法都是异步的,如 connect
                // 这时需要使用 sync 方法等待 connect 建立连接完毕
                .sync()
                // 获取 channel 对象,它即为通道抽象,可以进行数据读写操作
                .channel()
                // 写入消息并清空缓冲区
                .writeAndFlush("hello world");
    }
}

3、运行流程

左:客户端 右:服务器端

组件解释

  • channel 可以理解为数据的通道
  • msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • handler 可以理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline(传递途径),pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • pipeline 中有多个 handler,处理时会依次调用其中的 handler
    • handler 分 Inbound 和 Outbound 两类
      • Inbound 入站
      • Outbound 出站
  • eventLoop 可以理解为处理数据的工人
    • eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 io 操作都由该 eventLoop 负责
    • eventLoop 既可以执行 io 操作,也可以进行任务处理,每个 eventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • eventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 eventLoop

三、组件

1、EventLoop

事件循环对象 EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

它的继承关系如下

  • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 继承自 netty 自己的 OrderedEventExecutor
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

事件循环组 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

处理普通与定时任务

public class TestEventLoop {
    public static void main(String[] args) {
        // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通过next方法可以获得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());

        // 通过EventLoop执行普通任务
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });

        // 通过EventLoop执行定时任务
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 优雅地关闭整个EventLoopGroup 
        group.shutdownGracefully();
    }
}

输出结果如下

io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
关闭 EventLoopGroup

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

处理IO任务

服务器代码
public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));

                            }
                        });
                    }
                })
                .bind(8080);
    }
}
客户端代码
public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        // 此处打断点调试,调用 channel.writeAndFlush(...);
        System.in.read();
    }
}

分工细化

划分Boss 和Work 

Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            	// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
            
				...
    }
}

一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件


增加自定义EventLoopGroup

当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理

切换的实现

不同的EventLoopGroup切换的实现原理如下

由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务

  • 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用

2、Channel

Channel 的常用方法

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)

ChannelFuture

连接问题

        ChannelFuture是Netty中的一个异步操作的结果对象,它的作用是利用 channel() 方法来获取 Channel 对象。它代表了一个将来可能会拥有操作结果的操作。在Netty中,几乎所有的异步操作都会返回一个ChannelFuture对象,用于表示对该操作的异步执行。

如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world

这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端

所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程

下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)

addListener方法

通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作


处理关闭 

关闭channel

当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

如果我们想在channel真正关闭以后,执行一些额外的善后操作,可以选择以下两种方法来实现

  • 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作

    // 获得closeFuture对象
    ChannelFuture closeFuture = channel.closeFuture();
    
    // 同步等待NIO线程执行完close操作
    closeFuture.sync();
  • 调用closeFuture.addListener方法,添加close的后续操作

    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            // 等待channel关闭后才执行的操作
            System.out.println("关闭之后执行一些额外操作...");
            // 关闭EventLoopGroup
            group.shutdownGracefully();
        }
    });

为什么Netty要将多个API调用NIO线程来异步实现?

其实这个可以和JVM的指令重排优化这个知识点挂钩,主要是为了最大程度发挥每个线程对于单一操作的一个满负荷性,充分利用CPU,提高吞吐量

 


3、Future与Promise

概念

Future(未来)是一个并发编程的概念,用于表示一个异步操作的结果,尚未完成的值或异常。在编程中,Future可以作为一个占位符,代表一个将来可能会获得的值。

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

JDK Future

public class JdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "JdkFuture");
            }
        };
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);

        // 获得Future对象
        Future<Integer> future = executor.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });

        // 通过阻塞的方式,获得运行结果
        System.out.println(future.get());
    }
}

Netty Future

Netty的Future与JDK的Future有一些优化和扩展,主要体现在异步操作支持:Netty的Future支持异步操作,允许在执行网络操作时立即返回并继续执行其他任务,而不需要阻塞等待操作完成。这种异步操作使得Netty在处理高并发和大规模网络请求时表现出色。

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        // 获得 EventLoop 对象
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 50;
            }
        });

        // 主线程中获取结果
        System.out.println(Thread.currentThread().getName() + " 获取结果");
        System.out.println("getNow " + future.getNow());
        System.out.println("get " + future.get());

        // NIO线程中异步获取结果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 获取结果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}

采用这种异步获取返回值处理的方案适用于主线程要执行的业务与该返回值无关,但是该返回值又不得不进行一个处理的情况。所以说,将这个活交给小弟线程异步来处理掉,主线程可以安心的处理自己的事

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到

  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果

Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果

(把多线程比作分布式的环境,那么Promise就是Redis这种第三方存储结构,通过独立于架构外的存储,可以实现各线程间的数据交换,有点类似于前端组件传播的全局事务总线

public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 创建Promise对象,用于存放结果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 自定义线程向Promise中存放结果
            promise.setSuccess(50);
        }).start();

        // 主线程从Promise中获取结果
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}

 Promise提供了setResult()setFailure()方法,可以主动设置操作的结果或异常。这使得在一些特殊情况下,可以人为地指定操作的结果,而不需要等待真正的异步操作完成。例如,可以在操作超时时主动设置Promise的结果。

romise的setFailure()方法可以设置操作的异常,在Future中可以通过cause()来获取异常。这使得异常处理变得更加便捷和清晰,可以更好地处理操作的失败情况。


4、Handler与Pipeline

Pipeline

public class PipeLineServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 在socketChannel的pipeline中添加handler
                        // pipeline中handler是带有head与tail节点的双向链表,的实际结构为
    				 	// head <-> handler1 <-> ... <-> handler4 <->tail
                        // Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法
                        // 入站时,handler是从head向后调用的
                        socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                                String newResult = msg + "wzx";
                                // 父类该方法内部会调用fireChannelRead
                                // 将数据传递给下一个handler
                                super.channelRead(ctx, newResult );
                            }
                        });
                        socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
                                // 执行write操作,使得Outbound的方法能够得到调用,如果channel中没有写入的数据,Outbound的方法是不会发生调用的
          socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
                            }
                        });
                        // Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
                        // 出站时,handler的调用是从tail向前调用的
                        socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                                super.write(ctx, msg, promise);
                            }
                        });
                        socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler

handler需要放入通道的pipeline中,才能根据放入顺序来使用handler

  • pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
    • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止

具体结构如下

调用顺序如下

OutboundHandler

socketChannel.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler

ctx.writeAndFlush()

        ChannelHandlerContext是Netty中的一个重要概念,它代表了一个网络通信的上下文环境。每当有新的连接建立或数据传输时,Netty会为每个连接创建一个对应的ChannelHandlerContext对象。

        ChannelHandlerContext包含了与通信相关的各种信息,包括底层的Channel、ChannelPipeline、处理器(Handler)等。它提供了操作和管理网络通信的方法,例如数据的读写、发送消息、关闭连接等。

        通过ChannelHandlerContext,我们可以访问和操作与特定连接相关的所有资源和状态。它允许我们编写自定义的处理器来实现特定的业务逻辑,对接收到的数据进行处理,并将相应的响应发送回客户端。

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler

EmbeddedChannel

EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用于测试Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        
        // 执行Inbound操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 执行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

5、ByteBuf

创建

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        ByteBufUtil.log(buffer);

        // 向buffer中写入数据
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < 20; i++) {
            sb.append("a");
        }
        buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));

        // 查看写入结果
        ByteBufUtil.log(buffer);
    }
}

        ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

        当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行动态扩容操作(扩容为当前容量的两倍)

        如果在handler中创建ByteBuf,建议使用ChannelHandlerContext 的ctx.alloc().buffer()来创建

直接内存与堆内存

通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化与非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

        在高并发环境下,池化功能可以在一定程度上节约内存减少内存溢出的可能性。这是因为池化功能通过对资源的重用,减少了频繁地创建和销毁对象的开销

        具体来说,当系统面临高并发请求时,如果每次请求都需要创建一个新的对象来处理,那么系统就需要频繁地进行内存分配和释放操作。这会导致内存碎片的产生,浪费大量的内存空间,并且频繁的对象创建和销毁会消耗较多的CPU时间

        而使用池化功能,可以事先创建一定数量的对象,并将其存储在对象池中。当有请求到达时,可以从对象池中获取一个可用的对象,使用完毕后再将其放回池中进行重用。这样就避免了频繁地创建和销毁对象的过程。

        综上所述,池化功能可以在高并发环境下提供更好的资源管理和利用效率,节约内存并减少内存溢出的可能性。

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现

组成

ByteBuf主要有以下几个组成部分

  • 最大容量与当前容量
    • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
    • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
  • 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制。进行读写操作时,无需进行模式的切换
    • 读指针前的部分被称为废弃部分,是已经读过的内容
    • 读指针与写指针之间的空间称为可读部分
    • 写指针与当前容量之间的空间称为可写部分

写入

常用方法如下

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian(大端写入),即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

使用方法

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);

        buffer.writeInt(5);
        ByteBufUtil.log(buffer);

        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);

        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
}

扩容

当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作

  • 扩容因子:对于HeapByteBuf和DirectByteBuf,默认的扩容规则是按照2的幂次方进行扩容。即,每次扩容时,会将容量扩大到原容量的两倍,以提供更多的空间来存储数据。这种扩容规则在大多数情况下都能够提供较好的内存利用效率。

  • 固定长度:对于CompositeByteBuf,其容量是固定的,无法进行扩容。CompositeByteBuf是由多个ByteBuf组成的复合缓冲区,每个ByteBuf具有单独的容量并负责存储对应的数据。

读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通过mark与reset实现重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);

        // 恢复到mark标记处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置

释放

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存(采用虚引用 + Clean.clean()方法调用本地方法unsafe.freeMemory()来释放直接内存)
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

  • 入站 ByteBuf 处理原则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则

    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则

    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

while (!buffer.release()) {}

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

切片

ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

public class TestSlice {
    public static void main(String[] args) {
        // 创建ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 将buffer分成两部分  参数1:下标   参数2:长度
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要让分片的buffer引用计数加一
        // 避免原Buffer释放导致分片buffer无法使用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        // 修改0下标元素为5
        buffer.setByte(0,5);

        System.out.println("===========打印slice1===========");
        ByteBufUtil.log(slice1);
    }
}

 运行结果

注意:

  • 可以在切片中修改元素,但无法直接新增元素或删除元素,这需要通过原始ByteBuf进行操作。
  • 切片和原有的ByteBuf共用一份内存,收到同样的引用计数法来控制回收内存,就是说,释放了依次原来的ByteBuf,那么切片中的计数也会减一,为了避免这种影响,需要让分片的buffer引用计数加一

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1079475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

练[HarekazeCTF2019]encode_and_encode

[HarekazeCTF2019]encode_and_encode 文章目录 [HarekazeCTF2019]encode_and_encode掌握知识解题思路代码分析 关键paylaod 掌握知识 ​ JSON对Unicode字符的解析转义&#xff0c;json格式的构建&#xff0c;代码审计&#xff0c;php伪协议的利用&#xff0c;file_get_content…

Python 中的 set 集合类型是可迭代的吗?

当我们运行以下代码时会报错。 a {1, 2, 4, 3, 4} for i in range(len(a)):print(a[i]) 所以我之前一直以为 set 类型是不可迭代的&#xff0c;后来发现这里的报错问题是&#xff1a;set object is not subscriptable&#xff0c;也就是说 set 是不可以通过下标来访问的。因为…

APP如何设计应用的屏幕截图以提高下载量

APP高质量的应用程序商店屏幕截图&#xff0c;对于建立初始信任以及向潜在用户推销应用程序的优势至关重要。创建应用程序商店屏幕截图&#xff0c;以最好的方式展示我们的应用程序&#xff0c;从而优化应用形象。 1、使用大标题。 确保重点突出品牌的独特性&#xff0c;在屏幕…

改进智能优化算法常用指标一键导出为EXCEL,最优值,平均值,标准差,最差值,中位数,秩和检验,箱线图...

声明&#xff1a;对于作者的原创代码&#xff0c;禁止转售倒卖&#xff0c;违者必究&#xff01; 为了突出改进智能优化算法的效果&#xff0c;常常会将改进的智能算法与其他算法进行对比。 在一些期刊论文中&#xff0c;经常会看到一个超级大的表格&#xff0c;统计着每个算法…

基于地理位置的IP地址定位技术

IP地址定位是指通过互联网上的IP地址&#xff0c;准确地定位出该IP地址对应的物理位置。IP地址是互联网上设备之间通信时使用的一个地址标识符&#xff0c;每个设备都有一个唯一的IP地址。 IP地址定位的原理是通过收集和分析网络设备的IP地址和相应的网络数据&#xff0c;以确定…

3D目标检测实战 | 详解2D/3D检测框交并比IoU计算(附Python实现)

目录 1 交并比基本概念2 2D检测框IoU计算3 旋转2D检测框IoU计算4 3D检测框IoU计算 1 交并比基本概念 交并比(Intersection Over Union, IoU)是度量两个目标检测框交叠程度的方式&#xff0c;公式如下 I o U a r e a ( B p ∩ B g t ) a r e a ( B p ∪ B g t ) \mathrm{IoU}\…

【VTK】一文讲解vtkImageActor

很高兴在雪易的CSDN见到你,给你糖糖 系列文章目录 VTK付费专栏_雪易的博客-CSDN博客 感谢订阅的小哥哥小姐姐,小易会继续努力分享,一起进步! 若订阅后有其它需求,欢迎随时联系,CSDN一直在线(^U^)ノ~YO 前言 本文主要讲解vtk的

SD卡格式化如何恢复数据?

SD卡作为现代移动设备如手机、数码相机以及行车记录仪中的重要存储组件&#xff0c;其应用场景越来越广泛。与硬盘或U盘一样&#xff0c;SD卡也不是完全免疫于数据丢失的问题&#xff0c;特别是在误格式化或误删除的情况下。所以&#xff0c;许多人会有这样的疑问&#xff1a;S…

【21】c++设计模式——>装饰模式

装饰模式的定义 装饰模式也可以称为封装模式&#xff0c;所谓的封装就是在原有行为之上进行扩展&#xff0c;并不会改变该行为&#xff1b; 例如网络通信&#xff1a; 在进行网络通信的时候&#xff0c;数据是基于IOS七层或四层网络模型&#xff08;某些层合并之后就是四层模型…

Rn使用FlatList导航栏自动回到中间

import { useState, useRef } from react import { FlatList, View, Text, StyleSheet, TouchableOpacity } from react-nativeconst Center () > {const tabs ["语文", "数学", "英语", "政治", "历史", "地理&q…

新的“HTTP/2 快速重置”零日攻击打破了 DDoS 记录

自 8 月份以来&#xff0c;一种名为“HTTP/2 快速重置”的新 DDoS&#xff08;分布式拒绝服务&#xff09;技术已被作为零日漏洞积极利用&#xff0c;其规模打破了之前的所有记录。 Amazon Web Services、Cloudflare 和 Google 今天联合发布了有关零日技术的消息&#xff0c;他…

PHP 自习室空位查询系统mysql数据库web结构apache计算机软件工程网页wamp计算机毕业设计

一、源码特点 PHP 自习室空位查询系统是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 php 自习室空位查询系统1 代码 https://download.csdn.net/download/qq_41221322/…

聊聊身边的嵌入式:点菜机用着好好的,突然挂了,这口锅应该甩给谁?

周末被老婆challenge了。之所以用这个英文词汇&#xff0c;是因为实在难以找出一个恰当的中文&#xff0c;来表达这个意思。挑战&#xff1f;盘问&#xff0c;质疑&#xff1f;臭骂&#xff1f;好像都不对劲儿。对了&#xff0c;想来想去&#xff0c;只有diao这个词有点儿接近&…

小视频APP源码实战:探寻成功案例与经验分享

在这个数字化时代&#xff0c;小视频APP源码已经成为创业者们追逐成功的热门选择。本文将揭示三个成功案例&#xff0c;为您带来宝贵的经验分享。 案例一&#xff1a;「绝绝子」&#xff0c;小视频APP带来的逆袭奇迹 小视频APP「绝绝子」的成功故事令人瞩目。通过精心策划的内…

[网鼎杯 2018]Comment git泄露 / 恢复 二次注入 .DS_Store bash_history文件查看

首先我们看到账号密码有提示了 我们bp爆破一下 我首先对数字爆破 因为全字符的话太多了 爆出来了哦 所以账号密码也出来了 zhangwei zhangwei666 没有什么用啊 扫一下吧 有git git泄露 那泄露看看 真有 <?php include "mysql.php"; session_start(); if(…

TWDS车辆轮对故障、尺寸动态检测系统

随着我国铁路的建设发展&#xff0c;客运专线网络形成&#xff0c;既有铁路的货运能力得到释放&#xff0c;货物运输向重载方向发展&#xff0c;运输组织呈现长交路、运转周期短、编组固定的特点。 跟踪调查表明重载车辆车轮磨耗较普通车辆更为严重。大秦线c80型车辆在不到1个…

vue3+ts项目02-安装eslint、prettier和sass

创建项目 项目创建 安装eslint yarn add eslint -D生成配置文件 npx eslint --init安装其他插件 yarn add -D eslint-plugin-import eslint-plugin-vue eslint-plugin-node eslint-plugin-prettier eslint-config-prettier eslint-plugin-node babel/eslint-parser vue-e…

前端好文+插件分享(持续更新中...)

CSS 「滚动绽放」实现页面滚动时逐渐展示/隐藏元素https://github.com/vnyoon/web-magic &#xff08;相关CSS动画特效实现&#xff09; 钉钉官网首页的炫酷动效” 被我用css新特性轻松破解啦&#xff5e; 软件开发 1 模型驱动是什么意思&#xff1f;底层原理是什么&#xf…

Linux发布Java项目,使用screen窗口

代码写完正常的打包 登录Linux&#xff0c;使用screen -ls命令查看现有的窗口 将之前的jar备份一个cp 原jar包名 备份后jar包名&#xff0c;再将jar包复制到对应的路径下 使用screen -r -d 窗口名 命令进入到之前启动jar包的窗口&#xff0c;停掉之前的窗口&#xff0c;直接…

无法打开文件“opengl32.lib”

无法打开文件“opengl32.lib” [TOC](无法打开文件“opengl32.lib”) 前言一、找到库链接配置勾选继承input里也要勾选继承 前言 随便找个教程配置结果报错无法打开文件opengl32.lib 分析原因&#xff1a; opengl库和windows自带库一样出问题应该是VS配置有问题 提示&#xff…