[Netty源码] ByteBuf相关问题 (十)

news2025/1/21 6:26:36

文章目录

      • 1.ByteBuf介绍
      • 2.ByteBuf分类
        • 2.1 AbstractByteBuf
        • 2.2 AbstractReferenceCountedByteBuf
        • 2.3 UnpooledHeapByteBuf
        • 2.4 UnpooledDirectByteBuf
        • 2.5 PooledDirectByteBuf

1.ByteBuf介绍

字节缓冲区, jdk NIO的ByteBuffer比较复杂, netty重新设计了ByteBuf用以代替ByteBuffer

ByteBuf主要是通过readerIndex 和 writerIndex两个指针进行数据的读和写, 整个ByteBuf被这两个指针最多分成三个部分, 分别是可丢弃部分, 可读部分和可写部分

在这里插入图片描述

  • readerIndex和writerIndex初始值都是0, 随着写入writerIndex的增加, readerIndex的增加。

  • 0 ~ readerIndex之间就被视为discard的, 调用discardReadByte 方法, 可以释放这个空间, readerIndex = 0, writerIndex = writerIndex - readerIndex

  • 扩容设计: Nio 的 ByteBuffer没有扩容设计, 底层是数组, 会报BufferOverflowExeption错误, ByteBuf会自动进行动态扩展

2.ByteBuf分类

  • 从内存回收的角度来看: Pooled和Unpooled: 基于对象池的ByteBuf和普通ByteBuf
  • 从内存分配角度来看: heap和direct: 堆内存和直接内存
  • 从如何读取数据角度来看: unsafe和非unsafe: unsafe可以直接拿到byteBuf的内存地址, 不会依赖jdk底层的unsafe, unsafe通过内存地址+偏移量, 非unsafe通过数组+下标或者说jdk底层ButBufferApi获取对应的数据

在这里插入图片描述

2.1 AbstractByteBuf

AbstractByteBuf 是ByteBuf骨架的一个实现

在这里插入图片描述

成员变量

    // 用于检测对象是否泄漏
    static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
    
    // 读操做 和 写操作 的位置指针
    int readerIndex;
    private int writerIndex;
    
    // 读操作 和 写操作 的标记,可以通过reset()回到标记的地方
    private int markedReaderIndex;
    private int markedWriterIndex;
    
    // 最大容量
    private int maxCapacity;

一些简单的方法

    @Override
    public ByteBuf readerIndex(int readerIndex) {
        if (checkBounds) {
            checkIndexBounds(readerIndex, writerIndex, capacity());
        }
        this.readerIndex = readerIndex;
        return this;
    }

    @Override
    public ByteBuf writerIndex(int writerIndex) {
        if (checkBounds) {
            checkIndexBounds(readerIndex, writerIndex, capacity());
        }
        this.writerIndex = writerIndex;
        return this;
    }

    @Override
    public ByteBuf clear() {
        readerIndex = writerIndex = 0;
        return this;
    }

在这里插入图片描述

通过一个_getByte()抽象类让实现类实现这个方法, 做不同的功能

2.2 AbstractReferenceCountedByteBuf

作用主要是对ButeBuf引用进行计数,用于跟踪对象的分配及销毁。

在这里插入图片描述

  • 每调用一次 retain() 方法,引用计数器就加1
  • 由于 refCnt 初始值为1,每次申请加1,释放减1,当申请数等于释放数时,对象被回收,故 refCnt 不可能为0。如果为0,说明对象被错误、意外的引用了,抛出异常*
    public ByteBuf retain() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, 1);
            }
            if (refCnt == Integer.MAX_VALUE) {
                throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
            }
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
                break;
            }
        }
        return this;
    }
    
    public final boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    // 垃圾回收
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

2.3 UnpooledHeapByteBuf

UnpooledHeapByteBuf 就是普通的堆内存ByteBuf, 没有内存池, 没有堆外内存。

成员变量

    // 用于UnpooledHeapByteBuf内存分配
    private final ByteBufAllocator alloc;
    // 缓冲区
    private byte[] array;
    // Java NIO的ByteBuffer,用于Netty的ByteBuf到NIO的ByteBuffer转换
    private ByteBuffer tmpNioBuf;

