第十四章 客户端创建
14.1 Netty客户端创建流程分析
BootStrap是socket客户端创建工具类,通过Bootstrap可以方便地创建Netty客户端发起异步TCP连接操作
14.1.1 客户端创建时序图
14.1.2 客户端流程分析
- 用户线程创建BootStrap实例,通过API设置创建客户端相关的参数,异步发起客户端连接
- 创建处理客户端连接、I/O读写的Reactor线程组NioEventLoopGroup。通过构造函数指定I/O线程的个数,默认为cpu核数的两倍
- 通过 Bootstrap的ChannelFactory和用户指定的Channel 类型创建用于客户端连接的 NioSocketChannel,它的功能类似于JDK NIO类库提供的 SocketChannel;
- 创建默认的ChannelHandler Pipeline,用于调度和执行网络事件;
- 异步发起TCP连接,判断连接是否成功。如果成功,则直接将NioSocketChannel注册到多路复用器上,监听读操作位,用于数据报读取和消息发送:如果没有立即连接成功,则注册连接监听位到多路复用器,等待连接结果
- 注册对应的网络监听状态位到多路复用器;
- 由多路复用器在I/0现场中轮询各Channel,处理连接结果;
- 如果连接成功,设置Future结果,发送连接成功事件,触发ChannelPipeline执行
- 由ChannelPipeline调度执行系统和用户的ChannelHandler,执行业务逻辑
14.2 源码分析
public class EchoClient {
private final String host;
private final int port;
private final int firstMessageSize;
public EchoClient(String host, int port, int firstMessageSize) {
this.host = host;
this.port = port;
this.firstMessageSize = firstMessageSize;
}
public void run() throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoClientHandler(firstMessageSize));
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
final String host = args[0];
final int port = Integer.parseInt(args[1]);
final int firstMessageSize;
if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]);
} else {
firstMessageSize = 256;
}
new EchoClient(host, port, firstMessageSize).run();
}
}
EchoClientHandler 实现:
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
14.2.1 客户端连接辅助类BootStrap
- 设置I/O线程组:只需要一个
EventLoopGroup
,因为客户端只需要处理自己发起的连接和 I/O 操作。 - TCP参数设置:
- SO_TIMEOUT:控制读取操作将阻塞多少毫秒。如果返回值为0,计时器就被禁止了,该线程将无限期阻塞;
- SO_SNDBUF:套接字使用的发送缓冲区大小;
- SO_RCVBUF:套接字使用的接收缓冲区大小
- SO_REUSEADDR:用于决定如果网络上仍然有数据向旧的ServerSocket传输数据,是否允许新的ServerSocket绑定到与旧的ServerSocket同样的端口上。SOREUSEADDR选项的默认值与操作系统有关,在某些操作系统中,允许重用端口,而在某些操作系统中不允许重用端口
- CONNECT_TIMEOUT_MILLIS:客户端连接超时时间,由于NO原生的客户端并不提供设置连接超时的接口,因此,Netty采用的是自定义连接超时定时器负责检测和超时控制;
- TCP_NODELAY:激活或禁止TCP_NODELAY 套接字选项,它决定是否使用 Nagle算法,如果是时延敏感型的应用,建议关闭Nagle算法
- channel接口:对于TCP客户端,默认使用NioSocketChannel
- Handler接口:提供ChannelInitializer,调用initChannel接口,设置用户ChannelHandler
- 最后发起连接
14.2.2 客户端连接操作
首先创建和初始化NioSocketChannel,从NioEventLoopGroup中获取NioEventLoop,再创建NioSocketChannel,初始化Channel之后,注册到Selector上。创建链路后发起TCP连接,调用doConnect0,最终调用HeadHandler的connect方法,最终调用UNsafe的connect方法。
需要注意的是,SocketChannel执行connect()操作后有以下三种结果。
- 连接成功,返回True;
- 暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回False;
- 连接失败,直接抛出IO异常。
如果是第二种结果,需要将NioSocketChannel中的selectionKey设置为OP_CONNECT监听连接结果。
异步连接返回之后,需要判断连接结果,如果连接成功,则触发ChannelActive事件代码如下。最终将SelectionKey设置为OP_READ.
14.2.3 异步连接结果通知
Selector轮询客户端连接Channel,当服务端返回握手应答后,对结果进行判断。调用unsafe.finishConnect()中的doFinishConnect判断,返回true表示连接成功,其他值或异常表示连接失败,成功后调用fulfillConnectPromise, 最终pipeline调用fireChannelActive。
14.2.4 客户端连接超时机制
可通过CONNECT_TIMEOUT_MILLIS配置项设置连接超时时间,发起连接的同时开启定时器,超时后将异常返回给connectPromise,同时关闭客户端连接。如果超时前获得链接结果则删除定时器。