RecvByteBufAllocator内存分配计算

news2025/1/11 5:44:31

  虽然了解了整个内存池管理的细节,包括它的内存分配的具体逻辑,但是每次从NioSocketChannel中读取数据时,应该分配多少内存去读呢? 例如,客户端发送的数据为1KB , 应该分配多少内存去读呢? 例如: 客户端发送的数据为1KB , 若每次都分配8KB的内存去读取数据,则会导致内存大量浪费,若分配16B的内存去读取数据,那么需要64次才能全部读完, 对性能的有很大的影响 , 那么对于 这个问题,Netty是如何解决的呢?

  NioEventLoop线程在处理OP_READ事件,进入NioByteUnsafe循环读取数据时,使用了两个类来处理内存的分配,一个是ByteBufAllocator, PooledByteBufAllocator为它的默认实现类, 另一个是RecvByteBufAllocator,AdaptiveRecvByteBufAllocator是它的默认实现类,在DefaultChannelConfig初始化时设置 , PooledByteBufAllocator主要用来处理内存分配,并最终委托PoolArena去完成,AdaptiveRecvByteBufAllocator主要用来计算每次读循环时应该分配多少内存,NioByteUnsafe之所有需要循环读取,主要是因为分配的初始ByteBuf不一定能够容纳读取到的所有数据,NioByteUnsafe循环读取的核心代码解读如下 :

public final void read() {
        // 获取pipeline通道配置,Channel管道
        final ChannelConfig config = config();
        // socketChannel已经关闭
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        // 获取内存分配器,默认为PooledByteBufAllocator
        final ByteBufAllocator allocator = config.getAllocator();
        // 获取RecvByteBufAllocator内部的计算器Handle
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        // 清空上一次读取的字节数,每次读取时均重新计算
        // 字节buf分配器, 并计算字节buf分配器Handler
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            //当对端发送一个超大的数据包时,TCP会拆包。
            //        OP_READ事件只会触发一次,Netty需要循环读,默认最多读16次,因此ChannelRead()可能会触发多次,拿到的是半包数据。
            //        如果16次没把数据读完,没有关系,下次select()还会继续处理。
            //        对于Selector的可读事件,如果你没有读完数据,它会一直返回。
            do {
                // 分配内存 ,allocator根据计算器Handle计算此次需要分配多少内存并从内存池中分配
                //  分配一个ByteBuf,大小能容纳可读数据,又不过于浪费空间。
                byteBuf = allocHandle.allocate(allocator);
                // 读取通道接收缓冲区的数据 , 设置最后一次分配内存大小加上每次读取的字节数
                // doReadBytes(byteBuf):ByteBuf内部有ByteBuffer,底层还是调用了SocketChannel.read(ByteBuffer)
                // allocHandle.lastBytesRead()根据读取到的实际字节数,自适应调整下次分配的缓冲区大小。
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    // 若没有数据可读,则释放内存
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        // 当读到-1时, 表示Channel 通道已经关闭
                        // 没有必要再继续
                        readPending = false;
                    }
                    break;
                }
                // 更新读取消息计数器, 递增已经读取的消息数量
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 通知通道处理读取数据,触发Channel管道的fireChannelRead事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
            // 读取操作完毕 ,读结束后调用,记录此次实际读取到的数据大小,并预测下一次内存分配大小
            allocHandle.readComplete();
            // 触发Channel管道的fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (close) {
                // 如果Socket通道关闭,则关闭读操作
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            // 处理读取异常
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                // 若操作完毕,且没有配置自动读
                // 则从选择Key兴趣集中移除读操作事件
                removeReadOp();
            }
        }
    }
}

  每一次创建byteBuf分配内存大小是多大呢? 这个由allocate()方法内部的guess()方法来决定 。

public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

  如果是第一次 调用guess()方法,默认分配1024B的内存空间 ,后面分配内存大小动态调节 。

// 实现doReadBytes()方法,从SocketChannel中读取数据。
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    // 获取计算内存分配器Handle
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    // 设置尝试读取字节数组的buf的可写字节数
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    // 从Channel中读取字节并写入到buf中,返回读取的字节数
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

  在这里,我们需要明白byteBuf.writableBytes()这个方法,writableBytes()方法的返回值为byteBuf中可写的字节数,内部计算方法用byteBuf的容量- byteBuf的写索引得出,而byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());这一行代码,实际上就是将Channel中的数据写入到byteBuf中,返回值为实际写入到ByteBuf中的字节数。
  RecvByteBufAllocator的默认实现类AdaptiveRecvByteBufAllocator是实际的缓冲管理区,这个类可以根据读取到的数据预测所需要的字节的多少,从而自动增加或减少,如果上一次读循环将缓冲区的写满了,那么预测的字节数会变大,如果连续两次循环都不能填满已经分配的缓冲区,则预测字节数会变小。