动态扩展缓冲区

  1. newCapacity > oldCapacity,直接创建一个新数组,拷贝过去就行了
  2. newCapacity == oldCapacity,不做处理
  3. newCapacity < oldCapacity,先判断readerIndex,如果readerIndex大于等于newCapacity,说明没有数据需要复制到缓冲区,直接设置readerIndex和writerIndex的值为newCapacity即可;当readerIndex小于newCapacity时,readerIndex到writerIndex之间的数据需要复制到新的byte数组,这个时候,如果writerIndex - readerIndex > newCapacity,就会发生数组下标越界,为了防止越界,当writerIndex > newCapacity时,令writerIndex = newCapacity,然后做 byte 数组赋值操作。最后,替换掉ByteBuf中持有的 byte数组引用,并令NIO 的 ByteBuffer为 null。
    public ByteBuf capacity(int newCapacity) {
        ensureAccessible();
        // 1. 对入参做合法性校验
        if (newCapacity < 0 || newCapacity > maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int oldCapacity = array.length;
        if (newCapacity > oldCapacity) {
            // 2. byte数组copy,然后替换掉原来的byte数组
            byte[] newArray = new byte[newCapacity];
            System.arraycopy(array, 0, newArray, 0, array.length);
            setArray(newArray);
        } else if (newCapacity < oldCapacity) {
            // 如果新容量小于老容量,则不需要动态扩展,但是需要截取当前缓冲区创建一个新的子缓冲区
            byte[] newArray = new byte[newCapacity];
            int readerIndex = readerIndex();
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    // 如果writerIndex大于newCapacity,则有可能发生越界,这里直接截断
                    writerIndex(writerIndex = newCapacity);
                }
                System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
            } else {
                // 如果readerIndex大于等于新的capacity,说明没有数据需要复制到新缓冲区,直接将readerIndex和writerIndex设置为newCapacity即可
                setIndex(newCapacity, newCapacity);
            }
            setArray(newArray);
        }
        return this;
    }
    
    private void setArray(byte[] initialArray) {
        array = initialArray;
        tmpNioBuf = null;
    }

字节数组复制

  • 在AbstractByteBuf中的读写操作中, 具体的读写操作为子类实现的。

  • UnpooledHeapByteBuf 写操作中, 首先检查入参, 然后将数据copy到ByteBuf的byte数组中。

  • UnpooledHeapByteBuf 读操作中, 首先检查入参, 然后将ByteBuf的byte数组copy到指定的byte数组中。

   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        // 根据AbstractByteBuf的写操作可知,index为writerIndex
        checkSrcIndex(index, length, srcIndex, src.length);
        System.arraycopy(src, srcIndex, array, index, length);
        return this;
    }
    
    protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
        checkIndex(index, length);
        if (srcIndex < 0 || srcIndex > srcCapacity - length) {
            throw new IndexOutOfBoundsException(String.format(
                    "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
        }
    }
    
    protected final void checkIndex(int index, int fieldLength) {
        ensureAccessible();
        if (fieldLength < 0) {
            throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
        }
        // writerIndex + length > capacity,数组下表越界
        if (index < 0 || index > capacity() - fieldLength) {
            throw new IndexOutOfBoundsException(String.format(
                    "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
        }
    }
    // 读操作时,将字节数组copy出去
  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
        checkDstIndex(index, length, dstIndex, dst.length);
        System.arraycopy(array, index, dst, dstIndex, length);
        return this;
    }

Netty 的 ByteBuf 转换为 NIO 的 ByteNuffer

利用byte数组创建一个新的ByteBuffer, 并调用slice方法, 清除 discard 区域。

    public ByteBuffer nioBuffer(int index, int length) {
        ensureAccessible();
        // slice():copy一个原来的position到limit之间的有效数据,创建一个新的ByteBuffer
        return ByteBuffer.wrap(array, index, length).slice();
    }
    
    public static ByteBuffer wrap(byte[] array, int offset, int length) {
        try {
            return new HeapByteBuffer(array, offset, length);
        } catch (IllegalArgumentException x) {
            throw new IndexOutOfBoundsException();
        }
    }

2.4 UnpooledDirectByteBuf

UnpooledDIrectByteBuf是基于堆外内存创建的。

这里使用 ByteBuffer 存储数据, 而UnpooledHeapByteBuf 使用字节数组, 这里的ByteBuffer使用的是NIO的DirectByteBuffer, 需要手动释放内存。

成员变量

    // ByteBuf内存分配
    private final ByteBufAllocator alloc;
    
    // 这里跟UnpooledHeapByteBuf不同,这里使用的是NIO的ByteBuffer存储字节数组
    private ByteBuffer buffer;
    private ByteBuffer tmpNioBuf;
    private int capacity;
  //用于标记ByteBuffer是否释放了(这里使用堆外内存创建ByteBuffer,需要自己做垃圾回收)
    private boolean doNotFree;

动态扩展缓冲区

不同的是这里使用的是ByteBuffer而不是byte数组。

    public ByteBuf capacity(int newCapacity) {
        ensureAccessible();
        // 1. 校验粗人惨
        if (newCapacity < 0 || newCapacity > maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int readerIndex = readerIndex();
        int writerIndex = writerIndex();

        int oldCapacity = capacity;
        if (newCapacity > oldCapacity) {
            // 这里直接创建一个新的ByteBuffer,将老的ByteBuffer数据copy过去
            ByteBuffer oldBuffer = buffer;
        // 创建一个DirectByteBuffer
            ByteBuffer newBuffer = allocateDirect(newCapacity);
        // 设置position和limit的值
            oldBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.put(oldBuffer);
            newBuffer.clear();
            // 替换老的ByteBuffer并释放掉老的ByteBuffer
            setByteBuffer(newBuffer);
        } else if (newCapacity < oldCapacity) {
            // 这里跟UnpooledHeapByteBuf处理是一样的,详细看UnpooledHeapByteBuf
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                oldBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.put(oldBuffer);
                newBuffer.clear();
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setByteBuffer(newBuffer);
        }
        return this;
    }
    
    // 创建DirectByteBuffer
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
    
    private void setByteBuffer(ByteBuffer buffer) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                // 释放oldByteBuffer
                freeDirect(oldBuffer);
            }
        }

        this.buffer = buffer;
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }

