Netty的重要组件:EventLoop、Channel、Future & Promise、Handler & Pipeline、ByteBuf
本篇主要介绍Netty的EventLoop和Channel组件。
1、Netty入门案例
服务器端的创建,主要分为以下步骤:
- 创建serverBootstrap对象。
- 配置服务器的线程模型。可以指定两个线程模型,parentGroup是专门负责接收连接的线程模型,childGroup是处理读写事件的工作线程模型。
- 设置 Channel 类型,这里使用NioServerSocketChannel,是基于NIO实现的,还有其他的实现如下:
- 配置 Channel 初始化器,使用ChannelInitializer初始化NioSocketChannel,在这里我们配置了处理字符串编码以及打印字符串。
- 绑定端口。
ServerBootstrap serverBootstrap = new ServerBootstrap();
//接受连接的线程 or 工作线程
serverBootstrap.group(new NioEventLoopGroup())
//服务器ServerSocketChannel的实现 是NIO 还是 BIO
.channel(NioServerSocketChannel.class)
.childHandler(
//初始化与客户端进行数据读写的通道,并且添加别的handler 在连接建立后回调
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
//处理编码(解码 )
channel.pipeline().addLast(new StringDecoder());
//自己的业务逻辑,比如打印字符串
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
}
)
.bind(8080);
客户端的创建步骤与服务器端创建的步骤类似,但是在connect方法后需要加上.sync(),以及调用writeAndFlush()方法进行收发数据。(为什么会这样后面会说明)
new Bootstrap()
.group(new NioEventLoopGroup())
//客户端SocketChannel的实现
.channel(NioSocketChannel.class)
//初始化与服务器进行数据读写的通道,并且添加别的handler 在连接建立后回调
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//字符输出编码
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost",8080))
.sync()//阻塞方法 连接建立成功后才会执行
.channel()
.writeAndFlush("netty");//收发数据,会调用ch.pipeline().addLast(new StringEncoder());所添加的处理器
启动程序看一下效果(先启动服务器,再启动客户端)
2、EventLoop
下面我们开始介绍Netty中的第一个组件,配置服务器的线程模型时设置的EventLoop。
EventLoop类关系图:
我们首先看下EventLoop的NIO实现:NioEventLoopGroup的构造方法:
可以看到它有多个重载的构造方法:
在构造方法中可以指定线程的数量,如果使用的是无参构造方法,那么默认传递的线程数是0,但是会把0传递给其他的构造方法:
最后调用父类的构造:
在父类的构造中,会判断传入的线程参数是否为0。
目前很显然条件成立,就会获取成员变量DEFAULT_EVENT_LOOP_THREADS 并且再次作为参数调用父类的构造。成员变量DEFAULT_EVENT_LOOP_THREADS 会在静态代码块中被赋值。
如果能获取到"io.netty.eventLoopThreads" key对应的value,就以该值为准,否则线程数是当前cpu核心数*2 。
//NioEventLoopGroup构造方法指定线程数 如果不指定为CPU可运行核心数 * 2
NioEventLoopGroup loopGroup = new NioEventLoopGroup(2);
一个 EventLoop由一个单独的线程驱动,它不断轮询 I/O 事件并执行相应的任务:
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
并且EventLoop的内部维护了一个任务队列,可以提交任务到这个队列中,由 EventLoop的线程顺序执行。
向EventLoop提交任务:
//执行普通任务
loopGroup.next().submit(() -> log.debug("run..."));
//执行定时任务
loopGroup.next().scheduleAtFixedRate((Runnable) () -> log.debug("run with schedule..."),0,1, TimeUnit.SECONDS);
下面通过一个案例来加深对EventLoop的理解:
改造最初的入门案例中的代码,主要体现在.group方法,这次传递了两个参数,将负责连接的EventLoop和负责读写的EventLoop分离开,并且给负责读写的EventLoop设置了两个线程。
//boss只负责接受连接 1个线程 worker负责读写 2个线程
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
服务端的代码和之前的相同,我们先启动一个服务端,再启动三个客户端:
三个连接(channel)由两个EventLoop轮询处理,并且每个连接(channel)和EventLoop是绑定的,一个EventLoop可以负责多条消息的处理。
如图所示:
同时EventLoop也有不同的实现:
案例进一步细化,我们可以再设置一个EventLoop 专门处理其他任务:
EventLoopGroup eventLoop = new DefaultEventLoopGroup();
改造.childHandler:
.childHandler(new ChannelInitializer<NioSocketChannel>() {
/**
* 工序有多道,合在一起就是 pipeline,
* pipeline 负责发布事件(读、读取完成...)传播给每个 handler,
* handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
* @param ch
* @throws Exception
*/
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = ((ByteBuf) msg);
log.debug(byteBuf.toString(Charset.defaultCharset()));
//责任链模式,将消息传递个下一个处理器
ctx.fireChannelRead(msg);
}
}).addLast(eventLoop,"handle2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = ((ByteBuf) msg);
log.debug(byteBuf.toString(Charset.defaultCharset()));
}
});
}
})
启动一个客户端:
相当于链式调用:
小结:
- 在创建EventLoopGroup实例时,可以指定线程数,如果没有指定,默认使用cpu核心数*2。
- 调用ServerBootstrap的.group时,可以将EventLoop细化成为专门处理连接以及负责读写的。
- 可以利用pipeline()的.addLast,链式组合多个EventLoopGroup,实现不同的功能。
- 每一个EventLoop可以轮询处理多个Channel事件,但是会和Channel绑定,线程间相互独立。
3、Channel
Channel在Netty中代表一个可以执行I/O操作(如读、写、连接、绑定等)的对象。它可以是一个套接字连接、文件、管道等。
Netty支持多种类型的Channel,包括NioSocketChannel(用于客户端连接)、NioServerSocketChannel(用于服务端绑定)以及其他专门用途的Channel如EmbeddedChannel等。
Channel的I/O操作都是异步的,会返回一个ChannelFuture对象,用于表示操作的结果:
ChannelFuture channelFuture = new Bootstrap()
// 。。。。。。
.connect(new InetSocketAddress("localhost", 8080));
并且每个Channel都有一个与之关联的ChannelPipeline。Pipeline中包含多个ChannelHandler,每个Handler负责特定的处理逻辑。
为什么在案例代码中,客户端写出数据之前必须要调用.sync()方法?
因为Channel的I/O操作都是异步的,是主线程调用了.connect() 方法,但是建立连接是在NioSocketChannel所在线程。.sync()方法的作用就是让主线程在此处阻塞,等到NioSocketChannel所在线程建立完成连接,主线程才会继续向下执行。(如果不使用.sync()方法,主线程会在连接没有建立完成的时候继续执行后续代码,服务端无法正常接受消息。)
与此类似的还有.close()方法,如果我们想在断开连接后执行一段自己的逻辑:
客户端代码:
//... new BootStrap...
//接受控制台输入,q则断开连接
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true){
String str = scanner.nextLine();
if ("q".equals(str)){
//关闭channel连接
channel.close();
break;
}
channel.writeAndFlush(str);
}
},"input").start();
可以将执行断开后逻辑的代码放在input 线程的channel.close();后,或者主线程中吗?
答案是否定的:
- 如果放在主线程中,那么input 线程和主线程是并行执行的,无法控制先后顺序。
- 如果放在input 线程的channel.close(); 后,也是不行的。因为channel.close(); 方法也是异步调用,由NioSocketChannel所在线程负责关闭连接:
解决该问题有两个方案:
方案一的思路和解决.connect() 方法异步调用的类似,都是使用.sync()方法阻塞主线程,等待input 线程中的channel.close(); 执行完成后再由主线程处理连接关闭后释放资源
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("连接关闭后释放资源");
第二种方案是通过调用ChannelFuture的.addListener 方法,添加一个监听器,由nioEventLoopGroup所在线程关闭连接后,处理连接关闭后释放资源(释放资源和处理后续都是nioEventLoopGroup同一线程。):
closeFuture.addListener((ChannelFutureListener) future -> log.debug("连接关闭后释放资源"));