public void lastBytesRead(int bytes) {
    // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
    // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
    // the selector to check for more data. Going back to the selector can add significant latency for large
    // data transfers.
    //  如果上 一次读循环将缓冲区填充满了,那么预测的字节数会变大
    if (bytes == attemptedBytesRead()) {
    	// 如果此次读取将缓冲区填充满了,增加一次记录的机会
        record(bytes);
    }
    super.lastBytesRead(bytes);
}

// 该方法的参数是一次读取操作中实际读取到的数据大小,将其与nextReceiveBufferSize 进行比较,如果实际字节数actualReadBytes大于等于该值,则立即更新nextReceiveBufferSize ,
// 其更新后的值与INDEX_INCREMENT有关。INDEX_INCREMENT为默认常量,值为4。也就是说在扩容时会一次性增大多一些,以保证下次有足够空间可以接收数据。而相对扩容的策略,
// 缩容策略则实际保守些,常量为INDEX_INCREMENT,值为1,同样也是进行对比, 但不同的是,若实际字节小于所用nextReceiveBufferSize,并不会立马进行大小调整,
// 而是先把 decreaseNow 设置为true,如果下次仍然小于,则才会减少nextReceiveBufferSize的大小
private void record(int actualReadBytes) {
	// 如果小了两个数量级,则需要缩容
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {                      // 若减少标识decreaseNow连续两次为true, 则说明下次读取字节数需要减少SIZE_TABLE下标减1
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;                     // 第一次减少,只做记录
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {                // 实际读取的字节大小要大于或等于预测值
        index = min(index + INDEX_INCREMENT, maxIndex);             // SIZE_TABLE 下标 + 4
        nextReceiveBufferSize = SIZE_TABLE[index];      // 若当前缓存为512,则变成 512 * 2 ^ 4
        decreaseNow = false;
    }
}

public void lastBytesRead(int bytes) {
	// 设置最后读取的字节数
    lastBytesRead = bytes;
    if (bytes > 0) {
    	// 总读取的字节数
        totalBytesRead += bytes;
    }
}

  上述过程中,SIZE_TABLE是什么呢? 请看AdaptiveRecvByteBufAllocator源码实现。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;                      // 接收缓冲区的最小长度下限
    static final int DEFAULT_INITIAL = 1024;                    // 接收缓冲区的最大长度上限
    static final int DEFAULT_MAXIMUM = 65536;                   // 接收缓冲区最大长度上限


    // 在调整缓冲区大小时,若是增加缓冲区容量,那么增加的索引值。
    // 比如,当前缓冲区的大小为SIZE_TABLE[20],若预测下次需要创建的缓冲区需要增加容量大小,
    // 则新缓冲区的大小为SIZE_TABLE[20 + INDEX_INCREMENT],即SIZE_TABLE[24]
    private static final int INDEX_INCREMENT = 4;               // 扩容增长量
    // 在调整缓冲区大小时,若是减少缓冲区容量,那么减少的索引值。
    // 比如,当前缓冲区的大小为SIZE_TABLE[20],若预测下次需要创建的缓冲区需要减小容量大小,
    // 则新缓冲区的大小为SIZE_TABLE[20 - INDEX_DECREMENT],即SIZE_TABLE[19]
    private static final int INDEX_DECREMENT = 1;               // 扩容减少量

    private static final int[] SIZE_TABLE;

    // 分配了一个int类型的数组,并进行了数组的初始化处理, 从实现来看,该数组的长度是53,前32位是16的倍数,value值是从16开始的,到512,从33位开始,值是前一位的
    // 两倍,即从1024,2048 , 到最大值 1073741824 。
    static {

        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
        System.out.println("================");
    }

    /**
     * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
     */
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

    // 入参是一个大小,然后利用二分查找法对该数组进行size定位 ,目标是为了找出该size值在数组中的下标位置 , 主要是为了初始化maxIndex, maxIndex这两个参数
    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1; ; ) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    /**
     * Creates a new predictor with the default parameters.  With the default
     * parameters, the expected buffer size starts from {@code 1024}, does not
     * go down below {@code 64}, and does not go up above {@code 65536}.
     */
    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    /**
     * Creates a new predictor with the specified parameters.
     * @param minimum the inclusive lower bound of the expected buffer size
     * @param initial the initial buffer size when no feed back was received
     * @param maximum the inclusive upper bound of the expected buffer size
     */
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }
    
}

  SIZE_TABLE由上述加粗代码进行初始化 。 AdaptiveRecvByteBufAllocator内部维护了一个SIZE_TABLE数组,记录了不同的内存的内存块大小,按照分配需要寻找最合适的内存块,SIZE_TABLE数组中的值为2^n,这样便于软硬件进行处理,SIZE_TABLE数组的初始化与PoolArena中的normalizeCapacity的初始化类似,当需要的内存很小时 , 增长的幅度不大, 当需要的内存较大时, 增长的幅度比较大,因此在[16,512]区间每次增加16,直到512,而从512起,每次翻一倍, 直到int的最大值 。 那size的具体大小值是什么呢?
