文章目录
- 1. 入门
- 2. Netty 代码实例
- 3. Netty bind
- 3.1 initAndRegister
- 3.1.1 newChannel, 创建 NioServerSocketChannel
- 3.1.2 init(channel); 初始化 NioServerSocketChannel
- 3.1.3 register 注册channel
- 3.2 doBind0 绑定端口
- 3.3 ServerBootstrapAcceptor
1. 入门
主从Reactor模型 :Acceptor 接收到客户端TCP连接请求并处理完成后, 将新创建的SocketChannel 注册到 I/O线程池 (sub Reactor)传送门
主要步骤:
Acceptor
创建ServerSocketChannel
Acceptor ServerSocketChannel
绑定端口Acceptor ServerSocketChannel
设置非阻塞Acceptor
创建Selector
,将ServerSocketChannel
注册到Selector
上,监听SelectionKey.OP_ACCEPT
事件Acceptor ServerSocketChannel
设置分发处理器,处理监听到的SelectionKey.OP_ACCEPT
事件,新创建的SocketChannel
分发到到I/O线程池 (sub Reactor)
I/O线程池 (sub Reactor)
不断轮训处理SocketChannel
上的读写请求
2. Netty 代码实例
传送门
ServerBootstrap:
- 配置
EventLoopGroup
线程组:
需要注意的是, 只绑定一个端口,bossEventLoopGroup
1个就够了, 2个会有一个闲置
NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);
- 设置
Channel
类型:NioServerSocketChannel
- 设置
childHandler
的ChannelPipeline
- 绑定端口: 创建
NioServerSocketChannel
, 注册, 绑定端口,ServerSocketChannel
添加分发到子线程组的 handler
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
public class NettyServer01 {
public static void main(String[] args) {
// 创建BossGroup和WorkerGroup,分别处理连接接受和数据读写
NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);
new ServerBootstrap() // 初始化ServerBootstrap
.group(bossEventLoopGroup, workerEventLoopGroup) // 设置EventLoopGroup
.channel(NioServerSocketChannel.class) // 指定服务器通道类
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 设置通道初始化器
/**
* 初始化通道,添加处理器到通道的管道中
* @param ch 当前初始化的通道
*/
protected void initChannel(NioSocketChannel ch) {
// 添加多个处理器,分别处理入站和出站事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 处理入站数据
* @param ctx 通道上下文
* @param msg 接收到的消息对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = inbound((ByteBuf) msg, "1");
ctx.fireChannelRead(byteBuf);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws CharacterCodingException {
ByteBuf byteBuf = inbound((ByteBuf) msg, "2");
ctx.fireChannelRead(byteBuf);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 处理入站数据,将处理后的数据写回通道
* @param ctx 通道上下文
* @param msg 接收到的消息对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = inbound((ByteBuf) msg, "3");
ctx.channel().write(byteBuf);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
/**
* 处理出站数据,在数据写出前进行加工
* @param ctx 通道上下文
* @param msg 要写出的消息对象
* @param promise 写操作的承诺
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = outbound((ByteBuf) msg, "4");
ctx.writeAndFlush(msg);
ctx.write(byteBuf, promise);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = outbound((ByteBuf) msg, "5");
ctx.write(byteBuf, promise);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = outbound((ByteBuf) msg, "6");
ctx.write(byteBuf, promise);
}
});
}
})
.bind(8080); // 绑定端口并启动服务器
}
/**
* 对出站数据进行处理
* @param msg 待处理的ByteBuf对象
* @param no 数据标识号
* @return 处理后的ByteBuf对象
*/
private static ByteBuf outbound(ByteBuf msg, String no) {
ByteBuf byteBuf = msg;
String output = byteBufToString(byteBuf);
System.out.printf("\n\noutbound%s output: %s", no, output);
stringWriteToByteBuf(byteBuf, String.format("\noutbound%s 已处理", no));
return byteBuf;
}
/**
* 对入站数据进行处理
* @param msg 待处理的ByteBuf对象
* @param no 数据标识号
* @return 处理后的ByteBuf对象
*/
private static ByteBuf inbound(ByteBuf msg, String no) {
String input = byteBufToString(msg);
System.out.printf("\n\ninbound%s input: %s\n", no, input);
stringWriteToByteBuf(msg, String.format("\ninbound%s 已处理", no));
return msg;
}
/**
* 将ByteBuf对象转换为字符串
* @param msg 待转换的ByteBuf对象
* @return 字符串表示的数据
*/
private static String byteBufToString(ByteBuf msg) {
return msg.toString(StandardCharsets.UTF_8);
}
/**
* 将字符串写入ByteBuf对象
* @param byteBuf 待写入的ByteBuf对象
* @param msg 要写入的字符串数据
*/
private static void stringWriteToByteBuf(ByteBuf byteBuf, String msg) {
byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
}
}
3. Netty bind
主要下面两个方法:
final ChannelFuture regFuture = initAndRegister();
初始化channel
,并且注册ServerSocketChannel
到bossEventLoopGroup
的 一个EventLoop
的Selector
上doBind0(regFuture, channel, localAddress, promise);
绑定端口号
public ChannelFuture bind(InetAddress inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
3.1 initAndRegister
io.netty.bootstrap.AbstractBootstrap#initAndRegister
初始化channel
,并且注册ServerSocketChannel
到 bossEventLoopGroup
的 一个 EventLoop
的 Selector
上
inal ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
3.1.1 newChannel, 创建 NioServerSocketChannel
- 创建
channel = channelFactory.newChannel();
io.netty.channel.ReflectiveChannelFactory#newChannel
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel
,NioServerSocketChannel
处理连接事件:super(null, channel, SelectionKey.OP_ACCEPT);
AbstractNioChannel
: 设置非阻塞ch.configureBlocking(false);
AbstractChannel
初始化unsafe、pipeline: unsafe = newUnsafe(); pipeline = newChannelPipeline();
DefaultChannelPipeline
:tail = new TailContext(this); head = new HeadContext(this);
3.1.2 init(channel); 初始化 NioServerSocketChannel
初始化 Channel
:io.netty.bootstrap.ServerBootstrap#init
, pipeline
添加 ServerBootstrapAcceptor
是一个异步过程,需要 EventLoop
线程负责执行。而当前 EventLoop
线程该去执行 register0()
的注册流程,所以等到 register0()
执行完之后才能被添加到 Pipeline
当中
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
- 注册
Channel
:ChannelFuture regFuture = config().group().register(channel);
config().group()
就是bossEventLoopGroup
execute
:io.netty.util.concurrent.SingleThreadEventExecutor#execute
addTask(task);
if (!inEventLoop) startThread();
3.1.3 register 注册channel
- 注册
ServerSocketChannel
到bossEventLoopGroup
的一个EventLoop
的Selector
上,监听SelectionKey.OP_ACCEPT
事件
ChannelFuture regFuture = config().group().register(channel);
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
io.netty.channel.AbstractChannel.AbstractUnsafe#register
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
);
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
io.netty.channel.nio.AbstractNioChannel#doRegister
- 调用java 的 nio:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- 调用java 的 nio:
3.2 doBind0 绑定端口
io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
- io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
- io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
- io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
- io.netty.channel.AbstractChannelHandlerContext#invokeBind
- io.netty.channel.DefaultChannelPipeline.HeadContext#bind
- io.netty.channel.AbstractChannel.AbstractUnsafe#bind
- io.netty.channel.socket.nio.NioServerSocketChannel#doBind 调用java 的 bind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
3.3 ServerBootstrapAcceptor
child
就是 workerEventLoopGroup
,socketChannel
注册到 workerEventLoopGroup
进行处理
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}