Netty
文章目录
- Netty
- 6 Netty 核心模块
- 6.1 EventLoopGroup 与 NioEventLoopGroup
- 6.2 Bootstrap/ServerBootstrap
- 6.3 ChannelPipline、ChannelHandler、ChannelHandlerContext
- 6.3.1 三者的关系
- 6.3.2 ChannelPipline
- 6.3.3 ChannelHandler
- 6.3.4 ChannelHandlerContext
- 6.3.5 三者创建过程
- 6.4 EventLoop
- 6.4.1 select
- 6.4.2 processSelectedKeys
- 6.4.3 runAllTask
- 6.5 Buffer
6 Netty 核心模块
6.1 EventLoopGroup 与 NioEventLoopGroup
NioEventLoopGroup 是 EventLoopGroup 的具体实现
EventLoopGroup 是 一组 EventLoop
通过构造器同时启动多个 EventLoop,来充分利用 CPU 资源
// 创建 boosGroup 一直循环只处理连接请求,真正的业务交由 workGroup 处理
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
// 创建 workGroup 处理 read write 事件
EventLoopGroup workgroup = new NioEventLoopGroup(4);
调用 NioEventLoopGroup 构造器 public NioEventLoopGroup(int nThreads)
调用 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 构造器
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args)
并通过 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
设置默认的线程数(启动的EventLoop个数) 默认为机器 CPU * 2
执行最的 MultithreadEventLoopGroup 构造器创建NioEventLoopGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// private final EventExecutor[] children;
// 创建 EventLoop 数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 NioEventLoop
children[i] = newChild(executor, args);
----------------------------------
NioEventLoopGroup
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
--------------------------------------------
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
// 如果出现异常则关闭 EventLoop
children[j].shutdownGracefully();
每个 EventLoop 都维护一个 Selector
public final class NioEventLoop extends SingleThreadEventLoop {
/**
* The NIO {@link Selector}.
*/
private Selector selector; //NioEventLoop selector java.nio.channels.Selector
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 获取 Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
}
private static final class SelectorTuple {...}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
----------------------------------------
WindowsSelectorProvider
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
---------------------------------------
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
// 通过 WindowsSelectorProvider 获取 SelectorTuple 从而得到一个 Selector
return new SelectorTuple(unwrappedSelector);
}
EventLoopGroup 提供 next 接口可以从组中按照一定规则获取一个 EventLoop 来执行任务
ChannelFuture channelFuture = b.bind("127.0.0.1", 8090).sync();
MultithreadEventLoopGroup#next
MultithreadEventExecutorGroup#next
DefaultEventExecutorChooserFactory$GenericEventExecutorChooser#next
public EventExecutor next() {
// 轮询 EventLoopGroup 中的 EventLoop
// 123 -> 123 ...
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
通常一个 ServerSocketChannel 对应一个 Selector 和一个 EventLoop 线程
BoosEventLoop 负责接收客户端的连接并将连接 SocketChannel 交给 WorkerEventLoopGroup 来进行 I/O 处理
6.2 Bootstrap/ServerBootstrap
ServerBootstrap 与 Bootstrap 都是 AbstractBootstrap 的子类
AbstractBootstrap 的主要作用是配置 Netty 程序,串联各个组件
Bootstrap 客户端启动引导类
ServerBootstrap 服务端启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup, workgroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 64)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
}
});
ServerBootstrap 默认空构造器,有默认的成员变量
public ServerBootstrap() { }
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
group 方法将 boosGroup,workGroup 分别赋值给 parentGroup、childGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
channel 方法根据传入的 NioServerSocketChannel.class 创建对应的 channel对象
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
创建一个 channel 反射类工厂 ReflectiveChannelFactory
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
option/childOption 方法传入 TCP 参数,放入 options/childOptions LinkedHashMap 中
option 为 boosGroup EventLoop 设置参数
childOption 位 workGroup EventLoop 设置参数
io.netty.bootstrap.AbstractBootstrap#option
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
// LinkedHashMap 使用 synchronized
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return self();
}
io.netty.bootstrap.ServerBootstrap#childOption
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
-----------------------------------------------
/**
* Creates a new {@link ChannelOption} for the given {@code name} or fail with an
* {@link IllegalArgumentException} if a {@link ChannelOption} for the given {@code name} exists.
*/
@SuppressWarnings("unchecked")
public static <T> ChannelOption<T> newInstance(String name) {
return (ChannelOption<T>) pool.newInstance(name);
}
public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");
public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");
public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
/**
* @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
*/
@Deprecated
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
/**
* @deprecated Use {@link #WRITE_BUFFER_WATER_MARK}
*/
@Deprecated
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
/**
* @deprecated Use {@link #WRITE_BUFFER_WATER_MARK}
*/
@Deprecated
public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");
public static final ChannelOption<WriteBufferWaterMark> WRITE_BUFFER_WATER_MARK =
valueOf("WRITE_BUFFER_WATER_MARK");
public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");
public static final ChannelOption<Boolean> AUTO_READ = valueOf("AUTO_READ");
/**
* @deprecated Auto close will be removed in a future release.
*
* If {@code true} then the {@link Channel} is closed automatically and immediately on write failure.
* The default value is {@code true}.
*/
@Deprecated
public static final ChannelOption<Boolean> AUTO_CLOSE = valueOf("AUTO_CLOSE");
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
public static final ChannelOption<Integer> IP_TOS = valueOf("IP_TOS");
public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
public static final ChannelOption<Integer> IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
@Deprecated
public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =
valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION");
public static final ChannelOption<Boolean> SINGLE_EVENTEXECUTOR_PER_GROUP =
valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP");
handler 方法传入一个只属于 ServerSocketChannel 的 handler
childHandler 传、入一个供 SocketChannel 使用的 handler,这个 handler 将会在每个客户端连接时调用
bind 方法,服务器在 bind 方法中完成启动
ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8090).sync();
AbstractBootstrap#bind
AbstractBootstrap#doBind
initAndRegister、doBind0 两个主要方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
doBind0(regFuture, channel, localAddress, promise);
=================================== initAndRegister 方法
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通过 ServerBoostrap 的 ReflectiveChannelFactory 工厂反射创建 NioServerSocketChannel
// 通过 Nio 的 SelectorProvider 的 openServerSocketChannel 方法获取到 JDK 的 channel,然后包装成 netty 的 channel
// 创建一个唯一的 channelId
// 创建一个 NioMessageUnsafe,用于操作消息
// 创建一个 DefaultChannelPipeline 管道,本质是一个双向链表,用于过滤消息
// 创建一个 NioServerSocketChannelConfig 对象,用于对外展示配置
channel = channelFactory.newChannel();
----------------------------------------------------
ReflectiveChannelFactory#newChannel
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel(java.nio.channels.ServerSocketChannel)
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
// 初始化 DefaultChannelPipeline 双向链表
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
io.netty.channel.socket.nio.NioServerSocketChannel#javaChannel
return (ServerSocketChannel) super.javaChannel();
io.netty.channel.socket.DefaultServerSocketChannelConfig#DefaultServerSocketChannelConfig
----------------------------------------------------
// 设置 NioServerSocketChannel 的 tcp 属性 使用同步方法 因为 LinkedHashMap 非线程安全
// 对 NioServerSocketChannel 的 ChannelPipline 添加 ChannelInitializer 处理器
init(channel);
io.netty.bootstrap.ServerBootstrap#init
//获取 tcp 的配置信息
final Map<ChannelOption<?>, Object> options = options0();
// 设置channel 的配置信息
synchronized (options) {setChannelOptions(channel, options, logger);}
channel.attr(key).set(e.getValue());
//获取 ChannelPipline
ChannelPipeline p = channel.pipeline();
// ChannelPipline 添加 ChannelInitializer 处理器
p.addLast(new ChannelInitializer<Channel>() {...}
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
addLast(executor, null, h);
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 创建 AbstractChannelHandlerContext
// ChannelHandlerContext 是 ChannelHander 和 ChannelPipline 之间的关联
// 每当有 ChannelHandler 添加到 Pipline 中时都会创建 context
newCtx = newContext(group, filterName(name, handler), handler);
// 将Context 添加到链表中
addLast0(newCtx);
}
// 同步或异步执行 callHandlerAdded0
callHandlerAdded0(newCtx);
io.netty.bootstrap.AbstractBootstrap#initAndRegister
// 将注册 NioServerSocketChannel
ChannelFuture regFuture = config().group().register(channel);
io.netty.channel.nio.AbstractNioChannel#doRegister
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
==================================================doBind0
io.netty.bootstrap.AbstractBootstrap#doBind
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
//
doBind0(regFuture, channel, localAddress, promise);
}
}
});
io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, () -> {next.invokeBind(localAddress, promise);}, promise, null);
}
io.netty.channel.AbstractChannelHandlerContext#invokeBind
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
io.netty.handler.logging.LoggingHandler#bind
ctx.bind(localAddress, promise);
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
next.invokeBind(localAddress, promise);
io.netty.channel.AbstractChannelHandlerContext#invokeBind
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
io.netty.channel.DefaultChannelPipeline.HeadContext#bind
unsafe.bind(localAddress, promise);
io.netty.channel.AbstractChannel.AbstractUnsafe#bind
doBind(localAddress);
io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
sun.nio.ch.ServerSocketChannelImpl#bind
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
synchronized(this.lock) {
if (!this.isOpen()) {
throw new ClosedChannelException();
} else if (this.isBound()) {
throw new AlreadyBoundException();
} else {
InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
SecurityManager var5 = System.getSecurityManager();
if (var5 != null) {
var5.checkListen(var4.getPort());
}
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1 ? 50 : var2);
synchronized(this.stateLock) {
this.localAddress = Net.localAddress(this.fd);
}
return this;
}
}
}
netty 启动 2 个 EventLoopGroup 线程池 BoosGroup,WorkerGroup
BoosGroup,WorkerGroup 默认启动线程的数量为 CPU *2
Bootstrap 将 boosGroup 设置为 group 属性,将 worker 设置为 childer 属性
通过 bind 方法启动
bind 方法中的 initAndRegister 方法通过反射创建 NioServerSocketChannel 及 NIO 的相关对象 pipline unsafe 等
bind 的方法中的 dobind 方法调用 dobind0 通过 NioServerSocketChannel 的 doBind 方法对 JDK 的 channel 和 端口进行绑定,完成启动,并监听端口
6.3 ChannelPipline、ChannelHandler、ChannelHandlerContext
6.3.1 三者的关系
每当 ServerSocket 创建一个新连接,就会创建一个 Socket 对应目标客户端
每一个新的 Socket 都会分配一个全新的 ChannelPipline
每个 ChannelPipline 内部包含多个 ChannelHandlerContext
它们一起组成双端链表,这些 ChannelHandlerContext 用于包装 ChannelHandler
6.3.2 ChannelPipline
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
ChannelPipeline 继承 inBound、outBound、iterable 接口,这使得其即可以 调用数据出站也可以调用数据入站同时还能遍历链表
How an event flows in a pipeline
The following diagram describes how I/O events are processed by ChannelHandlers in a ChannelPipeline typically. An I/O event is handled by either a ChannelInboundHandler or a ChannelOutboundHandler and be forwarded to its closest handler by calling the event propagation methods defined in ChannelHandlerContext, such as ChannelHandlerContext.fireChannelRead(Object) and ChannelHandlerContext.write(Object).
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
An inbound event is handled by the inbound handlers in the bottom-up direction as shown on the left side of the diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the diagram. The inbound data is often read from a remote peer via the actual input operation such as SocketChannel.read(ByteBuffer). If an inbound event goes beyond the top inbound handler, it is discarded silently, or logged if it needs your attention.
An outbound event is handled by the outbound handler in the top-down direction as shown on the right side of the diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests. If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the Channel. The I/O thread often performs the actual output operation such as SocketChannel.write(ByteBuffer).
For example, let us assume that we created the following pipeline:
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
In the example above, the class whose name starts with Inbound means it is an inbound handler. The class whose name starts with Outbound means it is a outbound handler.
In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound. When an event goes outbound, the order is 5, 4, 3, 2, 1. On top of this principle, ChannelPipeline skips the evaluation of certain handlers to shorten the stack depth:
3 and 4 don't implement ChannelInboundHandler, and therefore the actual evaluation order of an inbound event will be: 1, 2, and 5.
1 and 2 don't implement ChannelOutboundHandler, and therefore the actual evaluation order of a outbound event will be: 5, 4, and 3.
If 5 implements both ChannelInboundHandler and ChannelOutboundHandler, the evaluation order of an inbound and a outbound event could be 125 and 543 respectively.
handler 用于处理入站和出站事件,pipline 就是一个过滤器,可以通过 pipline 来控制事件如何处理以及 handler 在 pipline 中如何交互
I/O 事件由 Inbound Handler 或 Outbound Handler 处理,并通过调用 ChannelHandlerContext 的 fireChannelRead 方法将事件转发给最近的处理程序
入站事件由入站处理程序自下而上处理,入站处理通常由底部的 I/O 线程 生成入站数据,通常出 SocketChannel.read(ByteBuffer) 获取
一个 pipline 通常有多个 handler
decode handler -> compute handler -> encod handler
在处理业务时,对于耗时较长的业务会影响 netty 程序的性能,可以考虑放入线程池中异步处理
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
// 1
pipeline.addLast(workgroup, new StringDecoder());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
// 2
channel.eventLoop().execute(() -> {
System.out.println("将耗时的操作交由 taskQueue 处理 提交多个任务还是一个线程在执行");
});
// 3
channel.eventLoop().schedule(() -> {
System.out.println(" 将耗时的操作交由 scheduledTaskQueue 处理 提交多个任务使用不同的线程 ");
}, 10, TimeUnit.SECONDS);
}
});
}
}
6.3.3 ChannelHandler
ChannelHandler 的作用是处理 I/O 事件,并将其转发给下一个 ChannelHandler 处理
handler 处理事件分为入站和出站,两个方向的操作都是不同的
server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.List;
public class NettyHandlerServer {
/*
出栈
client --------------------> server
<--------------------
入栈
client server
+--------------------------+ +--------------------------+
+---------------- | decoder (inBoundHandler) | <---+ +--------- | encoder(outBoundHandler) | <------------+
| +--------------------------+ | | +--------------------------+ |
| | | |
↓ | ↓ |
+---------------+ +--------+ +---------------+
| clientHandler | | socket | | serverHandler |
+---------------+ +--------+ +---------------+
| ↑ | ↑
| | | |
| +--------------------------+ | | +--------------------------+ |
+----------------> | encoder(outBoundHandler) | ---+ +----------> | decoder (inBoundHandler) | -----------+
+--------------------------+ +--------------------------+
不论解码器handler 还是 编码器handler 即接 收的消息类型必须与待处理的消息类型一致, 否则该handler不会被执行
在解码器 进行数据解码时,需要判断 缓存 区 的数据是否足够 ,否则接收到的结果会与期望的结果不一致
LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n) 作为分隔符来解析数据。
DelimiterBasedFrameDecoder:使用自定义 的特殊字符作为消息的分隔符。
HttpObjectDecoder:一个HTTP数据的解码器
LengthFieldBasedFrameDecoder:通过指定 长度来标识整包消息,这样就可以自动的处理 黏包和半包消息。
StringDecoder
....
*/
public static void main(String[] args) {
EventLoopGroup boosGroup = null;
EventLoopGroup workGroup = null;
try {
boosGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup(4);
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 64)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加一个入站的解码器
// pipeline.addLast(new ByteToMessageDecoder() {
// @Override
// protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//
// /*
// decode 方法会根据接收的数据 被调用多次 直到没有新的元素被添加到 out 中 或者 ByteBuf 没有更多的可读字节
//
// 如果 out 不为空 就会将 list 传递给下一个 ChannelInboundHandler 处理 该处理器的方法也会被调用多次
//
// */
//
// System.out.println("server 端解码器 ByteToMessageDecoder 被调用~~~~");
//
// // 因为 long 8个字节, 需要判断有8个字节,才能读取一个long
// if (in.readableBytes() >= 8) out.add(in.readLong());
//
// }
// });
// ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,
//我们不必调用readableBytes()方法。参数S指定了用户状态管理的类型
//其中Void代表不需要状态管理
//ReplayingDecoder使用方便,但它也有一些局限性:
// 1.并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException.
// 2.ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢
pipeline.addLast(new ReplayingDecoder<Void>() {
// public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("server 端解码器 ReplayingDecoder 被调用~~~~");
// ReplayingDecoder 无需判断数据是否足够读取,内部会进行判断
out.add(in.readLong());
}
});
// 添加自定义 handler
pipeline.addLast(new SimpleChannelInboundHandler<Long>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("自定义 handler SimpleChannelInboundHandler 被调用。。。。");
System.out.println("从 client:" + ctx.channel().remoteAddress() + " 读取到数据 msg = " + msg);
// 回复一条信息
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
System.out.println("channel:" + ctx.channel().hashCode() + " 加入 pipline:" + ctx.pipeline().hashCode());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("channel:" + ctx.channel().hashCode() + " 成功建立连接 ");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8090).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) System.out.println("server 监听 8090 端口成功");
else System.out.println("server 监听 8090 端口失败");
}
});
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
public class NettyHandlerClient {
public static void main(String[] args) {
EventLoopGroup group = null;
try {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加一个出站的编码器
pipeline.addLast(new MessageToByteEncoder<Long>() {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("client 出站编码器 MessageToByteEncoder 被调用~~~~");
System.out.println("[MessageToByteEncoder] client 端发送的信息 msg = " + msg);
// 将数据编码后发出
out.writeLong(msg);
}
});
// 添加自定义 handler 在 channel 建立成功时发送数据 并 读取 server 端 回复的信息
pipeline.addLast(new SimpleChannelInboundHandler<Long>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("client 自定义 handler channelActive 被调用~~~~");
// ctx.writeAndFlush(12345678L);
/*
qwertyuiqwertyui 16个字节
server 端的解码器每次处理 8 个字节
所以 server 端编码器被调用了两次 解编码器将数据分两次发送到下游的 handler 所有 下游的 handler 也被调用了两次
client 端的编码器没有被调用????
编码器 MessageToByteEncoder 的 write 方法会判断传入的类型是不是需要编码处理
因此编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
io.netty.handler.codec.MessageToByteEncoder.write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) { //判断当前msg 是不是应该处理的类型,如果是就处理,不是就跳过encode
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("qwertyuiqwertyui", CharsetUtil.UTF_8));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
6.3.4 ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
ChannelInboundInvoker、ChannelOutboundInvoker 这两个 invoker 针对入站或出站的 handler 再包装一层,已达到在方法前后拦截并做一些特定的操作
ChannelHandlerContext 除继承 ChannelInboundInvoker、ChannelOutboundInvoker 的方法外,还自定义了一些方法 用于获取 Context 上下文环境,如 channel、executor、handler、pipline、内存分配器等信息
6.3.5 三者创建过程
每当 ChannelSocket 创建都会绑定一个 pipline 两者一一对应
创建 pipline(DefaultChannelPipeline) 时也会创建 tail 和 head 节点形成最初的链表
tail 是入站 inbound 类型的 handler
head 既时 inbound 也是 outbound 类型的handler
在调节 pipline 的 addLast 方法时就会根据给定的 handler 创建一个 context 然后将 这个 context 插入到链表的尾部(tail 前)
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
return addLast(null, handlers);
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查 handler 是否是共享的,如果不是且已经被其他 pipline 使用 则抛出异常
checkMultiplicity(handler);
/*
创建 context
io.netty.channel.DefaultChannelHandlerContext#DefaultChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
*/
newCtx = newContext(group, filterName(name, handler), handler);
/*
将 context 添加到链表中
io.netty.channel.DefaultChannelPipeline#addLast0
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
*/
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
/*
判断 channel 是否注册到 selector 上
没有这会在 io.netty.bootstrap.AbstractBootstrap#initAndRegister 方法中注册
ChannelFuture regFuture = config().group().register(channel);
*/
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
6.4 EventLoop
ScheduledExecutorServer 定时任务接口,表示 EventLoop 可以接受定时任务
SingleThreadEventExecutor 表示每个 EventLoop 是一个 单独的线程池
EventLoop 是一个单列的线程池,主要做 3 件事:监听端口,处理端口事件,处理队列事件
io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判断是否为当前线程
boolean inEventLoop = inEventLoop();
if (inEventLoop) { // 如果是当前线程直接加入队列
addTask(task);
} else { // 如果不是则 启动该线程 并将任务提交到队列(io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueue)
startThread();
-----------------------------------------------------------
io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread(); // 启动线程
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
assert thread == null;
/*
executor 为 EventLoop 创建的时的 ThreadPerTaskExecutor
ThreadPerTaskExecutor 的 execute 方法将 Runnable 包装成 FastThreadLocalRunnable
io.netty.util.concurrent.ThreadPerTaskExecutor#execute
hreadFactory.newThread(command).start();
io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
*/
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
// 判断线程中断状态
if (interrupted) {
thread.interrupt();
}
boolean success = false;
//设置最后一次执行时间
updateLastExecutionTime();
try {
// 启动线程
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
/*
CAS 不断修改 state 的状态为 ST_SHUTTING_DOWN
确认线程是否关闭
然后执行 cleanup() 方法 更新线程状态
ST_TERMINATED 释放当前线程,并打印队列中还有多少未完成的任务
*/
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
io.netty.channel.nio.NioEventLoop#run
for(;;){
select(); 监听端口
processSelectedKeys(); 处理端口事件
runAllTasks(); 处理队列事件
}
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// select 方法 监听端口
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// processSelectedKeys 方法 处理端口事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// runAllTasks 执行任务 处理队列事件
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
-----------------------------------------------------------
// 添加到 io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueue 队列
addTask(task);
// 如果当前当前线程异常 则执行拒绝策略 默认是抛出异常
if (isShutdown() && removeTask(task)) {
reject();
}
}
// 添加的任何没有被唤醒 并且 任务类型不是 NonWakeupRunnable 就唤醒 selector
// 此时阻塞 selector 的线程就会立即返回
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
------------------------------------------------
sun.nio.ch.WindowsSelectorImpl#wakeup
public Selector wakeup() {
synchronized(this.interruptLock) {
if (!this.interruptTriggered) {
this.setWakeupSocket();
this.interruptTriggered = true;
}
return this;
}
}
--------------------------------------------
}
}
io.netty.channel.nio.NioEventLoop#run
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
6.4.1 select
select(wakenUp.getAndSet(false));
调用 selector 方法,默认阻塞 1s
如果有定时任务,则在剩余的时间基础上再加上 0.5s 进行阻塞
当执行 execute 方法时,也就是添加任务的时候就会唤醒 selector 防止 selector 阻塞的时间过长
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 默认阻塞 1s
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 1s 后返回,有返回值 || select 被用户唤醒 || 任务队列有任务 || 有定时任务将被调用
// 否则退出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
6.4.2 processSelectedKeys
processSelectedKeys();
io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// 获取共享数据
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 1 read
// 2 write
// 8 op_connect
// 16 op_accept
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 处理 16 accept 事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 读取 boosGroup 的 NioServerSocketChannnel 数据
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1.检测是否是当前线程
2.执行doReadMessages(readBuf) 传入一个 list 容器
3.doReadMessages 读取 boos 线程中的 NioServerSocketChannel 介绍到的请求,并将请求放入容器中
4.循环遍历容器中的所有请求,通过 pipline 的 fireChannelRead 方法执行 handler 的 ChannelRead
public void read() {
// 判断是否是当前线程
assert eventLoop().inEventLoop();
// 获取 channel 配置
final ChannelConfig config = config();
// 获取 pipline
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 通过 ServerSocket 的 accept 方法获取到 tcp 连接,
// 然后封装成 netty 的 NioSocketChannel 对象,最后添加到容器中
int localRead = doReadMessages(readBuf);
-------------------------------------------------------------
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
// 通过 SocketUtils 工具类 调用 ServerSocketChannel 的 accept 方法 获取 tcp 连接
SocketChannel ch = SocketUtils.accept(javaChannel());
// 将 jdk 的 channel 包装成 netty NioSocketChannel 并添加到 容器中
buf.add(new NioSocketChannel(this, ch));
-------------------------------------------------------------
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 循环调用 ServerSocket 的 pipline 的 fireChannelRead 执行管道中的 handler 的 channelRead 方法
// pipline 中有我们添加的所有 handler headContext ... serverBootstrap ... tailContext
pipeline.fireChannelRead(readBuf.get(i));
-----------------------------------------------------------------
io.netty.channel.DefaultChannelPipeline#fireChannelRead
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
循环几遍然后到 ServerBootstrapAcceptor
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 获取到 NioSocketChannel
final Channel child = (Channel) msg;
// 添加 NioSocketChannel 的 pipline 的 handler
child.pipeline().addLast(childHandler);
// 设置 channel 的属性
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// !!!! 将 NioSocketChannel 注册到 childGroup(workerGroup) 中的 EventLoop 上 并为其添加一个监听器
// sun.nio.ch.WindowsSelectorImpl#implRegister
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
-----------------------------------------------------------------
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
6.4.3 runAllTask
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
safeExecute(task);
io.netty.util.concurrent.AbstractEventExecutor#safeExecute
task.run();
io.netty.channel.AbstractChannelHandlerContext.AbstractWriteTask#run
public final void run() {
try {
// Check for null as it may be set to null if the channel is closed already
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
handle.recycle(this);
}
}
io.netty.channel.AbstractChannelHandlerContext.WriteAndFlushTask#write
io.netty.channel.AbstractChannelHandlerContext#invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
io.netty.channel.AbstractChannelHandlerContext#invokeWrite0
io.netty.handler.codec.MessageToMessageEncoder#write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
encode(ctx, cast, out);
ctx.write(msg, promise);
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeWrite
io.netty.channel.AbstractChannel.AbstractUnsafe#write
6.5 Buffer
NIOBuffer
public static void main1(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(5);
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 3);
buffer.put((byte) 'A');
buffer.put((byte) 'H');
System.out.println("capacity:" + buffer.capacity());
buffer.flip();
for (int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.get());
}
ByteBuffer byteBuffer = ByteBuffer.wrap("asdsadsa".getBytes(StandardCharsets.UTF_8));
}
Netty Bytebuf
/*
1.ByteBuf 底层是一个 byte[]
2.在 netty 的 ByteBuf 中无需使用 flip 反转
其底层维护 readIndex 和 writeIndex
通过 readIndex writeIndex capacity 将 buffer 分成三个区域
0 --------- readIndex 已经读取的区域
0 ------------------------ writeIndex 已写区域
readIndex ---- writeIndex 可读区域
0 ----------------------------------------- capacity 容量
writeIndex ---- capacity 可写区域
*/
//创建一个buffer byte[5]
ByteBuf buffer = Unpooled.buffer(5);
for (int i = 0; i < buffer.capacity(); i++) {
buffer.writeByte(i);
}
System.out.println("capacity:" + buffer.capacity());
for (int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.getByte(i));
}
System.out.println("...........................................");
for (int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte());
}