SIZE_TABLE 数组的toString()打印如下 :
[16B, 32B, 48B, 64B, 80B, 96B, 112B, 128B, 144B, 160B, 176B, 192B, 208B, 224B, 240B, 256B, 272B, 288B, 304B, 320B, 336B, 352B, 368B, 384B, 400B, 416B, 432B, 448B, 464B, 480B, 496B, 512B, 1k, 2k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1M, 2M, 4M, 8M, 16M, 32M, 64M, 128M, 256M, 512M, 1G]

  当对内部计算器Handle的具体实现类HandleImpl进行初始化时,可根据AdaptiveRecvByteBufAllocator的getSizeTableIndex()二分查找方法获取SIZE_TABLE的下标index并保存,通过SIZE_TABLE[index]获取下次需要分配的缓冲区大小nextReceiveBufferSize并记录,缓冲区的最小容量属性对SIZE_TABLE中的下标为minIndex的值 , 最大容量属性对应的SIZE_TABLE中的下标为maxIndex的值及bool类型标识属性decreaseNow ,这三个属性用于判断下一次创建缓冲区是否需要减少 。
  NioByteUnsafe每次循环完成后会根据实际读取到的字节数和当前缓冲区的大小重新设置下次需要分配的缓冲区的大小。 具体代码如下 。

// 循环读取完后被调用
public void readComplete() {
    record(totalBytesRead());
}

//返回已经读取的字节个数,若‘totalBytesRead < 0’则说明已经读取的字节数已经操作了’Integer.MAX_VALUE’,则返回Integer.MAX_VALUE;否则返回真实的已经读取的字节数。
protected final int totalBytesRead() {
    return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}

  可以模拟NioByteUnsafe的read()方法,在每次循环开始时, 一定要先重置totalMessages与totalByteRead(清零),读取完成后, readComplete会计算并调整下次预计需要分配的缓冲区的大小, 具体代码如下

public static void main(String[] args) throws Exception {
    AdaptiveRecvByteBufAllocator allocator = new AdaptiveRecvByteBufAllocator();
    RecvByteBufAllocator.Handle handle = allocator.newHandle();
    System.out.println("==============开始 I/O 读事件模拟==============");
    // 读取循环开始前先重置,将读取的次数和字节数设置为0, 将totalMessages与totalBytesRead设置为0
    handle.reset(null);
    System.out.println(String.format("第一次模拟读,需要分配大小 :%d", handle.guess()));
    handle.lastBytesRead(256);
    // 调整下次预测值
    handle.readComplete();
    // 在每次读取数据时都需要重置totalMessage 与totalBytesRead
    handle.reset(null);
    System.out.println(String.format("第2次花枝招展读,需要分配大小:%d ", handle.guess()));
    handle.lastBytesRead(256);
    handle.readComplete();

    System.out.println("===============连续2次读取的字节数小于默认分配的字节数= =========================");
    handle.reset(null);
    System.out.println(String.format("第3次模拟读,需要分配大小 : %d", handle.guess()));
    handle.lastBytesRead(512);
    // 调整下次预测值,预测值应该增加到512 * 2 ^ 4
    handle.readComplete();

    System.out.println("==================读取的字节数变大 ===============");
    handle.reset(null);
    // 读循环中缓冲区的大小
    System.out.println(String.format("第4次模拟读,需要分配的大小为:%d ", handle.guess()));
}

  结果输出