字节数组复制

  • 在AbstractByteBuf中的读写操作中, 具体的读写操作为子类实现的。
  • UnpooledDirectByteBuf 写操作, 先验证入参, 再创建一个临时的ByteBuffer, 这个ByteBuffer与buffer的content共用, 向tmpBuf中写数据相当于向buffer写数据。
  • UnpooledDirectByteBuf 读操作, 先验证入参, 然后创建出一个临时的 ByteBuffer, 这个临时的 ByteBuffer 做读操作。
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        // 参数校验
        checkSrcIndex(index, length, srcIndex, src.length);
        // 创建一个临时的tmpBuf
        ByteBuffer tmpBuf = internalNioBuffer();
        tmpBuf.clear().position(index).limit(index + length);
        tmpBuf.put(src, srcIndex, length);
        return this;
    }
    
    private ByteBuffer internalNioBuffer() {
        ByteBuffer tmpNioBuf = this.tmpNioBuf;
        if (tmpNioBuf == null) {
            // 令tempNioBuf和buffer共用同一个ByteBuffer内容,修改了tmpNioByteBuf,也等同于修改了buffer
            // 但是它们的position、limit都是独立的
            this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
        }
        return tmpNioBuf;
    }

    public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
        checkReadableBytes(length);
        getBytes(readerIndex, dst, dstIndex, length, true);
        readerIndex += length;
        return this;
    }
    
    private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) {
        checkDstIndex(index, length, dstIndex, dst.length);

        if (dstIndex < 0 || dstIndex > dst.length - length) {
            throw new IndexOutOfBoundsException(String.format(
                    "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length));
        }

        ByteBuffer tmpBuf;
        if (internal) {
            tmpBuf = internalNioBuffer();
        } else {
            tmpBuf = buffer.duplicate();
        }
        tmpBuf.clear().position(index).limit(index + length);
        tmpBuf.get(dst, dstIndex, length);
    }

Netty 的 ByteBuf 转换为 NIO 的 ByteNuffer

这里直接拿buufer的content创建一个新的ByteBuffer。

    public ByteBuffer nioBuffer(int index, int length) {
        return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice();
    }

    public ByteBuffer duplicate() {
        return new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0);
    }

2.5 PooledDirectByteBuf

PooledDirectByteBuf基于内存池实现, 与UnpooledDirectByteBuf唯一的不同就是缓冲区的分配和销毁策略

创建字节缓冲区

由于采用内存池实现, 不可new创建实例, 而是从内存池中获取, 然后设置引用计数器的值。

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }

