文章目录
- 概述
- ByteBuf 的特点
- ByteBuf的组成
- ByteBuf 的生命周期
- ByteBuf 相关api
- 1. ByteBuf 的创建
- 2. 直接内存 vs 堆内存
- 3. 池化 vs 非池化
- 4. ByteBuf写入
- 代码示例
- 5. ByteBuffer扩容
- 6. ByteBuf 读取
- 7. retain() & release()
- TailContext 释放未处理消息逻辑
- HeadContext
- 8. ByteBuf.slice()
- 调用有参的slice()
- 调用无参的ByteBuf.slice()
- 9. ByteBuf.duplicate()
- duplicate() 方法的特点
- 代码示例
- 10. ByteBuf.copy()
- copy() 方法的特点
- 代码示例
- 11. CompositeByteBuf
- 12. Unpooled
- 总结ByteBuf
概述
在Netty中,ByteBuf 是一个非常重要的组件,它用于处理二进制数据。ByteBuf 是 Netty 设计的一种高效的内存模型,用于替代传统的 byte[] 数组和 ByteBuffer 类型,旨在提高性能并减少垃圾回收的开销。
ByteBuf 的特点
- 内存管理:
ByteBuf 提供了对内存的细粒度控制,可以有效地管理分配和释放内存。
它使用了池化技术(PooledByteBufAllocator),可以复用 ByteBuf 实例,减少垃圾回收的压力。节约内存,减少内存溢出的可能。 - 高效操作:
ByteBuf 提供了许多方法来高效地读写数据,包括直接操作内存的方法,减少了不必要的数据拷贝。
支持多种操作方式,如读取、写入、切片等,使开发者可以灵活地处理数据。
很多地方体现零拷贝(减少内存复制,提高性能),例如 slice、duplicate、CompositeByteBuf - 跨平台兼容:
ByteBuf 设计时考虑到了跨平台的兼容性,能够在不同的操作系统和硬件上高效运行。 - 安全性:
ByteBuf 提供了安全的 API,可以防止越界读写操作,提高了代码的健壮性。
ByteBuf的组成
ByteBuf 由四部分组成
最开始读写指针都在 0 位置。
ByteBuf最大容量为整数的最大值,也就是20亿。
ByteBuf有读指针和写指针,一开始读、写指针都是在下标为0位置,写入数据时写指针向后移动,读取数据时读指针向后移动,开头到读指针的区域为废弃字节。
ByteBuf 的生命周期
- 分配:通过 ByteBufAllocator 创建 ByteBuf。
- 使用:读写数据,处理事件。
- 释放:使用完毕后,通过 release() 方法释放 ByteBuf。
ByteBuf 相关api
1. ByteBuf 的创建
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class TestByteBuf {
public static void main(String[] args) {
// ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
// System.out.println(buf);//PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
// StringBuilder sb = new StringBuilder();
// for (int i = 0; i < 300; i++) {
// sb.append("a");
// }
// buf.writeBytes(sb.toString().getBytes());
// System.out.println(buf);//PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
// ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
//class io.netty.buffer.PooledUnsafeHeapByteBuf 采用池化的堆内存
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
//class io.netty.buffer.PooledUnsafeDirectByteBuf 采用池化的直接内存
System.out.println(buf.getClass());
System.out.println(buf.maxCapacity());
log(buf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 32; i++) {
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
log(buf);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
实际开发中通常会在ChannelInboundHandlerAdapter的channelRead()中,所以建议使用 ctx.alloc() 创建 ByteBuf。
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
// 建议使用 ctx.alloc() 创建 ByteBuf
ByteBuf response = ctx.alloc().buffer();
response.writeBytes(buffer);
ctx.writeAndFlush(response);
}
});
}
}).bind(8080);
2. 直接内存 vs 堆内存
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
直接内存使用的是系统内存(减少内存复制),直接内存的读写效率高于堆内存
减少内存复制:磁盘读取文件时,可以将数据读入系统内存,系统内存通过直接内存的方式映射到java内存中
3. 池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
池化类似数据库连接池
在运行设置中配置是否使用池化功能(也可以配置到环境变量中)
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf.getClass());
}
输出:
class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
4. ByteBuf写入
写入 byte 类型:
buffer.writeByte(byte value);
写入 short 类型:
buffer.writeShort(short value);
写入 int 类型:
buffer.writeInt(int value);
写入 long 类型:
buffer.writeLong(long value);
写入 float 类型:
buffer.writeFloat(float value);
写入 double 类型:
buffer.writeDouble(double value);
写入字符串(指定字符集):
buffer.writeCharSequence(CharSequence sequence, Charset charset);
此方法将字符串转换为字节序列,并按照指定的字符集编码写入缓冲区。
写入 byte[] 数组:
buffer.writeBytes(byte[] src);
buffer.writeBytes(byte[] src, int offset, int length);
这些方法用于将一个字节数组的一部分或全部写入缓冲区。
写入 ByteBuffer:
buffer.writeBytes(ByteBuffer src);
此方法用于将一个 ByteBuffer 的内容写入缓冲区。
写入 CharSequence:
buffer.writeCharSequence(CharSequence sequence, Charset charset);
此方法用于将 CharSequence 对象按照指定的字符集编码后写入缓冲区。
写入另一个 ByteBuf:
buffer.writeBytes(ByteBuf src);
buffer.writeBytes(ByteBuf src, int length);
buffer.writeBytes(ByteBuf src, int index, int length);
这些方法用于将另一个 ByteBuf 的内容写入当前 ByteBuf。
写入其他对象
buffer.writeObject(Object obj);
此方法用于写入一个实现了 Externalizable 接口的对象。请注意,此方法不是 ByteBuf 的标准方法,而是依赖于序列化机制,因此在使用时需要注意兼容性和效率问题。
代码示例
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
//先写入 4 个字节
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
System.out.println();
System.out.println("---------------------------------------------------");
System.out.println();
//再写入一个 int 整数,也是 4 个字节
buffer.writeInt(5);
log(buffer);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
输出:
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
---------------------------------------------------
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
5. ByteBuffer扩容
以上代码中再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
//先写入 4 个字节
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
System.out.println();
System.out.println("---------------------------------------------------");
System.out.println();
//再写入一个 int 整数,也是 4 个字节
buffer.writeInt(5);
log(buffer);
System.out.println("---------------------------------------------------");
buffer.writeInt(6);
log(buffer);
}
输出
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
---------------------------------------------------
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
---------------------------------------------------
read index:0 write index:12 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06 |............ |
+--------+-------------------------------------------------+----------------+
扩容规则是
- 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
- 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 210=1024(29=512 已经不够了)
- 扩容不能超过 max capacity 会报错
6. ByteBuf 读取
例如读了 4 次,每次一个字节
public static void main(String[] args) {
// ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
// System.out.println(buf.getClass());
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
//先写入 4 个字节
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
System.out.println();
System.out.println("---------------------------------------------------");
System.out.println();
//再写入一个 int 整数,也是 4 个字节
buffer.writeInt(5);
log(buffer);
System.out.println("---------------------------------------------------");
buffer.writeInt(6);
log(buffer);
System.out.println("读取");
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
//读指针移动
log(buffer);
}
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
读取
1
2
3
4
read index:4 write index:12 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
System.out.println("重复读取");
buffer.markReaderIndex();
System.out.println(buffer.readInt());//读取4个字节
log(buffer);
System.out.println("重置到标记位置 reset");
buffer.resetReaderIndex();
log(buffer);
输出
重复读取
5
read index:8 write index:12 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06 |.... |
+--------+-------------------------------------------------+----------------+
重置到标记位置 reset
read index:4 write index:12 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index(读指针)
7. retain() & release()
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现protected abstract void deallocate()
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
谁来负责 release 呢?
一般情况下
ByteBuf buf = ...
try {
...
} finally {
buf.release();
}
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下
- 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(pipeline.fireChannelRead(byteBuf))
- 入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
- 出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
- 异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
TailContext 释放未处理消息逻辑
在Pipeline中,消息从head()到入站处理器一直传到tail,tail的实现类就是TailContext类。而tail因为要收尾所以也要拿到入站消息,因此也实现了ChannelInboundHandler接口。
以下为TailContext的相关源码
io.netty.channel.DefaultChannelPipeline.TailContext#channelRead
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
...
//和入站处理器一样,观察channelRead()
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
...
}
↓
io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(io.netty.channel.ChannelHandlerContext, java.lang.Object)
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(msg);
//下面是日志,可以忽略
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.pipeline().names(), ctx.channel());
}
}
↓
io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
↓
io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
//判断是否为ByteBuf,是ByteBuf才会释放
if (msg instanceof ReferenceCounted) {
//释放消息
return ((ReferenceCounted) msg).release();
}
return false;
}
HeadContext
而head则处理入站消息,消息向后传。同时head也会作为出站处理器,出战的消息从tail一直传给head
io.netty.channel.DefaultChannelPipeline.HeadContext#write
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
...
//和出站处理器一样,观察write()
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
...
}
↓
io.netty.channel.AbstractChannel.AbstractUnsafe#write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//出站缓冲区
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
// release message now to prevent resource-leak
//释放
ReferenceCountUtil.release(msg);
return;
}
...
}
↓
io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
8. ByteBuf.slice()
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针。
例,原始 ByteBuf 进行一些初始操作
调用有参的slice()
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class Test03Slice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
log(buf);
// 在切片过程中,没有发生数据复制
ByteBuf f1 = buf.slice(0, 5);
f1.retain();//ByteBuf.retain()方法将引用计数器加1,表示该ByteBuf被另一个对象所持有
// 'a','b','c','d','e', 'x'
ByteBuf f2 = buf.slice(5, 5);
f2.retain();//作用:让切片自己去做释放操作,不让原始的ByteBuf释放影响切片
log(f1);
log(f2);
// f1.writeByte('x');//f1无法写入,因为会跟f2冲突
System.out.println("释放原有 byteBuf 内存");
buf.release();
log(f1);
System.out.println("========================");
f1.setByte(0, 'b');//允许替换
log(f1);//bbcde
log(buf);//buf中也替换为bbcdefghij
f1.release();//切片自己释放ByteBuf
f2.release();
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
输出:
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 66 67 68 69 6a |abcdefghij |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 |abcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 67 68 69 6a |fghij |
+--------+-------------------------------------------------+----------------+
释放原有 byteBuf 内存
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 |abcde |
+--------+-------------------------------------------------+----------------+
========================
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 63 64 65 |bbcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 63 64 65 66 67 68 69 6a |bbcdefghij |
+--------+-------------------------------------------------+----------------+
调用无参的ByteBuf.slice()
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
public class Test03Slice2 {
public static void main(String[] args) {
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
System.out.println();
System.out.println("--------------------------------------------------------------");
System.out.println();
/**
* 这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index
* 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write
*/
System.out.println("origin.slice()");
ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果执行,会报 IndexOutOfBoundsException 异常
System.out.println();
System.out.println("--------------------------------------------------------------");
System.out.println();
System.out.println("origin.readByte()");
//原始 ByteBuf 再次读操作(又读了一个字节)
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
System.out.println();
System.out.println("--------------------------------------------------------------");
System.out.println();
System.out.println("slice不受影响,因为它有独立的读写指针");
//注意这时的 slice 不受影响,因为它有独立的读写指针
System.out.println(ByteBufUtil.prettyHexDump(slice));
System.out.println();
System.out.println("--------------------------------------------------------------");
System.out.println();
System.out.println("slice发生变化");
//如果 slice 的内容发生了更改
slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));
System.out.println();
System.out.println("--------------------------------------------------------------");
System.out.println();
System.out.println("原始 ByteBuf 也会受影响");
//这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存
System.out.println(ByteBufUtil.prettyHexDump(origin));
}
}
输出:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
--------------------------------------------------------------
origin.slice()
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
--------------------------------------------------------------
origin.readByte()
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 04 |.. |
+--------+-------------------------------------------------+----------------+
--------------------------------------------------------------
slice不受影响,因为它有独立的读写指针
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
--------------------------------------------------------------
slice发生变化
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05 |... |
+--------+-------------------------------------------------+----------------+
--------------------------------------------------------------
原始 ByteBuf 也会受影响
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05 |.. |
+--------+-------------------------------------------------+----------------+
9. ByteBuf.duplicate()
【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
duplicate() 方法的特点
- 共享内容:duplicate() 创建的新 ByteBuf 与原 ByteBuf 共享相同的数据区域,这意味着修改其中一个 ByteBuf 的内容会影响另一个 ByteBuf。
- 独立指针:新创建的 ByteBuf 有自己的读写指针,这意味着你可以独立地操作这两个 ByteBuf 的读写位置,而不影响另一个。
- 引用计数不变:duplicate() 方法不会增加 ByteBuf 的引用计数,所以释放任何一个 ByteBuf 都会导致原始数据的释放。
代码示例
public static void main(String[] args) {
// 创建一个 ByteBuf
ByteBuf originalBuf = Unpooled.buffer(1024);
// 写入一些数据
originalBuf.writeBytes(new byte[]{0x01, 0x02, 0x03, 0x04});
// 创建一个副本
ByteBuf duplicateBuf = originalBuf.duplicate();
// 修改原始 ByteBuf 的内容
originalBuf.setByte(0, (byte) 0xAA);
// 输出两个 ByteBuf 的内容
System.out.println(ByteBufUtil.prettyHexDump(originalBuf));
System.out.println(ByteBufUtil.prettyHexDump(duplicateBuf));
// 释放 ByteBuf
originalBuf.release();
}
输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| aa 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| aa 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
10. ByteBuf.copy()
copy() 方法,用于创建当前 ByteBuf 的一个副本。这个副本包含从当前的 readerIndex 到 writerIndex 之间的所有数据的一个新拷贝,而不是像 duplicate() 方法那样共享原有 ByteBuf 的数据。因此,copy() 方法创建的是一个全新的 ByteBuf,与原 ByteBuf 不共享数据,修改一个 ByteBuf 的内容不会影响另一个。
会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关。
copy() 方法的特点
- 独立的数据副本:copy() 方法创建的新 ByteBuf 包含了从 readerIndex 到 writerIndex 之间的所有数据的一个新拷贝,这意味着修改其中一个 ByteBuf 的内容不会影响另一个 ByteBuf。
- 独立的读写指针:新创建的 ByteBuf 有自己的读写指针(readerIndex 和 writerIndex),因此可以独立地操作这两个 ByteBuf 的读写位置。
- 引用计数不变:copy() 方法不会增加 ByteBuf 的引用计数,因为它是创建了一个新的 ByteBuf。
- 新的内存分配:copy() 方法会创建一个新的内存分配,用于存放从原 ByteBuf 中复制的数据。
代码示例
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
public class ByteBufCopyExample {
public static void main(String[] args) {
// 创建一个 ByteBuf
ByteBuf originalBuf = Unpooled.buffer(1024);
// 写入一些数据
originalBuf.writeBytes(new byte[]{0x01, 0x02, 0x03, 0x04});
// 设置读写指针
originalBuf.readerIndex(0);
originalBuf.writerIndex(4);
// 创建一个副本
ByteBuf copyBuf = originalBuf.copy();
System.out.println("Original ByteBuf content: " + originalBuf.toString());//ByteBuf(ridx: 0, widx: 4, cap: 1024)
System.out.println("Copy ByteBuf content : " + copyBuf.toString()); //ByteBuf(ridx: 0, widx: 4, cap: 4)
// 修改原始 ByteBuf 的内容
originalBuf.setByte(0, (byte) 0xAA);
// 输出两个 ByteBuf 的内容
System.out.println(ByteBufUtil.prettyHexDump(originalBuf));// aa 02 03 04
System.out.println(ByteBufUtil.prettyHexDump(copyBuf));// 01 02 03 04
// 读取数据
byte originalByte = originalBuf.readByte();
byte copyByte = copyBuf.readByte();
System.out.println("Original Byte read: " + originalByte);
System.out.println("Copy Byte read: " + copyByte);
// 释放 ByteBuf
originalBuf.release();
copyBuf.release();
}
}
输出
Original ByteBuf content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 4, cap: 1024)
Copy ByteBuf content : UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 4, cap: 4)
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| aa 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
Original Byte read: -86
Copy Byte read: 1
11. CompositeByteBuf
【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
示例如下
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
/**
* 2.36
*/
public class Test04CompositeByteBuf {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
//将多个 ByteBuf 合并为一个的 ByteBuf
//方法1:这种方法不太好,因为进行了数据的内存复制操作
// ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// buffer.writeBytes(buf1).writeBytes(buf2);
// log(buffer);
//方法2:可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
CompositeByteBuf buffer1 = ByteBufAllocator.DEFAULT.compositeBuffer();
buffer1.addComponents( buf1, buf2);
//注意:addComponent()和addComponents()不带increaseWriterIndex参数不会自动改变读写指针
log(buffer1);
//read index:0 write index:0 capacity:10
/**
不带参数:默认情况下,addComponents() 不会自动更新 buf1 和 buf2 的读写指针。
结果:buffer1 的读指针为 0,写指针也为 0,容量为 10。这意味着 buf1 和 buf2 的数据虽然被合并,但它们的读写指针没有变化。
*/
CompositeByteBuf buffer2 = ByteBufAllocator.DEFAULT.compositeBuffer();
buffer2.addComponents(true, buf1, buf2);
log(buffer2);
//read index:0 write index:10 capacity:10
/**
带参数 true:表示在添加组件时更新 buf1 和 buf2 的写指针到最大值。
结果:buffer2 的读指针为 0,写指针为 10,容量为 10。这意味着 buf1 和 buf2 的数据被合并,并且它们的写指针被更新到了各自的最大值。
总结:
buffer1:读写指针未更新。
buffer2:读写指针已更新,便于后续连续读取
*/
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
输出
read index:0 write index:0 capacity:10
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
- 缺点,复杂了很多,多次操作会带来性能的损耗
12. Unpooled
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
也可以用来包装普通字节数组,底层也不会有拷贝操作
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
输出
class io.netty.buffer.CompositeByteBuf
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 |...... |
+--------+-------------------------------------------------+----------------+
总结ByteBuf
- 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝(减少内存复制,提高性能),例如 slice、duplicate、CompositeByteBuf