Netty通信技术进阶
- 1. 概念
- 2. 线程同步、异步
- 3. 其他通信技术对比
- 4. Netty中的Reactor实现
- 5. Pipeline 和 Handler
- 5.1 ChannelHandler 分类
- 6. 入站事件传播
- 7.inbound/outbound 加载顺序和执行顺序
- 8. 出站事件传播
- 9. Code example
- 9.1 编写服务端
- 9.2 编写客户端
- 10. 核心组件
- 10.1 Bootstrap
- 10.2 Channel
- 10.3 EventLoopGroup 和 EventLoop
- 10.3.1 eventLoopThreads 是多少?
- 10.3.2 复用Handler
1. 概念
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供非阻塞的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
- 本质:网络应用程序框架
- 实现:异步、事件驱动
- 特性:高性能、可维护、快速开发
- 用途:开发服务器和客户端
2. 线程同步、异步
线程同步、异步是相对的,在请求或执行过程中,如果会阻塞等待,就是同步操作,反之就是异步操作
3. 其他通信技术对比
- Apache Mina:和Netty是同一作者,但是推荐Netty,作者认为Netty是针对Mina的重新打造版本,解决了一些问题并提高了扩展性
- Sun Grizzly:用得少、文档少,更新少
- Cindy:生命周期不长
- Tomcat、Jetty:还没有独立出来,另外他们有自己的网络通信层实现,是为了专门针对servelet容器而做的,不具备通用性
4. Netty中的Reactor实现
Netty线程模型是基于Reactor模型实现的,对Reactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型
1)Netty抽象出两组线程池:BossGroup和WorkerGroup,每个线程池中都有EventLoop线程(可以是OIO,NIO,AIO); BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup中的线程专门负责处理连接上的读写,EventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环
2)EventLoop表示一个不断循环的执行事件处理的线程,每个EventLoop都包含一个Selector,用于监听注册在其上的Socket网络连接(Channel)
3)每个BossEventLoop中循环执行以下三个步骤:
3.1)select:轮训注册在其上的ServerSocketChannel的accept事件(OP_ACCEPT事件)
3.2)processSelectedKeys:处理accept事件,与客户端建立连接,生成一个SocketChannel,并将其注册到某个Worker
3.3)runAllTasks:再去以此循环处理任务队列中的其他任务
4)每个WorkerEventLoop中循环执行以下三个步骤:
4.1)select:轮训注册在其上的SocketChannel的read/write事件(OP_READ/OP_WRITE事件)
4.2)processSelectedKeys:在对应的SocketChannel上处理read/write事件
4.3)runAllTasks:再去以此循环处理任务队列中的其他任务
5)在以上两个processSelectedKeys步骤中,会使用Pipeline(管道),Pipeline中引用了Channel,即通过Pipeline可以获取到对应的Channel,Pipeline中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)
5. Pipeline 和 Handler
ChannelPipeline 提供了 ChannelHandler 链的容器
pipeline 中包装的是由ChannelHandlerContext包装的ChannelHandler, 为双向链表, 其中head为netty内置, 无法修改, 我们只需要专注于中间的ChannelHandler, tail不一定存在
5.1 ChannelHandler 分类
对于数据的出站和入站,有着不同的ChannelHandler类型与之对应:
ChannelInboundHandler: 入站事件处理器
ChannelOutBoundHandler: 出站事件处理器
ChannelHandlerAdapter: 提供了一些方法的默认实现,可减少用户对于ChannelHandler的编写
ChannelDuplexHandler: 混合型,既能处理入站事件又能处理出站事件
SimpleChannelInboundHandler: 对ChannelHandlerAdapter的继承(可指定消息泛型)服务端异步处理数据禁止使用, 因为方法内部读取消息后会自动release掉数据占用的Bytebuffer资源
6. 入站事件传播
在ChannelInboundHandler中, channelActive中的channelActive方法可将事件向后传递, 另一种写法ctx.fireChannelActive()
/**
* 通道准备就绪
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.fireChannelActive();
super.channelActive(ctx);
}
7.inbound/outbound 加载顺序和执行顺序
InboundHandler是按照Pipleline的加载顺序(addLast),顺序执行
OutboundHandler是按照Pipeline的加载顺序(addLast),逆序执行
如果想让所有的OutboundHandler都能被执行到,可以选择把OutboundHandler放在最后一个有效的InboundHandler之前
有一种做法是通过addFirst加载所有OutboundHandler,再通过addLast加载所有InboundHandler;另外也推荐:使用addLast先加载所有OutboundHandler,然后加载所有InboundHandler(注意考虑加载顺序和执行顺序)
8. 出站事件传播
在outboundhandler中最好不要再通过Channel写数据,会导致事件再次从尾部流动到头部,造成类似递归问题
可以在事件向前传播出去之后通过ChannelHandlerContext写数据
9. Code example
9.1 编写服务端
定义NioEventLoopGroup时可在构造方法指定线程数量, 默认构造器的线程数量为cpu核数的2倍
Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2))
从源码中可以看出, 也可以在启动时指定io.netty.eventLoopThreads的线程数
public static void main(String[] args) {
NettyServer server = new NettyServer();
server.start(9999);
}
private void start(int port) {
// 定义reactor线程组
EventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
EventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 业务线程池
EventExecutorGroup business = new UnorderedThreadPoolEventExecutor(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("business"));
// 基于netty引导整个服务端程序的启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
//当客户端 SocketChannel初始化时回调该方法,添加handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 日志和超时
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new ServerReadIdleHandler());
// 编码
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new ProtoStuffEncoder());
// 解码
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
pipeline.addLast("protostuffdecoder", new ProtoStuffDecoder());
// 执行业务
pipeline.addLast(business, "tcptesthandler", new TcpStickHalfHandler());
}
});
// 绑定端口并启动
try {
ChannelFuture future = serverBootstrap.bind(port).sync();
// 监听端口的关闭 sync阻塞
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//清理一些资源
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
自定义ChannelInboundHandler, 继承ChannelInboundHandlerAdapter, 也做一些数据的解码, 业务处理等操作
public class TcpStickHalfHandler extends ChannelInboundHandlerAdapter {
int count = 0;
/**
* 通道准备就绪
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
UserInfo data = (UserInfo) msg;
count++;
log.info("服务端收到的第{}个数据:{}", count, data);
super.channelRead(ctx, msg);
}
}
自定义ChannelOutboundHandler, 继承ChannelOutboundHandlerAdapter, 需要注意的是ctx.writeAndFlush和ctx.channel().wirte的区别
前者在是此handler往依次往前执行(pipeline双向链表), 后者是最后一个tail往前执行, initChannel中addLast时顺序错误可能会数据错误
public class ServerOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("ServerOutboundHandler write ");
super.write(ctx, msg, promise);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("append".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
}
}
9.2 编写客户端
其中ChannelHandler的编写和服务端的用法一致, 编码解码流程相反
public static void main(String[] args) {
NettyClient client = new NettyClient();
client.start("127.0.0.1", 9999);
}
public void start(String host, int port) {
// 定义线程组,
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 超时
pipeline.addLast(new ClientWriterIdleHandler());
// 编码
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new ProtoStuffEncoder());
// 解码
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
pipeline.addLast(new ProtoStuffDecoder());
// 解码器
pipeline.addLast(new ClientInboundHandler());
}
});
//连接服务端
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
10. 核心组件
10.1 Bootstrap
Bootstrap是引导的意思,它的作用是配置整个Netty程序,将各个组件都串起来,最后绑定端口、启动Netty服务
Netty中提供了2种类型的引导类,一种用于客户端(Bootstrap),而另一种(ServerBootstrap)用于服务器,区别在于1、ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的
2、引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap则需要两个
10.2 Channel
Netty中的Channel是与网络套接字相关的,可以理解为是socket连接,在客户端与服务端连接的时候就会建立一个Channel,它负责基本的IO操作,比如:bind()、connect(),read(),write() 等
不同协议、不同的I/O类型的连接都有不同的 Channel 类型与之对应
主要作用:
- 通过Channel可获得当前网络连接的通道状态。
- 通过Channel可获得网络连接的配置参数(缓冲区大小等)。
- Channel提供异步的网络I/O操作,比如连接的建立、数据的读写、端口的绑定等。
10.3 EventLoopGroup 和 EventLoop
Netty是基于事件驱动的,比如:连接注册,连接激活;数据读取;异常事件等等,有了事件,就需要一个组件去监控事件的产生和事件的协调处理,这个组件就是EventLoop(事件循环/EventExecutor)
在Netty 中每个Channel 都会被分配到一个 EventLoop。一个 EventLoop 可以服务于多个 Channel。每个EventLoop 会占用一个 Thread,同时这个 Thread 会处理 EventLoop 上面发生的所有 IO 操作和事件。
EventLoopGroup 是用来生成 EventLoop 的,包含了一组EventLoop(可以初步理解成Netty线程池)
10.3.1 eventLoopThreads 是多少?
核心线程数默认:cpu核数*2, 核心线程数在创建时可通过构造函数指定
对于boss group,我们其实也只用到了其中的一个线程,因为服务端一般只会绑定一个端口启动
10.3.2 复用Handler
每个客户端Channel创建后初始化时,均会向与该Channel绑定的Pipeline中添加handler,此种模式下,每个Channel享有的是各自独立的Handler
如果复用的handler对象不加@Sharable注解会报错, 另外存在线程安全问题, 内部全局变量线程安全问题要自己处理