在这里插入图片描述

  当然啦,如果觉得自己已经很明白了,可以看看下面这个例子。

public class Test2 {


    public static void main(String[] args) {
        AdaptiveRecvByteBufAllocator allocator = new AdaptiveRecvByteBufAllocator();
        RecvByteBufAllocator.Handle handle = allocator.newHandle();
        System.out.println("==============开始 I/O 读事件模拟==============");
        // 读取循环开始前先重置,将读取的次数和字节数设置为0, 将totalMessages与totalBytesRead设置为0
        handle.reset(null);
        System.out.println(String.format("第一次模拟读,需要分配大小 :%d", handle.guess()));
        handle.lastBytesRead(512);
        // 调整下次预测值
        handle.readComplete();
        // 在每次读取数据时都需要重置totalMessage 与totalBytesRead
        handle.reset(null);
        System.out.println(String.format("第2次花枝招展读,需要分配大小:%d ", handle.guess()));
        handle.lastBytesRead(512);
        handle.readComplete();

        System.out.println("===============连续2次读取的字节数小于默认分配的字节数= =========================");
        handle.reset(null);
        System.out.println(String.format("第3次模拟读,需要分配大小 : %d", handle.guess()));

    }
}

在这里插入图片描述

  最后一次结果输出为1024,并没有缩容,源码读到这里,我相信对输出结果已经没有什么意外了。

接下来看一个例子。

Netty服务端代码
public class NettyServer {

