序言
之前学习了传输,通过前面的学习我们都知道,网络数据的基本单位是字节。JDK中提供了ByteBuffer作为字节的容器,但是过于繁琐复杂,Netty中提供了ByteBuf作为替代品。学习一下。
API
Netty的数据处理API通过两个组件暴露 ------ abstract class byteBuf 和 interface ByteBufHolder。
优点如下:
1.它可以被用户自定义的缓冲区类型拓展。
2.通过内置的复合缓冲区类型实现了透明的零拷贝。
3.容量可以按需增长。
4.切换读写形态不需要调用flip()方法。
5.读和写使用不同的索引。
6.支持方法链式调用。
7.支持引用计数。
8.支持池化。
ByteBuf类 - Netty的数据容器
因为所有的网络通信都设计字节系列的移动,所以高效易用的数据结构是不可少的。Netty的ByteBuf实现满足并超越了这些需求。
HOW TO WORK
ByteBuf维护了两个不同的索引,一个用于读,一个用于写入。当你从ByteBuf中读取的时候。他的readerIndex将会被递增已经被读取的字节数。同样的写入的时候,writerindex也会被递增。read和wirte的索引起始位置都为位置0。
如果打算读取字节直到readerIndex达到和writerIndex同样的值会发送什么。在那是,你将会达到 可以读取的 数据的末尾。类似于数据的索引溢出。
名字以read或者write开头的ByteBuf方法,将会推进器对应的索引,而名称以set或者get开头的操作则不会。后面这些方法只是会以索引为操作来执行想做的操作。
可以指定ByteBuf最大容量。试图移动写索引超过这个值就会触发一个异常!
你完全可以理解为数组,这样更好理解,因为大家对数组比较熟悉,可以借鉴一下其思想
ByteBuf的使用模式
堆缓冲区
最常用的ByteBuf模式是将数据存储在JVM的堆空间中,这种模式被称为 支撑数组,它能在没有使用池化的情况下提供快速的分配和释放。
public static void heapBuffer() {
ByteBuf heapBuf = Unpooled.buffer(1024);; //get reference form somewhere
// check array
if (heapBuf.hasArray()) {
// get the array
byte[] array = heapBuf.array();
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes();
// doSomething
doSomething(array, offset, length);
}
}
适合有遗留的数据需要处理的请求。
直接缓冲区
直接缓冲区是另外一种ByteBuf模式。我们期望用于对象创建的内存分配永远都来于堆中,但是这并不是必须的-NIO在jdk1.4中引入的ByteBuffer类允许jvm通过本地调用来分配内存。这主要是避免每次在调用本地IO之前将缓冲区里的内容都复制到一个中间缓冲区。
ByteBuf在javadoc中曾指出:直接缓冲区的内容将会被驻留在 会被垃圾回收器回收的堆 之外。说白了,gc干不掉它。如果你的数据包含在一个堆上分配的缓冲区上中,在套接字发送它之前,JVM会将它复制到一个直接缓冲区中。
直接缓冲区的主要缺点是:相对于堆缓冲区,分配和释放比较昂贵,处理遗留代码时如果数据不在堆上,那么还得进行一次复制。
public static void directBuffer() {
ByteBuf directBuf = Unpooled.buffer(1024);; //get reference form somewhere
if (!directBuf.hasArray()){
int length = directBuf.readableBytes();
byte[] bytes = new byte[length];
directBuf.getBytes(directBuf.readerIndex(),bytes);
doSomething(bytes,0,length);
}
}
复合缓冲区
第三种也是最后一种模式使用的是复合缓冲区,它为多个ByteBuf提供一个聚合视图。在这里可以根据需要添加或者删除ByteBuf实例,JDK中的ByteBuffer完全没有这个东西。Netty是通过一个ByteBuf子类 – CompositeByteBuf 为了实现这个模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。
CompositeByteBuf实例可以同时包含直接内存分配和非直接内存分配。
以http为例:一个由两部分 —头部和主体— 组成的将通过HTTP协议传输的消息。这两部分由应用程序不同的模块产生,将会在消息将要发生的时候再行组装。该应用程序可以选择多个重复相同的消息主体。当这种情况发生的时候,对于每个消息都会创建一个新的头部。
因为我们不想为每个消息重新分配这两个缓冲区,所以使用CompositeByteBuf是一个完美的选择。它在消除了没必要复制的同时,暴露了ByteBuf的API。下面看一段代码,如何通过使用JDK的ByteBuffer来实现这个需求,创建了一个包含两个ByteBuffer数组来保存这些消息组件。同时创建了第三个ByteBuffer来保存所有这些数据的副本。
public static void byteBufComposite() {
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = Unpooled.buffer(1024); // can be backing or direct
ByteBuf bodyBuf = Unpooled.buffer(1024); // can be backing or direct
messageBuf.addComponents(headerBuf, bodyBuf);
messageBuf.removeComponent(0); // remove the header
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
}
Netty使用了CompositeByteBuf来优化套接字的IO操作,尽可能消除由JDK缓冲区实现带来的性能问题和内存的问题,这种优化细节不会暴露。存在于netty核心代码中。
字节级操作
随机访问索引
和Java字节数组一样,ByteBuf的索引是从零开始的:第一个字节的索引是0,最后一个总是capacity() - 1.我们可以像遍历数组一样遍历ByteBuf
public static void main(String[] args) {
byteBufRelativeAccess();
}
public static void byteBufRelativeAccess() {
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
messageBuf.writeBytes("ok le".getBytes());
for (int b = 0; b < messageBuf.capacity(); b++) {
byte aByte = messageBuf.getByte(b);
System.out.println(aByte);
}
}
顺序访问索引
虽然ByteBuf同时具有读索引和写索引,但是JDK的ByteBuffer却只有一个索引。所以需要flip()方法在两种模式之间进行来回切换的原因。
ByteBuf被两个索引切成三个部门,第一部分是已经读过的部分,可以被丢弃。第二部分自然是没有被读过的字节,可以读取。那么第三部分肯定就是可以写的空间了,可写字节。
可丢弃字节
已丢弃字节的分段包括了已经被读过的字节。通风调用 discardReadBytes()方法,可以丢弃他们并回收空间。这个分段的初始大小为0,存储在readerIndex中,会随着read的造作执行而增加。调用了这个方法之后,可丢弃的空间现在变成可写的了。这个操作只移动了可读的字节和写索引,而没有对所有可写入的字节进行擦除操作。所以这种操作虽然可以扩充写区域,但是内存的复制不可避免,总得移动读字节到开始位置吧,如果不是内存宝贵,我相信不会这么做的。
可读字节
ByteBuf的可读字节分段存储了实际数据。新分配的,包装的或者复制的缓冲区的默认readerIndex值为0.任何名称以read或者skip开头的操作都将检索或者跳过当前可读索引,并且增加已读索引。
如果被调用的方法需要一个ByteBuf参数作为写入的目标。并且没有指定目标索引参数,那么该缓冲区的写索引也会被增加,例如:readBytes(ByteBuf dest)
如果在缓冲区的可读字节数已经耗尽时次中读取数据,那么会引发一个IndexOutOfBoundsException。
如何从读取所有可以读取的字节:
public static void readAllData() {
ByteBuf buffer = Unpooled.compositeBuffer();
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}
}
可写字节
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的写索引默认值为0。任何名称以write开头的操作都是从当前写操作中开始写数据,并将它增加以及写入的字节数。如果写操作的目标也是ByteBuf,并且没有指定源索引的值,则源缓冲区的readerIndex也同样会被增加相同的大小。调用如下:
writeBytes(ByteBuf dest)。如果我那个目标中写入超过目标容量的数据,将会已发一个索引越界异常。
public static void write() {
// Fills the writable bytes of a buffer with random integers.
ByteBuf buffer =Unpooled.compositeBuffer(); //get reference form somewhere
while (buffer.writableBytes() >= 4) {
buffer.writeInt(1024);
}
}
索引管理
JDK的InputStream定义了mark(int readlimit)和reset()方法,这些方法分别被用来将流中的当前位置指定,以及将流重置到该位置。同样,可以通过调用markReaderIndex()、markWriterIndex()、和resetReaderIndex()来标记和重置ByteBuf的readerIndex和writeIndex。这些和InputStream中的调用别无二致,只是没有什么readlimit来指定标记何时失效。
和数组一样,我们可以通过调用readerIndex(int)或者writerIndex(int)来讲索引移动到指定位置。试图将任何一个索引设置到无效的位置将会导致索引越界异常。可以通过clear()方法将readerIndex和writerIndex都设置为0,但是并不会清理内存。clear()比discardReadBytes()清的多,他只会重置索引而非复制处理内存。
查找操作
在ByteBuf中有多种可以用来确定索引的方法。最简单的是使用IndexOf方法。较复杂的查找可以通过哪些需要一个ByteProcessor作为参数的方法达成。这个接口只定义了一个方法:
boolean process(byte value)
他将检查输入值是否是正在查找的值。
例子:
public static void byteProcessor() {
ByteBuf buffer = Unpooled.compositeBuffer(); //get reference form somewhere
buffer.writeBytes("测测测测测测测测,\r".getBytes());
int index = buffer.forEachByte(ByteProcessor.FIND_CR);
System.out.println(index);
}
查找回车符。
派生缓冲区
派生缓冲区为ByteBuf提供了专门的方式来呈现其内容的视图。这些类视图是通过以下方法被创建的:
duplicate();
slice();
slice(int,int);
Unpooled.unmodifiableBuffer(xxx)
order(ByteOrder)
readSlice(int)
这个方法都会返回一个新的实例,它具有自己的读索引,写索引和标记索引。其存储内部和JDK的ByteBuffer一样,都是共享的。这使得创建一个派生缓存区的代价低廉,但是这也意味着,如果你修改了它的内容,同时也修改了对应的实例。这个类似于 浅拷贝,从某种意义上来说,他们是差不多的从设计层面上和使用层面。但是又可以切片,反正有这个概念就可以。
public static void byteBufSlice() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf sliced = buf.slice(0, 15);
System.out.println(sliced.toString(utf8));
buf.setByte(0, (byte)'J');
System.out.println(buf.toString(utf8));
System.out.println(sliced.toString(utf8));
assert buf.getByte(0) == sliced.getByte(0);
}
你会发现改了sliced,原来的buf也会改变,不妨debuf跑一下。
再对比一下copy,我刚刚以浅拷贝为例,这个就像深拷贝。
public static void byteBufCopy() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte)'J');
System.out.println(buf.toString(utf8));
System.out.println(copy.toString(utf8));
assert buf.getByte(0) != copy.getByte(0);
}
可以自行debug看结果不同,我举深浅拷贝的例子,就可以很好理解了。
深拷贝: User id User1 id1 id改 id1不变
浅拷贝: User id User1 id1 id改 id1变
读/写操作
前面曾经提到,有两种类别的读写操作:
set / get 索引不变,从给定的索引开始
read / write 从给定的索引开始,索引变化。
API就不放出来了。直接放个书上的案例:
get 和 set
public static void byteBufSetGet() {
Charset utf8 = StandardCharsets.UTF_8;
ByteBuf byteBuf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
char char1 = (char) byteBuf.getByte(0);
System.out.println(char1);
int readerIndex = byteBuf.readerIndex();
int writerIndex = byteBuf.writerIndex();
System.out.println(readerIndex);
System.out.println(writerIndex);
byteBuf.setByte(0, (byte) 'W');
System.out.println((char) byteBuf.getByte(0));
readerIndex = byteBuf.readerIndex();
writerIndex = byteBuf.writerIndex();
System.out.println(readerIndex);
System.out.println(writerIndex);
}
read 和 write
public static void byteBufWriteRead() {
Charset utf8 = StandardCharsets.UTF_8;
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
System.out.println((char)buf.readByte());
int readerIndex = buf.readerIndex();
System.out.println(readerIndex);
int writerIndex = buf.writerIndex();
System.out.println(writerIndex);
buf.writeByte((byte)'?');
System.out.println((char)buf.readByte());
readerIndex = buf.readerIndex();
writerIndex = buf.writerIndex();
System.out.println(readerIndex);
System.out.println(writerIndex);
System.out.println(buf.toString(utf8));
}
每次read之后,索引后移,前面数据就是废物数据了。自行debuf,一看便知。还有很多方法,使用方法自行查看API
ByteBufHolder接口
在平时开发中不难发现,处理实际的数据负载以外,我们还需要存储各种属性值。HTTP响应便是一个很好的例子,有什么Cookie,状态码之类的额外的东西。
为了处理这种常用的用例,Netty提供了ByteBufHolder接口。ByteBufHolder也为Netty的高级特性提供了支持,如缓冲区池化,和所有的池一样,从池中拿,自行释放。ByteBufHolder只有几种用于访问底层数据和引用技术的方法。
content() 返回这个ByteBufHolder所持的ByteBuf
copy() 返回ByteBufHolder的一个深拷贝,其中带一个copy副本
duplicate() 返回ByteBufHolder的一个浅拷贝,其中带一个共享副本
如果要实现一个将其有效负载储存在ByteBuf中的消息对象。可以使用ByteBufHolder。
ByteBuf分配
按需分配:ByteBufAllocator接口
为了降低分配和释放内存的开销。通过ByteBufAllocator实现了(ByteBuf)的池化。它可以用来分配我们所描述过得任何类型的ByteBuf。使用池化特定于应用程序的决定,它不会改变ByteBuf的API。
ByteBufAllocator的方法有
buffer()返回一个基于堆或者直接内存的ByteBuf
heapBuffer()返回一个基于堆内存存储的ByteBuf
directBuffer()返回一个基于直接内存的ByteByf
compostiteBuffer()返回一个可以通过添加最大到指定数目的基于堆或者直接内存存储的缓冲区来拓展的compostiteByteBuf
toBuffer()返回一个用于套接字的IO操作的ByteBuf
可以通过Channel或者绑定到ChannelHandler的ChannelHandlerContext获取到一个到ByteBufAllocator的引用。
public static void obtainingByteBufAllocatorReference(){
Channel channel = ...; //get reference form somewhere
ByteBufAllocator allocator = channel.alloc();
ChannelHandlerContext ctx = ...; //get reference form somewhere
ByteBufAllocator allocator2 = ctx.alloc();
}
差不多就这样。Netty实现了两种ByteBufAllocator的实现,PooledByteBufAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的实例以提升性能并最大限度的减少内存碎片。而后者不池化,每次都会返回一个新的实例,和连接池还有线程池别无二例,池化就进池,不池化自己管理。当然了,Netty的实现是默认池化的。
Unpooled缓冲区
有时候,你无法获得一个ByteBufAllocator的引用,那么,Netty给我们提供了一个小工具类叫做:Unpooled,它提供了静态的辅助方法,来获取没有池化的ByteByf实例。方法有如下几个:
buffer() 返回一个没有池化的基于堆内存存储的ByteBuf
directBuffer() 返回一个未池化的基于直接内存存储的ByteBuf
wrappedBuffer() 返回一个包装了给定数据的ByteBuf
copiedBuffer() 返回了一个复制了给定数据的ByteBuf
我们案例中经常使用的代码你一定非常熟悉。
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
Unpooled类还能使得ByteBuf同样可以用于哪些并不需要netty介入的非网络项目,使得其能得益于高性能可扩展的缓冲区API。(后续考虑将其引入naruku)。
ByteBufUtil类
看名字就知道是个缓冲区的工具类。这个API是同样的,并且和池化无关。
这些方法中有个叫做 hexdump()的方法,它以十六进制的表现形式打印ByteBuf里的内容。这很有用,他可以轻易地转换回来实际的字节表示。
还有equals方法,可以比较两个ByteBuf实例的相等性。如果需要实现自己的ByteBuf子类,可能会发现其他用途。
引用计数
引用计数是通过正某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术,在netty 第四版中引入到了ByteBuf和ByteBufHolder中,他们均实现了ReferenceCounted接口
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
// ...
}
public interface ReferenceCounted {
/**
* Returns the reference count of this object. If {@code 0}, it means this object has been deallocated.
*/
int refCnt();
/**
* Increases the reference count by {@code 1}.
*/
ReferenceCounted retain();
/**
* Increases the reference count by the specified {@code increment}.
*/
ReferenceCounted retain(int increment);
/**
* Records the current access location of this object for debugging purposes.
* If this object is determined to be leaked, the information recorded by this operation will be provided to you
* via {@link ResourceLeakDetector}. This method is a shortcut to {@link #touch(Object) touch(null)}.
*/
ReferenceCounted touch();
/**
* Records the current access location of this object with an additional arbitrary information for debugging
* purposes. If this object is determined to be leaked, the information recorded by this operation will be
* provided to you via {@link ResourceLeakDetector}.
*/
ReferenceCounted touch(Object hint);
/**
* Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
* {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release();
/**
* Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
* count reaches at {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release(int decrement);
}
其实通过名字计数就知道这个东西不会太复杂,它主要涉及跟踪某发哥特定对象活动引用的数量。一个ReferenceCounted 实现的实例将会通常以活动的引用计数1作为开始,只要引用数大于0。就能保证对象不会被释放。减少到0的时候,就会释放,释放的对象就没有办法使用了。这个小玩意对于池化技术来说,至关重要。
我也很好奇这东西是怎么实现的,我曾经自诩为JAVA性能开发工程师,自吹自擂说任何JAVA技术,我都能追其源码,这个东西不知道的话,我面子挂不住,我决定探险,去看看他的源码。顺便看看 Unpooled.copiedBuffer的实现,我决定用断点去一探究竟。前方高能!!!
开始
Charset utf8 = StandardCharsets.UTF_8;
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
刚进来,就看到了想看到的,我们追过去看看。release应该表示是一个是否释放的状态。 ByteBuf buffer = ALLOC.heapBuffer(ByteBufUtil.utf8Bytes(string));应该表示是一个基于堆的一个缓冲区。里面的逻辑我不看,我目的是看释放。是怎么释放的
说实话这个方法具体干嘛我不知道,但是他肯定是在获取引用次数,如果这个数是2的话,那么就会尝试这个方法。
这个看懂了,cas操作嘛,把这个2换成1。如果这个失败了,就执行这个方法
private boolean retryRelease0(T instance, int decrement) {
for (;;) {
int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);
if (decrement == realCnt) {
if (tryFinalRelease0(instance, rawCnt)) {
return true;
}
} else if (decrement < realCnt) {
// all changes to the raw count are 2x the "real" change
if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
return false;
}
} else {
throw new IllegalReferenceCountException(realCnt, -decrement);
}
Thread.yield(); // this benefits throughput under high contention
}
}
这段代码的意思是:如果真实计数是1,回到tryFinalRelease0,返回true,如果真实计数大于1,那么减1,就不能释放,如果小于1,就报错。如果不是2。进入nonFinalRelease0方法。
private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
if (decrement < realCnt
// all changes to the raw count are 2x the "real" change - overflow is OK
&& updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
return false;
}
return retryRelease0(instance, decrement);
}
意思是更新引用计数,失败调用retryRelease0方法。
这释放连起来就是用nonVolatileRawCnt获得引用计数,然后判断引用计数是否是2或者减的值就是真实引用计数值,是的话就可以尝试直接设置的方法tryFinalRelease0,如果失败会去尝试释放方法retryRelease0,这个是自旋,直到成功为止。如果不是的话就普通的引用计数器值的修改即可nonFinalRelease0。
再往下跑就是
private boolean handleRelease(boolean result) {
if (result) {
deallocate();
}
return result;
}
如果可以释放调用deallocate方法,进去看看
将Long(Heap)置为负数之后,直接释放内存。
这就是释放,那么增加引用,还有什么其他的逻辑应该都差不多,每次看到一些有趣的或者设计到工作内容的代码,我都忍不住进去看看源码。看大神是怎么实现的。以后我们遇到了,能不能用一样的手法,这就是阅读源码的附赠品,获得power是主产品。这个power是非常重要的,是关乎职业进阶的命脉,power够了,则早晚引发质变的。
回来,一个ReferenceCounted 我们不用关心当前引用值,只需在我们想释放的时候,让所有活动都失效就足够了。
结束语
这一把学习了ByteBuf容器,知道了优于JDK的地方,还搞了一些变体,并且何时去使用的例子,我们知道了
使用不同的读索引和写索引控制数据访问,
使用内存的不同方式 基于 heap 和 direct
通过CompositeByteBuf生成多个ByteBuf的聚合视图
数据访问方法,切片,搜索和复制
读写和设置API
ByteBufAllocate池化和引用计数
从源码上走了一下释放,满足了一下求知欲。