Netty 入门
1. 概述
1.1 Netty
Netty是一个异步的,基于事件驱动的网络应用框架,用于快速开发可维护,高性能的网络服务器和客户端
Cassandra,Spark,Hadoop,RocketMQ,ElasticSearch,gRPC,Dubbo,Spring5.x,Zookeeper都是基于netty开发。
1.2 Netty优势
相比NIO:构建自己的协议,解决TCP传输问题(粘包),epoll空轮询导致CPU100%,对API进行增强(FastThreadLocal-ThreadLocal)。
相比其他网络框架:比Mina更简洁,存在时间更长
2. 入门
2.1 目标
开发简单客户端和服务端,客户端发送消息,服务区接收消息
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
2.2 通信代码及流程
- 把channel理解为数据的通道
- 把msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline的加工,会变成其他类型对象,最后输出又变成ButeBuf
- 把handler理解为数据的处理工序:
- 工序有多道,合起来是pipeline,pipeline负责发布事件(读,读取完成)传播给每个handler,handler对自己感兴趣的事件进行处理(重写对应方法)
- handler分Inbound和Outbound两类
- 把eventLoop理解为处理数据的工人
- 工人可管理多个channel的io操作,并一旦工人负责了某个channel,就要负责到底(绑定)
- 工人既可以执行io操作,也可进行任务处理,每个工人有任务队列,队列里可堆放多个channel的待处理任务,任务分为普通,定时任务
- 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可为每道工序执行不同的工人
3. 组件
3.1 EventLoop
EventLoop本质是单线程执行器(同时维护一个Selector),里面有run方法处理channel上源源不断的io事件。
继承关系
- 一是继承自juc包下的ScheduledExecutorService,因此包含线程池中所有方法
- 二是继承自netty的OrderedEventExcutor,提供了 inEventLoop(thread)方法判断存在,提供了parent方法查询归属哪个EGroup
EventLoopGroup是一组EventLoop,Channel一般会调用ELoopGroup的register方法来绑定一个EventLoop,后续这个Channel上的io事件都由此ELoop来处理。(保证io事件处理的线程安全)
-三是继承netty的EventExecutorGroup。实现了Iterable接口遍历EventLoop的能力,另有next方法获取集合下一个Eloop。
EventLoopGroup
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个channel上的IO事件都由此EventLoop来处理(线程安全)
继承自netty自己的EventExecutorGroup:
- 实现了Iterable接口提供遍历EventLoop的能力
- 另有next方法获取集合中下一个EventLoop
代码
实现:boss处理连接事件,两个worker处理读写事件:
public class EventLoopServer {
public static void main(String[] args) {
//问题:一个handler耗时较长会拖慢整个worker上的channel
//改进:创建新的EventLoopGroup处理耗时长的
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// 改进:EventLoopGroup分工明确
// 参数一:boss只处理accept时间 参数二:worker只处理socketChannel(多个客户端可多路复用共用channel)读写操作
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
String s = buf.toString(Charset.defaultCharset());
System.out.println(s);
ctx.fireChannelRead(msg); //将消息传递给下一个handler
}
}).addLast(group, "handler2",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
String s = buf.toString(Charset.defaultCharset());
System.out.println(s);
}
});
}
}).bind(8080);
}
}
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定。
加入DefaulGroup后nio工人和非nio工人也分别绑定了channel(LoggingHandler由nio工人执行,而我们自己的 handler由非nio工人执行)。
handler 执行中如何换人?
如果两个handler绑定的是同一个EventLoop(线程),就直接调用,否则要把被调用的代码放在Runnable对象中传给下一个handler的线程处理
源码:
void invokeChannelRead(final AbstractChannelHandlerCOntext next, Object msg){
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
//下一个handler的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();
//是,直接调用
if(executor.inEventLoop()) {
next.invokeChannelRead(m);
} else { //不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
executor.execute(new Runnable(){
@Override
public void run() {
next.invokeChannelRead(m); //下一个handler线程
}
});
}
}
3.2 Channel
channel主要作用:
close:关闭channel
closeFuture:处理channel的关闭(sync同步关闭,addListener异步关闭)
pipeline:添加流水线处理器
write数据写入
writeAndFlush:数据写入并刷出
ChannelFuture
//带有future,promise的类型都是和异步方法一起使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(ChannelInitializer<NioSocketChannel>() ch.pipeline().addLast(new StringEncoder());})
//connect:异步非阻塞,main调用,真正执行的连接的是nio线程,连接需要花费1s,如果没有调用sync会无阻塞向下获取channel
.connect(new InetSocketAddress("localhost", 8080));
//1.使用sync方法同步处理结果
channelFuture.sync(); //阻塞当前线程,直到nio线程建立完毕
Channel channel = channelFuture.channel();
//2.使用addListener(回调对象)方法异步处理结果,主线程不用等,全部交给nio线程处理
channelFuture.addListener(new ChannelFutureListener() {
//在nio线程连接建立好后,会调用operationComplete
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
}
});
CloseFuture
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if("q".equals(line)) {
channel.close(); //close异步操作,善后操作不应该在这之后,交给closeFuture处理
break;
}
channel.writeAndFlush(line);
}
},"input").start();
//获取closeFuture对象, 1)同步处理关闭 2)异步处理关闭
//1.同步:主线程执行关闭
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
System.out.println("执行善后操作");
//2.异步:调用线程处理关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("执行善后操作");
}
});
优雅关闭
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。
NioEventLoopGroup group = new NioEventLoopGroup();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("执行善后操作");
group.shutdownGracefully();
}
});
异步思考
多线程:单线程做所有事
优化:单线程只做一件事:
- 单线程没法异步提高效率,必须多线程,多核cpu才能发挥异步优势
- 异步没有缩短响应时间,反而又增加
- 合理任务拆分
3.3 Future & Promise
netty的Future继承自jdk的Future,而Promise对netty的Future进行扩展。
- jdk的Future只能同步等待任务结束才能得到结果
- netty Future可同步等待任务结束得到结果,也可异步等待结果
- netty Promise不仅有future共嗯,而且脱离了任务独立存在,只作为两线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | ||
isCanceled | 任务是否取消 | ||
isDone | 任务是否完成,不能区分成功失败 | ||
get | 获取任务结果,阻塞等待 | ||
getNow | 获取任务结果,非阻塞,还未产生结果时返回 null | ||
await | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | ||
sync | 等待任务结束,如果任务失败,抛出异常 | ||
isSuccess | 判断任务是否成功 | ||
cause | 获取失败信息,非阻塞,如果没有失败,返回null | ||
addLinstener | 添加回调,异步接收结果 | ||
setSuccess | 设置成功结果 | ||
setFailure | 设置失败结果 | ||
代码测试:https://gitee.com/xuyu294636185/netty-demo |
3.4 Handler & Pipeline
ChannelHandler用来处理Channel上的各种事件,分为入站,出站两种。所有handler连起来就是pipeline。
- 入站通常是ChannelInboundHandlerAdapter的子类,主要用于读取客户端数据,写回结果
- 出站通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工
每个Channel是加工车间,pipeline是流水线,ChannelHandler是流水线上各道工序,ButeBuf是原材料。
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//1.通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline();
//2.添加处理器 headHandler - addLast(添加位置) h1 - h2 - h3 - tailHandler,底层采用双向链表
//入站
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String str = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, msg); //将加工后的str传递下一个handler处理,不调用链会断开
// ctx.fireChannelRead(msg); //或者使用此方法传递
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes())); //从当前的节点向前找出战处理器
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes())); //从尾部的节点向前找出战处理器
}
});
//出站
pipeline.addLast("h3", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
3.5 ByteBuf
是对字节数据的封装
创建
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
直接内存&堆内存
ByteBufAllocator.DEFAULT.heapBuffer(10);//池化基于堆内存
ByteBufAllocator.DEFAULT.DirecBuffer(10);//池化直接内存,创建销毁代价高,性能好,不受jvmGC管理,需主动释放
池化&非池化
池化可以重用ByteBuf。不必每次都创建新的实例,采用类似jemalloc分配,高并发时更节约内存,减少溢出。
Dio.netty.allocator.type={unpooled 禁用|pooled 开启} 池化开启,通过设置环境变量设置
组成
- 第一个部分是已经丢弃的字节,这部分数据是无效的;(已经读过的内容)
- 第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;(已经写入但还未读取的内容)
- 第三部分数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段;(剩余可写入数据的空间大小)
- 最后一部分表示的是该 ByteBuf 最多还能扩容多少容量
从 ByteBuf 中每读取一个字节,readerIndex 自增1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读, 由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读;
写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了;
ByteBuf 里面还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。
写入 & 读取
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian大端,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian小端,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
- 方法未指明返回值的,其返回值都是ByteBuf,意味可链式调用
- 网络传输,默认Big Endian
buffer.writeBytes(new byte[]{1,2,3,4,});
buffer.writeInt(5);
System.out.println(buffer.readByte());
扩容
- 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
- 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 210=1024(29=512 已经不够了)
- 扩容不能超过 max capacity 会报错
retain & release
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口 - 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下
- 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
- 入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
- 出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
- 异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
TailContext 释放未处理消息逻辑
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
具体代码
// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
Slice
【零拷贝】的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf内存
,切片后ByteBuf维护独立的read,write指针。
duplicate
【零拷贝】的体现之一,好比截取原始ByteBuf所有内容,并没有max capacity的限制,与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。
copy
会将底层内存数据进行深拷贝,无论读写,都与原始ByteBuf无关。
compositeBuffer
【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
- 缺点,复杂了很多,多次操作会带来性能的损耗
Unpooled
提供非池化的ByteBuf创建,组合和复制等操作。
ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
ByteBufUtil.prettyHexDump(buf);
ByteBuffer的优势
- 池化:可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能。
- 读写指针分离,不需要像ByteBuffer一样切换读写模式
- 可自动扩容
- 支持链式调用
- 很多方法支持零拷贝
4 双向通信
4.1 实现一个echo server
代码:https://gitee.com/xuyu294636185/netty-demo.git