    public static void main(String[] args) {
        // 创建两个线程组bossGroup 和workerGroup , 含有的子线程NioEventLoop 的个数默认为CPU 核数的两倍
        // BossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建服务端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)              // 使用NioServerSocketChannel 作为服务器的通道实现
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间,只能处理一个客户端连接,多个客户端同时来的时候
                    // 服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2056));
                            // 对workerGroup 的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start ....");
            // 绑定一个商品并且同步,生成一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();

            // 给注册监听器,监听我们关心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });
            // 对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

// 自定义Handler需要继承netty 规定好的某个HandlerAdapter(规范)
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象,含有通道channel ,管道 pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取的线程 :" + Thread.currentThread().getName());
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息是: " + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("=================channelReadComplete======================");
        ByteBuf buf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    // 处理异常,一般需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
Netty客户端代码
public class NettyClient {

    public static void main(String[] args) {
        // 客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建客户端启动对象
            // 注意,客户端使用的不是ServerBootstrap , 而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 设置相关的参数
            bootstrap.group(group)                                  //设置线程组
                    .channel(NioSocketChannel.class)                  // 使用NioSocketChannel作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start ");
            // 启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9000).sync();
            // 对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e ){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}


public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    // 当客户端连接服务器完成就会触发这个方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        StringBuffer sb = new StringBuffer();
        for(int i = 0 ;i < 1023;i ++){
            sb.append("a");
        }
        sb.append("中");
        sb.append("bbbb");
        ByteBuf buf = Unpooled.copiedBuffer(sb.toString(), CharsetUtil.UTF_8);

        ctx.writeAndFlush(buf);
    }
    
    // 当通道在读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(" 收到服务端的消息: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  在NettyClientHandler的channelActive()方法中,先for循环写了1023个字节, 然后写了一个"中" 字,utf-8编码一个中文占3个字节,再写了4个"bbbb",因此最终写入到ByteBuf中是1030个字节 。
在这里插入图片描述
  我们之前也分析过,第一次读取时,ByteBuf默认容量为1024,因此在NioSocketChannel的read()方法中,while()循环中会循环两遍。 如下图所示 。
在这里插入图片描述
  而刚好在ByteBuf的[1024,1025,1026]这三个字节中被"中"的中字占用,因此ByteBuf取0~1023个字节时,“中”字被截断了 。
  最终在服务端代码中打印了两次ByteBuf字符串信息,发现打印的信息中文乱码。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
  这个问题怎样解决呢?

解决方案一

  如果说 Netty 默认提供了一个可变的缓冲区大小分配方案,那么我们可不可以改变这个策略呢?从AdaptiveRecvByteBufAllocator开始向上找到根类型,可以最终找到 RecvByteBufAllocator 接口上,查看这个接口的子类,应该会有其他缓冲区大小分配方案。

在这里插入图片描述
  这里有一个固定的接收数组空间分配器,现在只要想办法把默认的 AdaptiveRecvByteBufAllocator换成 FixedRecvByteBufAllocator 就可以解决问题了。

在这里插入图片描述

在这里插入图片描述
  首先调用 config方法,然后调用getRecvByteBufAllocator来创建这个allocHandle。既然有getRecvByteBufAllocator()方法,那肯定有setRecvByteBufAllocator()方法。
在这里插入图片描述
  因此只需要调用config()的setRecvByteBufAllocator()方法即可。
在这里插入图片描述   ByteBuf一次就打印完了,并没有出现中文乱码。
在这里插入图片描述
在这里插入图片描述
  对于这个问题, 还有另外一种解决方案。

方案二

  客户端代码修改

在这里插入图片描述
  在内容前面添加内容的长度 。

   在initChannel()方法中添加ch.pipeline().addLast(new NettyServerHandler2()) 一行代码。
在这里插入图片描述
  的具体代码如下

public class NettyServerHandler2 extends ByteToMessageDecoder {

    private int alreadyReadLength ;
    private int sumByteLength ;
    private  ByteBuf buf ;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("NettyServerHandler2 invoke");
        if (sumByteLength == 0) {
            sumByteLength = in.readInt();
            buf = Unpooled.copiedBuffer("", CharsetUtil.UTF_8);
        }
        int readableBytes = in.readableBytes();
        alreadyReadLength += readableBytes;

        byte[] data = new byte[readableBytes];
        in.readBytes(data);

        buf.writeBytes(data);
        if (alreadyReadLength == sumByteLength) {
            sumByteLength = 0;
            byte[] outData = new byte[buf.readableBytes()];
            buf.readBytes(outData);

            out.add(new String(outData,"utf-8"));
            buf.release();
            buf = null;
        }
    }
}

  写一个Handler继承ByteToMessageDecoder,而在这个类的内部定义了三个属性,alreadyReadLength记录已经读取的字节数, sumByteLength本次客户端发送过来的总字节数, buf 临时存储客户端传递过来的字节,当alreadyReadLength和sumByteLength相等时,则表示字节已经读取完全 。 此时可以将数据写回到out中。因此NettyServerHandler2的主要作用就是合并客户端传递过来的字节,从而避免客户端数据还没有读取完就时行业务处理。

在这里插入图片描述

在这里插入图片描述
  没有出现乱码问题了,个人觉得第二种方案比第一种方案更加好,因为在第一种方案中调用了ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2056)) 方法,指定了RecvByteBufAllocator 为FixedRecvByteBufAllocator,并且初始化ByteBuf的容量为2056,如果此次用户发送的byte长度是1030,这是已知的,如果用户第一次请求的字节长度是3000,是不是又要修改FixedRecvByteBufAllocator中的bufferSize的值为3000,又要重启Netty服务器,显然不适用于生产环境 。 而第二种方案基本上适用于所有的情况 ,当然啦,第二种情况在NettyServerHandler2定义了三个局部变量alreadyReadLength,sumByteLength,buf 那会不会存在并发问题呢? 这个不得而知,因为我自己对Netty也是在不断的学习中 ,具体的情况,我会在下一篇博客去求证,但这里也给我们提供了一种解决问题的思路,希望给读者有借鉴意义 。

第三种解决方案

  当然对于之前提到问题,还有第三种解决方案,我们利用LengthFieldBasedFrameDecoder来解决 。

  1. 在NettyServer中, 在ChannelInitializer的initChannel方法中添加自定义handler NettyServerHandler2 ,这个类继承LengthFieldBasedFrameDecoder实现了decode()方法 。
    在这里插入图片描述

  在NettyServerHandler2的构造方法中传递了3个参数,这3个参数的含义为

  1. maxFrameLength : 发送的数据包最大长度, 发送数据包的最大长度,例如1024,表示一个数据包最多可发送1024个字节
  2. lengthFieldOffset: 长度字段的偏移量, 指的是长度字段位于数据包内部字节数组中的下标值
  3. engthFieldLength: 长度字段自己占用的字节数,如果长度字段是一个int整数,则为4,如果长度字段是一个short整数,则为2
  4. lengthAdjustment: 长度字段的偏移量矫正, 这个参数最为难懂,在传输协议比较复杂的情况下,例如包含了长度字段,协议版本号, 魔数等

  那么解码时,就需要进行长度字段的矫正,长度矫正值的计算公式为:内容字段偏移量 - 长度字段偏移量 - 长度字段的字节数

  1. 写一个NettyServerHandler2继承LengthFieldBasedFrameDecoder类
