文章目录
- Netty整体架构
- 一个启动流程
- 源码解析
- new NioEventLoopGroup(1)
- 构建线程池
- 基础信息构建
- 线程选择策略
- group
- channel
- handler
- childHandler
- bind
- initAndRegister
- 反射创建 NioServerSocketChannel 对象
- init
- 注册channel
- doBind0
- 流程图
- 思考
Netty整体架构
是什么: Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。
解决什么问题: 我们知道NIO编程虽然可以解决网络编程IO问题,但是其繁琐的细节配置,对于开发意义网络组员来说其实是复杂的,而Netty就是在NIO的基础上进一步封装,一个网络编程框架。
核心组件
BootStrap->Channel -> EventLoop&EventLoopGroup -> ChannelPipeline -> ChannelHandler .
一个启动流程
这是一个简单的服务端HTTP的Demo,我们按照每行代码的方式 ,逐步分析整个启动过程具体是做了哪些工作。
/**
* @author qxlx
* @date 2024/7/28 20:52
*/
public class SImpleServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGrpup = new NioEventLoopGroup(1);
EventLoopGroup wrokerGrpup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGrpup,wrokerGrpup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = bootstrap.bind(8888).sync();
f.channel().closeFuture().sync();
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}
源码解析
new NioEventLoopGroup(1)
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 不指定线程数,则使用默认自定义的线程数
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
可以看到通过调用super父类的方式,判断传入的nThreads的线程数等于0,就使用默认的方式,比如当前是10核,就创建20个线程。否则就采用传入执行的线程数。我们知道对于boss线程来说,其实本身就只有一个线程进行接收客户端传入的线程,所以线程数就是1。
private static final int DEFAULT_EVENT_LOOP_THREADS;
// 获取属性对应的值,获取不到 就是CPU的两倍
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
}
构建线程池
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 判断线程数是否小于0
checkPositive(nThreads, "nThreads");
// 如果没有构建,使用默认的
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 产生nThreads个Nio保存在数组中
// EventExecutor 是excutor的子类
// 这里因为创建的是1个线程。创数组大小是0
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// ⭐️
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 执行失败 将前面创建成功的直接shutDown 下面是关于异常的处理
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 遍历线程 给每个线程创建都田间一个关闭的监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
可以看到EventExecutor 其实顶层的父类就是Executor,所以就是一个线程池的实现。
基础信息构建
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// 参数传递进来的
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
// 构造方法进行初始化
NioEventLoop(参数 忽略) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// NIO的封装 // 完成selector的创建
final SelectorTuple selectorTuple = openSelector();
// 创建selector
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
线程选择策略
这里根据不同的选择器 实现不同策略,如果是2的倍数,选择使用& 否则就是 %的算法。
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
总结下,上述过程就是这样的。
group
这里其实就是调用父类 AbstractBootstrap 设置主group , 然后在设置 子group
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
// 初始化父类 将bossGroup赋值
super.group(parentGroup);
// 如果childGroup变量 如果已经有值,抛出异常
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
// childGroup = workerGroup赋值
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
channel
将 NioServerSocketChannel 作为Class对象传入。然后获取构造方法,然后获取NioServerSocketChannel的构造方法,在之后的流程中,其实就是利用反射创建NioServerSocketChannel对象。
public B channel(Class<? extends C> channelClass) {
// 直接调用channelFactory工厂,创建一个ReflectiveChannelFactory 工厂方法
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
// 通过Class对象 获取构造参数
// 等待后续的利用反射进行创建对象
this.constructor = clazz.getConstructor();
}
// channelFactory赋值
this.channelFactory = channelFactory;
handler
设置handler , 然后返回self(); 其实就是返回this。这样就可以实现链式编程。
public B handler(ChannelHandler handler) {
// 设置handler
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
private B self() {
return (B) this;
}
childHandler
设置childHandler 属性,可以看到很多地方都做了判空的处理逻辑。
public ServerBootstrap childHandler(ChannelHandler childHandler) {
// 设置childHanlder
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
// this是当前对象的引用
return this;
}
其实到这里,都是一些前戏的工作。
bind
initAndRegister
反射创建 NioServerSocketChannel 对象
因为之前channelFactory设置的是 ReflectiveChannelFactory
,所以就是利用构造方法创建NioServerSocketChannel
的对象,既然创建对象,那么必定会调用 NioServerSocketChannel
的构造方法。
channel = channelFactory.newChannel();
// 对象实例化
return constructor.newInstance();
构造方法里,其实就是 创建了一个NIO的通道对象,ServerSocketChannel
的实现 ServerSocketChannelImpl
public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
// 创建一个底层变量
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
// 核心是创建一个 ServerSocketChannel
private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
// 创建一个channel
ServerSocketChannel channel =
SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
// 打开一个服务端套接字通道对象
// 底层其实就是创建ServerSocketChannel ⭐️ ServerSocketChannelImpl
return channel == null ? provider.openServerSocketChannel() : channel;
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 设置为接受事件
super(null, channel, SelectionKey.OP_ACCEPT);
// 设置config属性
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp; // OP_ACCEPT
// 设置当前的serverSocketChannel为非阻塞的
ch.configureBlocking(false);
}
init
1.设置channel的option
2.设置channel的attr
3.设置handler的pipeline
4.pipeline添加channelInitializer对象 ,并且使用 ch.eventLoop().execute 线程池 创建了一个专门接受客户端请求的Acceptor对象。添加到pipeline管道对象中
void init(Channel channel) {
// 1.设置新介入的channel的option
setChannelOptions(channel, newOptionsArray(), logger);
// 2、设置新接入channel的attr
setAttributes(channel, newAttributesArray());
// 3、设置handler到pipeline上
// 获取到通道流水线对象
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();
// 4.pipeline 添加一个channelInitializer对象 ServerBootstrapAcceptor
// 通道初始化对象
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
// 看到这里,我们发现其实init只是初始化了一些基本的配置和属性,
// 以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,
extensions));
}
});
}
});
if (!extensions.isEmpty() && channel instanceof ServerChannel) {
ServerChannel serverChannel = (ServerChannel) channel;
for (ChannelInitializerExtension extension : extensions) {
try {
extension.postInitializeServerListenerChannel(serverChannel);
} catch (Exception e) {
logger.warn("Exception thrown from postInitializeServerListenerChannel", e);
}
}
}
}
注册channel
ChannelFuture regFuture = config().group().register(channel);
// 启动线程池
eventLoop.execute(new Runnable() { // ⭐️
@Override
public void run() {
register0(promise); // 分析
}
});
doRegister();
这里其实启动一个线程,一个死循环 接受将channel注册到selector中。 然后通过selector就可以获取有事件的线程进行操作。 ServerSocketChannel注册到Selector中。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// ServerSocketChannel注册到Selector中。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
// 在注册完毕之后,调用这个就会触发
pipeline.invokeHandlerAddedIfNeeded();
doBind0
因为调用链路比较长,最终 其实就是根据版本的不同 将端口和channel进行绑定。
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) { // 全连接队列的大小
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
流程图
思考
对于一个NIO原生程序来说,本质做了几件事情
- 创建EventLoop对象
- 完成ServerSocketChannel的创建 初始化过程
- 以及ServetSocketChannel的注册工作
- 端口绑定
- 设置事件
根据 我们从源码分析一个常见netty的启动过程的过程本质上,也是完成了对上述过程NIO的封装过程。基础属性的设置,创建 NioServerSocketChannel的实例 本质就是 ServetSocketChannel。以及完成注册到selector的过程。最后完成端口绑定的工作。虽然netty抽象几个概念,但是万变不离其中,本质还是NIO的封装。
那么剩下来,要研究的重点是什么呢?
- eventloop 线程是如何接受任务的。
- unsafe的写过程
- channeloutbounBuffer
- channelPipeline
- netty常见优化方案
- fastThreadLocal
- 内存池
- 事件轮
- 编码器