复制新的字节缓冲区

复制新的字节缓冲区时候, 也需要通过内存池创建一个字节缓冲区, 然后复制。

    public ByteBuf copy(int index, int length) {
        // 参数校验
        checkIndex(index, length);
        // 从内存池中创建一个ByteBuf
        ByteBuf copy = alloc().directBuffer(length, maxCapacity());
        // 复制操作
        copy.writeBytes(this, index, length);
        return copy;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/434493.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

直方图实例详解(颜色直方图、灰度直方图)

直方图实例详解&#xff08;颜色直方图、灰度直方图&#xff09; 本篇目录&#xff1a; &#x1f984; 一、前言 &#x1f984; 二、直方图的概念 &#x1f984; 三、颜色直方图 &#xff08;1&#xff09;、颜色直方图定义 &#xff08;2&#xff09;、颜色直方图使用方法…

线程池中的拒绝策略

线程池中的拒绝策略 什么情况下出发拒绝策略? 当提交任务数大于corePoolSize的时候,会将多余任务缓存在workQueue阻塞队列中当阻塞队列满了,会扩充线程数当扩充线程数大于maximumPoolSize的时候,就会触发拒绝策略 也就是说,当任务数大于workQueue.size() 和maximumPoolSize…

同态随机基加密的量子多方密码-数学公式

众所周知&#xff0c;信息和信息处理的完全量子理论提供了诸多好处&#xff0c;其中包括一种基于基础物理的安全密码学&#xff0c;以及一种实现量子计算机的合理希望&#xff0c;这种计算机可以加速某些数学问题的解决。这些好处来自于独特的量子特性&#xff0c;如叠加、纠缠…

《PyTorch 深度学习实践》第9讲 多分类问题(Kaggle作业:otto分类)

文章目录 1 一些细碎代码1.1 Cross Entropy1.2 Mini-batch: batch_size3 2 示例3 作业任务描述查看数据进行建模提交Kaggle总结 该专栏内容为对该视频的学习记录&#xff1a;【《PyTorch深度学习实践》完结合集】 专栏的全部代码、数据集和课件全放在个人GitHub了&#xff0c;…

分享:作业帮在多云环境下的高可用双活架构优化实践

欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 本文来自OceanBase社区分享&#xff0c;仅限交流探讨。作者介绍&#xff1a;刘强&#xff0c;就职于作业帮基础架构 DBA 团队&#xff0c;负责分布式数据库的探索和使用&#xff0c;协同研发团队在…

node 链接MySql数据库并 进行增删改查

在Navicat中创建数据库创建表 那么就开始吧&#xff01; 一、链接数据库 mysql - npmA node.js driver for mysql. It is written in JavaScript, does not require compiling, and is 100% MIT licensed.. Latest version: 2.18.1, last published: 3 years ago. Start usin…

15.使用组件

目录 1 独立组件 2 私有子组件 3 全局组件 1 独立组件 我当前App.vue的内容是这样的 LEFT.vue的内容是这样的 RIGHT.vue的内容是这样的 那么这个时候我们认为 left.vue&#xff0c;right.vue与App.vue 是彼此独立的三个组件 2 私有子组件 我现在想把LEFT.vue与RIGHT.…

操作系统原理 —— 操作系统运行机制与体系结构(三)

什么是操作系统的指令&#xff1f; 指令就是处理器(CPU)能识别、执行的最基本命令。 比如我们平时写的 Java 代码、C 语言代码&#xff0c;CPU 它能直接识别并且运行吗&#xff1f; 当然是不行的。 Java、C 语言这些都属于高级语言&#xff0c;它们还需要经过一系列的编译最…

再也不怕面试官问:详解Synchronized和Lock的实现原理及使用场景

1、Synchronized与Lock对比 实现方式&#xff1a;Synchronized是Java语言内置的关键字&#xff0c;而Lock是一个Java接口。锁的获取和释放&#xff1a;Synchronized是隐式获取和释放锁&#xff0c;由Java虚拟机自动完成&#xff1b;而Lock需要显式地调用lock()方法获取锁&#…

Moonbeam 操作指南|使用Docker和Systemd在Moonbeam上运行节点

运行全节点允许您存储链的本地副本、验证新的区块、获取对RPC端点的本地访问权限以及配置为创作区块的收集人等。 &#x1f4c4; 查看开发者文档 重点操作如下&#xff1a; 虽然运行&#xff08;和升级&#xff09;您自己的节点需要付出时间和精力&#xff0c;但同时您也会获…

动力节点Vue笔记——第四章Vue与Ajax

四、Vue与AJAX 4.1 回顾发送AJAX异步请求的方式 发送AJAX异步请求的常见方式包括&#xff1a; 原生方式&#xff0c;使用浏览器内置的JS对象XMLHttpRequest const xhr new XMLHttpRequest()xhr.onreadystatechange function(){}xhr.open()xhr.send() 原生方式&#xff0…

【算法题】2583. 二叉树中的第 K 大层和

题目&#xff1a; 给你一棵二叉树的根节点 root 和一个正整数 k 。 树中的 层和 是指 同一层 上节点值的总和。 返回树中第 k 大的层和&#xff08;不一定不同&#xff09;。如果树少于 k 层&#xff0c;则返回 -1 。 注意&#xff0c;如果两个节点与根节点的距离相同&…

Android kotlin 用RecyclerView(androidx+BRVAH3.0.6)实现从底部弹出列表对话框(单选/多选)功能

文章目录 一、实现效果二、引入依赖三、实现源码1、实体类2、适配器单选/多选3、框架弹窗AnyLayer单选/多选3、实现视图一、实现效果 二、引入依赖 在app的build.gradle在添加以下代码 1、框架弹窗AnyLayer(github官网):implementation "com.github.goweii:AnyLayer:4.1…

linux(CentOS 6.5) 安装 Oracle 11g步骤

第一步&#xff1a;准备 1、服务器环境&#xff1a; 系统:CentOS 6.5 (Final) x86_64(Py3.7.8) 2、Navcat&#xff1a; 版本16.0.14 - Premium&#xff08;这里数据库管理工具&#xff0c;不限&#xff0c;可以用其他的&#xff09; 3、FinalShell&#xff1a; 版本3.9.2.2&a…

FE_CSS 页面布局之圆角边框 盒子阴影 文字阴影

1 圆角边框 在 CSS3 中&#xff0c;新增了圆角边框样式&#xff0c;这样我们的盒子就可以变圆角了。border-radius 属性用于设置元素的外边框圆角。 border-radius:length;参数值可以为数值或百分比的形式如果是正方形&#xff0c;想要设置为一个圆&#xff0c;把数值修改为高…

2023-04-15 算法面试中常见的链表问题

2023-04-15 算法面试中常见的链表问题 本章的两个基础类如下 链表的节点类。toString()在debug时实时查看链表很有用 /************************************************************ Description : 链表的节点* author : 梁山广(Liang Shan Guang)* date : 2020…

使用Oracle数据库的java程序员注意:不要再使用generated always as identity了!

Identity Columns是在Oracle版本≥12c中的新特性&#xff1a;自增字段 在自增字段后使用以下2种语句的1种即可完成自增&#xff1a; generated by default as identitygenerated always as identity 在userinfo表的基础上&#xff0c;我们来看下区别&#xff1a; 1、使用ge…

VMware vSphere 8.0 Update 1 正式版发布 - 企业级工作负载平台

ESXi 8.0 U1 & vCenter Server 8.0 U1 请访问原文链接&#xff1a;https://sysin.org/blog/vmware-vsphere-8-u1/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org 2023-04-18&#xff0c;VMware vSphere 8.0 Update 1 正式…

V2G模式下含分布式能源网优化运行研究(Matlab代码实现)

&#x1f4a5; &#x1f4a5; &#x1f49e; &#x1f49e; 欢迎来到本博客 ❤️ ❤️ &#x1f4a5; &#x1f4a5; &#x1f3c6; 博主优势&#xff1a; &#x1f31e; &#x1f31e; &#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 …

LAMP架构的配置

一.LAMP概述 1、LAMP的概念 LAMP架构是目前成熟的企业网站应用模式之一&#xff0c;指的是协同工作的一整套系统和相关软件&#xff0c;能够提供动态web站点服务及其应用开发环境 LAMP是一个缩写词&#xff0c;具体包括Linux操作系统、Apache网站服务器、MySQL数据库服务器、…