阅读Netty官方文档的时候,提到了Netty主要有三大核心,分别是buffer、channel、Event Model,接下来我们就从阅读Netty代码来理解这三大核心。
示例程序
先给出示例程序,方便自己也方便读者进行debug调试。
Server端代码
# Server.java文件
package org.example;
public class Server {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run(); // ref-1
}
}
ref-1处的代码创建的DiscardServer对象如下所示。
// DiscardServer.java文件
package org.example;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler()); // ref-2
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
ref-2处会将ChannelHandler的实现类TimeEncoder和TimeServerHandler的对象添加到pipeline的最后位置。这个两个类的代码如下所示:
// TimeEncoder.java
package org.example;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value()); // ref-3
}
}
// TimeServerHandler.java文件
package org.example;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ref-3处代码就是将数据写入到ByteBuf中,后文会详细讲解。
Client端代码
package org.example;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); // ref-4
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
ref-4处会将ChannelHandler的实现类TimeDecoder和TimeClientHandler的对象注册到pipeline的最后处,这两个类的实现代码如下所示:
package org.example;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class TimeDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt())); // ref-5
}
}
package org.example;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ref-5处的代码会从ByteBuf中读取数据,详细内容后文会解析。
运行Server和Client
先运行Server的main方法,然后运行Client的main方法,会得到如下的输出:
Sat Aug 24 15:06:11 CST 2024
ByteBuf解析
写入数据到ByteBuf
ref-3代码会向ByteBuf写入一个Int类型的数据,先看一下在抽象类ByteBuf中的申明:
// io.netty.buffer.ByteBuf.java文件
/**
* Sets the specified 32-bit integer at the current {@code writerIndex}
* and increases the {@code writerIndex} by {@code 4} in this buffer.
* If {@code this.writableBytes} is less than {@code 4}, {@link #ensureWritable(int)}
* will be called in an attempt to expand capacity to accommodate.
*/
public abstract ByteBuf writeInt(int value);
这个方法的大意就是会将32位的整数设置到当前写指针(writerIndex)的位置,并且将writerIndex增加4。如果可以写的空间少于4,那么就会调用ensureWritable(int)
方法尝试扩大容量以容纳32位整数数据。
然后我们在看一下具体的实现:
// io.netty.buffer.AbstractByteBuf.java文件
@Override
public ByteBuf writeInt(int value) {
ensureWritable0(4);
_setInt(writerIndex, value);
writerIndex += 4;
return this;
}
在实现类中,这个写入32位整数的代码就是在完成上述申明中的步骤。先调用ensureWritable0(4)
确保有足够的空间写入32位整数,然后调用_setInt(writerIndex, value)
执行写入操作,最后将writerIndex增加4。
接下来我们追一下写入数据的步骤,如下所示:
// io.netty.buffer.PooledUnsafeDirectByteBuf.java文件
@Override
protected void _setInt(int index, int value) {
UnsafeByteBufUtil.setInt(addr(index), value); // ref-6
}
继续跟一下,可以发现是调用了Java的Unsafe方法:
// io.netty.buffer.UnsafeByteBufUtil.java文件
static void setInt(long address, int value) {
if (UNALIGNED) {
PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value));
} else {
PlatformDependent.putByte(address, (byte) (value >>> 24));
PlatformDependent.putByte(address + 1, (byte) (value >>> 16));
PlatformDependent.putByte(address + 2, (byte) (value >>> 8));
PlatformDependent.putByte(address + 3, (byte) value);
}
}
// io.netty.util.internal.PlatformDependent.java文件
public static void putInt(long address, int value) {
PlatformDependent0.putInt(address, value);
}
// io.netty.util.internal.PlatformDependent0.java文件
static void putInt(long address, int value) {
UNSAFE.putInt(address, value);
}
我们看一下Unsafe类的说明,直接上jdk文档内容:
/**
* A collection of methods for performing low-level, unsafe operations.
* Although the class and all methods are public, use of this class is
* limited because only trusted code can obtain instances of it.
*
* <em>Note:</em> It is the responsibility of the caller to make sure
* arguments are checked before methods of this class are
* called. While some rudimentary checks are performed on the input,
* the checks are best effort and when performance is an overriding
* priority, as when methods of this class are optimized by the
* runtime compiler, some or all checks (if any) may be elided. Hence,
* the caller must not rely on the checks and corresponding
* exceptions!
*
* @author John R. Rose
* @see #getUnsafe
*/
public final class Unsafe {
......
}
第一句话就说明了,这个类提供了一系列方法来执行底层的、不安全的操作。简单点说,就是这个类直接操作的内存。
ref-6 处有个细节,就是计算地址的方法调用addr(index)
,我们下面详细看一下:
// io.netty.buffer.PooledUnsafeDirectByteBuf.java文件
private long addr(int index) {
return memoryAddress + index;
}
计算地址就是起始地址加一个偏移量index,这个index就是我们在上层传递的writerIndex。这儿就体现了写指针的作用,它就是记录数据已经写到哪个位置了,下一次写数据就从这个位置开始写。
从ByteBuf读取数据
写入数据分析完了,我们再分析一下读取数据。ref-5处的代码就是在从ByteBuf中读取数据in.readUnsignedInt()
,我们先看一下这个方法的申明。
// io.netty.buffer.ByteBuf.java文件
/**
* Gets an unsigned 32-bit integer at the current {@code readerIndex}
* and increases the {@code readerIndex} by {@code 4} in this buffer.
*
* @throws IndexOutOfBoundsException
* if {@code this.readableBytes} is less than {@code 4}
*/
public abstract long readUnsignedInt();
这个方法会在readerIndex位置读取32位的整数,然后将readerIndex增加4。
我们再看一下具体实现:
// 会进入到io.netty.buffer.AbstractByteBuf.java中的这个方法。
@Override
public int readInt() {
checkReadableBytes0(4);
int v = _getInt(readerIndex);
readerIndex += 4;
return v;
}
接下来看看_getInt(readerIndex)
方法的调用:
// io.netty.buffer.PooledUnsafeDirectByteBuf.java
@Override
protected int _getInt(int index) {
return UnsafeByteBufUtil.getInt(addr(index));
}
这个方法是不是很熟悉啊,和写入数据一样,都是先计算地址,再进行操作,底层也是依赖的Unsafe类。
到这儿也能体现出来readerIndex的作用了,它就是记录读取数据到哪儿了,然后下一次读取的时候就从readerIndex开始读取。
ByteBuf总结
结合ByteBuf类上的注释,对它进行一个总结。ByteBuf是底层byte数组或者java NIO Buffer的一个视图,它维护了两个指针,分别是读指针(readerIndex)和写指针(writerIndex),这两个指针分别记录读取和写入数据的位置。
具体示意图如下:
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
这两个指针将对应byte数组分成了三个区域。readable bytes区域是实际存储数据的区域,writable bytes是需要填充的未定义区域,discardable bytes区域包含的是已经被读操作获取了的数据。
Channel解析
接下来我们看一下核心组件Channel,它代表的是与Socket或者有能力进行I/O操作的组件的连结,比如读、写、连接或者绑定。
Channel为用户提供如下能力:
- 获取channel的当前状态。
- 获取Channel的配置参数。
- Channel支持的I/O操作。
- ChannelPipeling会处理和channel相关的所有I/O事件和请求。
由于使用Netty时并不直接使用Channel,所以对于Channel的理解,目前就到这儿。
Event Model解析
用下面的图对Netty事件模型进行一个总结:
Java NIO负责处理客户端的请求,每来一个请求就会创建一个channel进行处理。
channel会附带一个channelPipeline,里面有添加的ChannelHandler。
EventLoop其实就是一个被封装了的线程池,ChannelHandler的执行就是在EventLoop中的线程上完成的。
总结
自己的水平有限,对于Netty的源码就只能分析到这儿了。
做个简单的总结,Netty底层是基于Java NIO的,在其上创造了三个重要的概念,(1)Channel,接收客户端请求的通道;(2)ByteBuf 对底层内存进行直接操作的缓冲区;(3)Event Model,主要是EventLoop对线程池的封装,还有对各个生命周期函数的调用。