DotNetty是一个高性能的.NET网络通信框架,基于Netty,支持TCP、UDP、HTTP、WebSocket等协议。适用于高并发、低延迟场景,如实时通信、游戏服务器、IoT应用及大型分布式系统,通过异步I/O、零拷贝等技术提升性能,具备易用性、可扩展性。
架构上,围绕Channel、EventLoop、ChannelPipeline、ChannelHandlerContext设计,包含Transport、Codec、Handler等模块,简化网络编程,加速数据传输处理。
ByteBuffer 用于高效处理字节数据,固定容量,通过allocate或wrap创建。涉及position和limit追踪读写位置,提供get/put等读写方法,flip切换读写模式,clear清空缓冲区,compact移动数据并准备下一轮读写,适用于高性能IO与网络通信。.
优点
零拷贝通过复合缓冲区实现。
动态容量增长。
无需flip切换读写模式。
分离读写索引。
内置引用计数。
支持缓冲区池化。
优化网络通信中的字节处理。
结构
readerIndex 读取,随读增。
writerIndex 写入,随写增。
capacity容量。
ByteBuffer属性
readIndex:下一个读位
writeIndex:下一个写位置
capacity:容量
0:缓冲区开始位置
已经读取的区域:[0,readerindex)
可读取的区域:[readerindex,writerIndex)
可写的区域: [writerIndex,capacity)
读写独立索引,操作命名区分(read/write自动进,get/set不影响索引)。
支持设定最大容量,超限写操作将抛异常,默认上限Integer.MAX_VALUE。
readerIndex writerIndex 本质为灵活高效字节容器
堆缓冲区
最常用的 ByteBuffer 模式是将数据存储在堆空间中。这种模式被称为支撑数组(backing array),它能在没有使用池化的情况下提供快速的分配和释放。可以由 hasArray() 来判断检查 ByteBuffer是否由数组支撑。如果不是,则这是一个直接缓冲区。
直接缓冲区
直接缓冲区是另外一种 ByteBuffer 模式。
直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。
复合缓冲区
复合缓冲区 CompositeByteBuf,它为多个 ByteBuffer提供一个聚合视图。比如 HTTP 协议, 分为消息头和消息体,这两部分可能由应用程序的不同模块产生,各有各的 ByteBuffer,将会在消息被发送的时候组装为一个 ByteBuffer,此时可以将这两个 ByteBuffer聚合为一个 CompositeByteBuf,然后使用统一和通用的 ByteBuffer API 来操作。
如何在的程序中获得 ByteBuf 的实例,并使用它呢?Netty 提供了两种方式
ByteBufAllocator 接口
ByteBufAllocator 分配任意类型的 ByteBuffer 实例。
buffer(): 基于堆/直接内存ByteBuffer。
heapBuffer(): 仅堆内存。
directBuffer(): 仅直接内存。
compositeBuffer(): 可组合多个缓冲区,支持堆/直接内存。
ioBuffer(): 适配I/O操作,优先直接内存(需Unsafe支持)。
ByteBufAllocator:
Channel ChannelHandlerContext
Unpooled 缓冲区
Unpooled工具类助创ByteBuffer
buffer():堆内存ByteBuffer
directBuffer():直接内存ByteBuffer
wrappedBuffer():包装数据的ByteBuffer
copiedBuffer():复制数据的ByteBuffer 适用范围广泛,不仅限于Netty网络项目。
IByteBuffer byteBuf = Unpooled.CopiedBuffer("hello,world!", Encoding.UTF8);
ByteBuf访问/读写
索引从0开始至capacity()-1。
随机访问(get/set)不改索引,需显式调用readerIndex()/writerIndex()移动。
顺序访问分两类:
get/set:固定索引,读写数据不移位。
read/write:自动调整readerIndex/writerIndex。
isReadable() 如果至少有一个字节可供读取,则返回 true
isWritable() 如果至少有一个字节可被写入,则返回 true
readableBytes() 返回可被读取的字节数
writableBytes() 返回可被写入的字节数
capacity() 返回 ByteBuffer 可容纳的字节数。在此之后,它会尝试再次扩展直到达到
maxCapacity()
maxCapacity() 返回 ByteBuffer 可以容纳的最大字节数
hasArray() 如果 ByteBuffer 由一个字节数组支撑,则返回 true
array() 如果 ByteBuffer 由一个字节数组支撑则返回该数组
可丢弃字节
读操作后,字节积累在"可丢弃"段,始于readerIndex。
discardReadBytes()释放已读字节,回收空间,可能导致之前数据移动。
此操作可能涉及内存复制,谨慎使用。
可读字节:存储实际数据,新缓冲区默认从readerIndex=0开始。
可写字节:空白区域待写入,新缓冲区writerIndex初设0,写操作后自动递增。
资源释放
处理Netty入站ByteBuffer ,可选自动释放:用SimpleChannelInboundHandler,或在channelRead()末手动调用ReferenceCountUtil.release(msg)确保资源管理。
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
public class CommanChannelInboundHandler : SimpleChannelInboundHandler<object>
{
protected override void ChannelRead(IChannelHandlerContext context, object message)
{
bool shouldRelease = true;
try
{
if (IsMessageAcceptable(message))
{
ChannelRead0(context, message);
}
else
{
shouldRelease = false;
context.FireChannelRead(message);
}
}
finally
{
if (AutoRelease && shouldRelease)
{
ResourceUtil.Release(message);
}
}
}
private bool IsMessageAcceptable(object msg)
{
return msg is IByteBuffer; // 假设只处理IByteBuffer类型的消息
}
protected virtual void ChannelRead0(IChannelHandlerContext context, object message)
{
//IByteBuffer ...
}
private bool AutoRelease { get; set; } = true;
}
对于出站请求,不管 ByteBuffer 是否由我们的业务创建的,当调用了 write 或者 writeAndFlush 方法后,Netty 会自动替我们释放,不需要我们业务代码自行释放。
// 创建ByteBuf对象,初始容量为1,内部动态扩容以容纳更多数据
IByteBuffer byteBuf = Unpooled.Buffer(1);
Console.WriteLine("byteBuf=" + byteBuf);
for (int i = 0; i < 8; i++)
{
byteBuf.WriteByte(i);
}
Console.WriteLine("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++)
{
Console.WriteLine(byteBuf.GetByte(i));
}
Console.WriteLine("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++)
{
Console.WriteLine(byteBuf.ReadByte());
}
Console.WriteLine("byteBuf=" + byteBuf);
//byteBuf2
// 使用Unpooled工具类创建ByteBuf,内容为"hello,world!"
IByteBuffer byteBuf2 = Unpooled.CopiedBuffer("hello,world!", Encoding.UTF8); // Unpooled.CopiedBuffer("hello,world!", Encoding.UTF8);
// 使用相关的方法
if (byteBuf2.HasArray) // if (byteBuf2.HasArray)
{
byte[] content = byteBuf2.Array;
// 将content转成字符串
Console.WriteLine(Encoding.UTF8.GetString(content));
Console.WriteLine("byteBuf2=" + byteBuf2);
Console.WriteLine(byteBuf2.GetByte(0)); // 获取数组0这个位置的字符'h'的ASCII码,h=104
int len = byteBuf2.ReadableBytes; // 可读的字节数 12
Console.WriteLine("len=" + len);
// 使用for取出各个字节并转换为对应的字符打印
for (int i = 0; i < len; i++)
{
Console.WriteLine((char)byteBuf2.GetByte(i));
}
// 范围读取
Console.WriteLine(byteBuf2.GetCharSequence(0, 6, Encoding.UTF8));
Console.WriteLine(byteBuf2.GetCharSequence(6, 6, Encoding.UTF8));
}
//byteBuf3 WrappedBuffer
var str1 = Encoding.UTF8.GetBytes("hello,world!");
var byteBuf3 = Unpooled.WrappedBuffer(str1);
if (byteBuf3.HasArray) // if (byteBuf2.HasArray)
{
byte[] content = byteBuf3.Array;
// 将content转成字符串
Console.WriteLine(Encoding.UTF8.GetString(content));
Console.WriteLine("byteBuf3=" + byteBuf3);
Console.WriteLine(byteBuf3.GetByte(0)); // 获取数组0这个位置的字符'h'的ASCII码,h=104
int len = byteBuf3.ReadableBytes; // 可读的字节数 12
Console.WriteLine("len=" + len);
// 使用for取出各个字节并转换为对应的字符打印
for (int i = 0; i < len; i++)
{
Console.WriteLine((char)byteBuf3.GetByte(i));
}
// 范围读取
Console.WriteLine(byteBuf3.GetCharSequence(0, 6, Encoding.UTF8));
Console.WriteLine(byteBuf3.GetCharSequence(6, 6, Encoding.UTF8));
}
结果得出 get操作 readerIndex 不变,read操作使其移动。
End