public class NettyServerHandler2 extends LengthFieldBasedFrameDecoder {
    public NettyServerHandler2(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        in = (ByteBuf) super.decode(ctx,in);

        if(in == null){
            return null;
        }
        if(in.readableBytes()<4){
            throw new Exception("字节数不足");
        }

        //读取length字段
        int length = in.readInt();

        if(in.readableBytes()!=length){
            throw new Exception("标记的长度不符合实际长度");
        }
        //content内容
        byte []bytes = new byte[length];
        in.readBytes(bytes);
        return new String(bytes,"UTF-8");
    }
}

  

  到这里这篇博客就告一段落,下一篇博客见。

本文对应github地址为
https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/410878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

梳理ERP与CRM、MRP、PLM、APS、MES、WMS、SRM的关系

数字化转型中少不了ERP系统的存在&#xff0c;CRM、MRP、PLM、APS、MES、WMS、SRM这些系统都需要一起上吗&#xff1f; 如下图所示&#xff0c;是某企业IT系统集成架构流图。 先了解一下ERP是做什么的&#xff0c;ERP就是企业资源管理系统&#xff0c;从企业的价值链分析&…

在CSDN创作了6个月,我收获了什么?文末送书~

作者主页&#xff1a;阿玥的小东东主页&#xff01; 正在学习&#xff1a;python和C/C 期待大家的关注哦 目录 一次很好的机会&#xff0c;让我开始了CSDN之旅 首先来看看我的几位领路人 创作动力 1W粉丝 在CSDN我收获了什么&#xff1f; 很高的展现量 认证创作者身份 社…

构建自动过程:FinalBuilder 8.0 Crack

使用 FinalBuilder 自动化您的构建过程很简单。使用 FinalBuilder&#xff0c;您无需编辑 xml 或编写脚本。可视化定义和调试您的构建脚本&#xff0c;然后使用 Windows 调度程序安排它们&#xff0c;或将它们与 Continua CI、Jenkins 或任何其他 CI 服务器集成。 成千上万的软…

手把手调参 YOLOv8 模型之 训练|验证|推理配置-详解

YOLO系列模型在目标检测领域有着十分重要的地位&#xff0c;随着版本不停的迭代&#xff0c;模型的性能在不断地提升&#xff0c;源码提供的功能也越来越多&#xff0c;那么如何使用源码就显得十分的重要&#xff0c;接下来通过文章带大家手把手去了解Yolov8&#xff08;最新版…

Android开发—Jetpack四件套

2017年&#xff0c;Google发布了Android Architecture Components&#xff0c;包括Room、LiveData、ViewModel和Paging等组件&#xff0c;旨在帮助开发者更轻松地实现MVVM架构。 2018年&#xff0c;Google在I/O大会上推出的一套Android开发组件库&#xff0c;旨在帮助开发者更…

Python 小型项目大全 56~60

五十六、质数 原文&#xff1a;http://inventwithpython.com/bigbookpython/project56.html 质数是只能被 1 和它自己整除的数。质数有各种各样的实际应用&#xff0c;但是没有算法可以预测它们&#xff1b;我们必须一次计算一个。然而&#xff0c;我们知道有无限多的质数有待发…

技术招聘漫谈 | Java工程师招聘难?你可能需要这份独家指南

两周前&#xff0c;我们发布了一篇关于怎样招聘前端工程师的文章&#xff08;点击此处顾&#xff09;。在文章中&#xff0c;我们分析了前端岗位有哪些必不可少的考察要点&#xff0c;以及如何在面试中考核对方是否能写出高质量的代码&#xff0c;这篇文章得到了大量技术面试官…

高完整性系统工程(四): An Overview of Alloy

目录 1. 概述 2. 指定软件设计 3. 验证设计规范 4. 验证预期属性 1. 概述 在第一章中&#xff0c;我们将解释如何使用 Alloy 来探索一个非常简单的软件组件的设计&#xff0c;即大多数操作系统中存在的众所周知的垃圾箱 或回收站。目的是对如何使用 Alloy 指定和分析软件设…

MyBatis注解开发---实现自定义映射关系和关联查询

目录 一、使用注解实现自定义映射关系 1. 编写注解方法 2. 编写测试方法 3. 查看运行结果 二、使用注解实现一对一关联查询 1. 编写注解方法 2. 编写测试方法 3. 查看运行结果 三、使用注解实现一对多关联查询 1. 编写注解方法 2. 编写测试方法 3. 查看运行结果 四…

List接口中的ArrayList与LinkedList

ArrayList ArrayList的继承实现关系图 ArrayList 底层就是⼀个 Object[] 数组&#xff0c;当实例化ArrayList时没有指定数组容量大小&#xff0c;、第⼀次添加元素&#xff08;调⽤ add() ⽅法&#xff09;时会初始化为⼀个⻓度为 10 的数组&#xff08;即默认初始化容量为 1…

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

文章目录1.ByteBufAllocator 内存管理器2.UnpooledByteBufAllocator2.1 heap内存的分配2.2 direct内存的分配3.PooledByteBufAllocator3.1 heap内存和direct内存的分配3.2 directArena分配direct内存的流程3.3 内存规格的介绍4.缓存的相关问题4.1 缓存的数据结果4.2 命中缓存的…

一维差分思想【算法推导、深刻思考】

797. 差分 - AcWing题库 差分本质上就是前缀和的逆运算 算法推导 其实在最开始自己去完成这个题目的时候&#xff0c;感觉好像是可以往前缀和方向靠的&#xff0c;但是一下子没有想到实现方法就无疾而终了。所以最后选择的算法就只是单纯的暴力&#xff08;虽然知道过不了&…

【操作系统复习】第5章 存储器管理

存储器的层次结构 存储层次 ➢ CPU寄存器 ➢ 主存&#xff1a;高速缓存、主存储器、磁盘缓存 ➢ 辅存&#xff1a;固定磁盘、可移动介质 层次越高&#xff0c;访问速度越快&#xff0c;价格也越高&#xff0c;存储容量也最小 寄存器和主存掉电后存储的信息不再存在&a…

2024软件工程考研之《软件工程导论》专业课复习

一、考察《软件工程导论》的学校 截止目前&#xff0c;考察《软件工程导论》的学校主要有&#xff1a; 大连理工大学887 北京航天航空大学991 北京交通大学901 河海大学846 海南大学835 新疆大学841 成都信息工程大学809 长安大学846 天津工业大学840 华东交通大学837 大连交通…

采购招投标系统-高效管控招采流程-降低采购成本

项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以及…

软考证书找工作有用吗?软考找工作用处大吗

软考证书是衡量IT技术人才能力的一种重要评价标准。 一、软考高级证书对找工作的帮助 1. 竞争力增强 软考高级证书具有一定难度和较高的专业技能要求&#xff0c;拥有该证书的人的技术水平和专业能力会得到认可和尊重&#xff0c;从而增强求职者的竞争力。 2. 拓宽职业发展…

防火墙NAT实验,双机热备实验

目录 NAT防火墙基础实验 源地址转换 服务器映射 域内双向NAT 域间双向NAT 双机热备基础实验 主备备份 负载分担 NAT防火墙基础实验 实验拓扑&#xff1a; 1.进入防火墙图形化页面进行配置 接口列表的配置 源地址转换 企业内部网络访问外部网络&#xff0c;进行源地…

如何用nodejs构造一个网站爬虫

爬虫是个什么东西 英文spider&#xff0c;网络爬虫&#xff08;又称为网页蜘蛛&#xff0c;网络机器人&#xff0c;在FOAF社区中间&#xff0c;更经常的称为网页追逐者&#xff09;&#xff0c;是一种按照一定的规则&#xff0c;自动地抓取万维网信息的程序或者脚本。另外一些…

干货分享 | 采购没“云”和有云的区别有哪些?

多年前&#xff0c;提起“云”这个词&#xff0c;很多人还是“不知所云”。 但如今&#xff0c;大众对“云”的了解和认可程度也越来越高&#xff0c;尽情享受着“云”带来的便利。 通过“云”&#xff0c;可以随时随地畅听海量音乐、进行网购、访问云盘的照片和视频、在云端创…

【数据分析】——分析方法

上司要你帮忙看看公司最近的网站运营情况怎么样&#xff1f; 公司最近的网站运营情况&#xff1f;这个问题太宽泛了&#xff0c;你得要知道上司的明确需求 问题1:boss,你是想看看公司网站具体哪方面的问题&#xff1f; 回答&#xff1a;公司最近销售不太好&#xff0c;订单转…