【Netty 源码】服务端启动流程源码分析 篇一

news2025/1/16 19:04:41

【Netty 源码】服务端启动流程源码分析 篇一

1.原生Java NIO服务端创建流程

使用Java NIO创建服务端时,通常我们需要先创建Channel,Selector两个对象,然后将Channel绑定端口并注册到Selector上,最后对事件轮询监听

        //第一步:创建Channel
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        //第二步:创建selector
        Selector selector = Selector.open();
        //第三步:将Channel注册到selector
        SelectionKey selectionKey = channel.register(selector, 0, new Object());
        //第四步:Channel监听端口
        channel.bind(new InetSocketAddress(8080));
        //第五步:关注感兴趣的事件
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);

2.Netty 服务端创建流程

Netty的服务端创建流程都在 ServerBootstrap.bind方法中完成

private ChannelFuture doBind(final SocketAddress localAddress) {
    //创建 channel并初始化
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    //不能肯定register完成,因为register是丢到nio event loop里面执行去了。
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        //进一步进行绑定操作(此处的绑定指的是将 channel 绑定 selector)
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        //等着register完成后再通知再执行bind
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

2.1 initAndRegister() 初始化Channel并将Channel注册到Selector

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
        		//通过工厂类创建Channel
            channel = channelFactory.newChannel();
            //完成Channel初始化
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //开始register
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

通过堆栈信息可以看到此方法由main线程进行调用

image-20240310111903103

channelFactory.newChannel()通过DEBUG追踪,调用的 io.netty.channel.ReflectiveChannelFactory#newChannel,底层通过反射无参构造,创建的Channel

    public T newChannel() {
        try {
            //反射创建channel
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

·ChannelFactory 是一个接口类,只有一个抽象方法 newChannel() ,在ServerBootstrap.channel 方法执行时,赋值

io.netty.bootstrap.AbstractBootstrap#channel

image-20240310113115988

io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)

image-20240310113130755

`NioServerSocketChannel 无参构造被调用的时候会执行 newSocket 方法

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

参数 DEFAULT_SELECTOR_PROVIDER 被 SelectorProvider.provider() 赋值,不同的平台下 SelectorProvider 实现类不一样

最终通过 provider.openServerSocketChannel() 创建一个Channel

io.netty.channel.socket.nio.NioServerSocketChannel#newSocket

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

image-20240310152014731

可以看到这里Netty的做法跟原生的Java NIO 是一样的

2.1.1 init() 初始化Channel,并添加ServerBootstrapAcceptor 处理器
    @Override
    void init(Channel channel) {
        //参数配置,后面再看 此时跳过·
        setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
        setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

        //此时Channel已经创建出来了,拿到pipeline,准备添加Handler
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions =
                childOptions.entrySet().toArray(newOptionArray(0));
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        //ChannelInitializer一次性、初始化handler:
        //负责添加一个ServerBootstrapAcceptor handler,添加完后,自己就移除了:
        //ServerBootstrapAcceptor handler: 负责接收客户端连接创建连接后,对连接的初始化工作。
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //将ServerBootstrap.Handler()方法设置的Handler添加到pipeline中
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                //获取一个EventLoop,并提交往pipeline中添加ServerBootstrapAcceptor Handler的任务
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

·ServerBootstrapAcceptor 继承自 ChannelInboundHandlerAdapter ,ServerBootstrapAcceptor作用就是当发生连接事件时与Channel建立连接。

需要注意的时,此时只是往pipeline中添加了一个Handler,并没有真正执行。

2.1.2 register(channel) 将Channel注册到EventLoop
ChannelFuture regFuture = config().group().register(channel);

io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)

  promise.channel().unsafe().register(this, promise);

io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    //此时还是main线程,因此走false逻辑
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            //将任务交由EventLoop执行
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
2.2.3 register0()真正执行Channel注册

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        //将Channel注册到Selector上
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
         //server socket的注册不会走进下面if,server socket接受连接创建的socket可以走进去。因为accept后就active了。
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

doRegister 方法将调用父类AbstractNioChannel

io.netty.channel.nio.AbstractNioChannel#doRegister

 protected void doRegister() throws Exception {
        boolean selected = false;
        //死循环注册Channel,0没有绑定事件,并将Channel作为附件,方便后续取出使用
        for (;;) {
            try {
                logger.info("initial register: " + 0);
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

此处是一个死循环,确保一定会注册上 并且将Channel作为一个附件,方便取出使用。每个eventLoop都维护了一个Selector。

回到register0方法中,在将Channel注册到Selector上后,执行pipeline.invokeHandlerAddedIfNeeded();

io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

此时将调用 init()方法中,往pipeline中添加的ChannelInitializer.initChannel()。由此可知,ChannelInitializer.initChannel 只会被调用一次,就是在Channel实例化后,成功注册到Selector上。

image-20240310160635710

此时Channel已经注册到Selector上了,因此重新获取一个EventLoop后,提交任务往pipeline中添加ServerBootstrapAcceptor Handler,负责Channel建立连接事件

在回到 register0 方法,执行完Handler后,将调用 safeSetSuccess(promise) 放promise中设置结果,进而触发promise的监听器,执行doBind0()方法

io.netty.bootstrap.AbstractBootstrap#doBind image-20240310163800134

io.netty.bootstrap.AbstractBootstrap#doBind0

image-20240310163920293

将任务交给EventLoop执行

io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

每个pipeline默认都有tail和head两个Handler,此时的bind方法将交由tail执行

io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    //任务交由了EventLoop执行,因此走true逻辑
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

invokeBind 方法调用链比较深,最终走到io.netty.channel.socket.nio.NioServerSocketChannel#doBind

image-20240310164350164

`ServerSocketChannel继承自父类ServerSocketChannel,拿到的就是java的ServerSocketChannel,如果java版本大于7,最终通过ServerSocketChannelImpl.bind方法绑定端口号,并设置是否阻塞

执行doBind方法后,AbstractUnsafe将给EventLoop提交pipeline.fireChannelActive 任务。

io.netty.channel.AbstractChannel.AbstractUnsafe#bind

image-20240310164820325

此时的pipeline中有三个Handler,分别是head,tail,acceptor。fireChannelActive 方法将调用他们的channelActive方法。

我们主要看 head.channelActive

io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

image-20240310165233107

调用链比较深,最终会回到AbstractNioChannel

io.netty.channel.nio.AbstractNioChannel#doBeginRead

Netty 源码学习

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1506749.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ptmalloc、tcmalloc与jemalloc对比分析

文章目录 一、内存管理二、ptmalloc系统角度看ptmalloc内存管理用户角度看ptmalloc内存管理线程中内存管理从工作原理来看从作用来看Chunk说明问题 三、tcmalloc系统角度看tcmalloc内存管理用户角度看tcmalloc内存管理tcmalloc的优势 四、jemalloc系统角度看jemalloc内存管理用…

【Web - 框架 - Vue】随笔 - Vue的简单使用(01) - 快速上手

【Web - 框架 - Vue】随笔 - Vue的简单使用(01) - 快速上手 Vue模板代码 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>模板</title> </head> <body> <div></di…

js 获取浏览器相关的宽高尺寸

window 屏幕 屏幕分辨率的高&#xff1a; window.screen.height 屏幕分辨率的宽&#xff1a; window.screen.width 屏幕可用工作区高度&#xff1a; window.screen.availHeight 屏幕可用工作区宽度&#xff1a; window.screen.availWidth document 网页 网页可见区域宽&#xf…

基于网格搜索的随机森林回归算法Python实现

摘要 随机森林回归算法的应用领域很广&#xff0c;可用于市场销售预测、客户细分、商品推荐等领域&#xff0c;也可应用于气候模型预测、土地利用分析、水资源管理等环境领域问题。其算法的Python实现涉及到多参数调优问题&#xff0c;本文使用了网格搜索法&#xff0c;以MSE作…

Springer旗下SCI,16天见刊!稳定检索13年,质量稳定

毕业推荐 SCIE&#xff1a; • 计算机类&#xff0c;6.5-7.0&#xff0c;JCR1区&#xff0c;中科院2区 • 2个月19天录用&#xff0c;6天见刊&#xff0c;36天检索 SCI&EI&#xff08;CCF-C类&#xff09; • 算法类&#xff0c;2.0-3.0&#xff0c;JCR3区&#xff0c…

笔记76:32位/64位操作系统的区别

64位系统和32位系统的区别: 操作系统只是硬件和应用软件中间的一个平台32位操作系统针对的32位的CPU设计64位操作系统针对的64位的CPU设计我们的CPU从原来的8位&#xff0c;16位&#xff0c;到现在的32位和64位&#xff1b;CPU处理计算的时候“数据”和“指令”是不同对待的 &…

解锁网络数据:入门级IP代理使用教程

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

Spring Security认证授权流程详解

认证的工作原理 过滤链 Spring Security框架的作用就是安全访问控制即对所有进入系统的请求进行拦截, 校验每个请求是否能够访问到它所期望的资源 通过Filter或AOP等技术可以实现安全访问控制功能,而Spring Security对Web资源的保护是靠Filter实现的,Spring Security有一个过…

在 .NET 项目中复制资源文件夹到生成目录

本文主要介绍在使用 Visual Studio 进行调试和发布时&#xff0c;如何在 .NET 项目中复制资源文件夹到生成目录。 1. 背景 在开发 .NET 项目的过程中&#xff0c;我们有时会遇到需要在 debug 、 release 或是发布时将资源文件夹复制到生成目录的需求。这些资源可能包括图片、配…

Java旋转矩阵

题目&#xff1a; 给你一幅由 N N 矩阵表示的图像&#xff0c;其中每个像素的大小为 4 字节。请你设计一种算法&#xff0c;将图像旋转 90 度。 不占用额外内存空间能否做到&#xff1f; 示例 1: 给定 matrix [ [1,2,3], [4,5,6], [7,8,9] ], 原地旋转输入矩阵&…

运筹从业者也需要的因果推断入门:基础概念解析和体系化方法理解

文章目录 1 引言2 相关关系 VS 因果关系2.1 相关关系2.2 因果关系2.3 相关关系不等于因果关系 3 因果推断方法3.1 方法体系3.2 方法理解 4 运筹从业者也需要因果推断4.1 问题描述4.2 算法方案4.3 算法验证 5 总结6 相关阅读 1 引言 已经3月初了&#xff0c;原计划的因果推断学…

抖音在线点赞任务发布接单运营平台PHP网站源码多个支付通道+分级会员制度,附带系统搭建教程

介绍 1、三级代理裂变&#xff0c;静态返佣/动态返佣均可设置。&#xff08;烧伤制度&#xff09;。 2、邀请二维码接入防红跳转。 3、自动机器人做任务&#xff0c;任务时间可设置&#xff0c;机器人价格时间可设置。 4、后台可设置注册即送X天机器人。 5、不同级别会员使…

浅谈JUC的理解(含JUC知识体系图)

浅谈JUC的理解 一、前言感悟二、并发知识三、一年前回答四、补充体系回答五、补充层次回答六、碎碎念 本文除了说技术&#xff0c;更多的是在一个两年多开发经验的程序员视角下&#xff0c;记录下自己探索到的世界。 如有不妥之处&#xff0c;还请指正。共勉。 一、前言感悟 当…

上海雷卯湿敏元器件存储及使用规范

湿敏等级是指材料或产品对湿度变化的敏感程度。它用于评估材料或产品在湿度变化条件下的稳定性和可靠性。 湿敏等级通常通过数字表示&#xff08;如MSL- Moisture Sensitivity Level&#xff09;&#xff0c;从1到6级不等&#xff0c;每个级别代表不同的湿敏程度。较低的级别表…

基于网络爬虫的购物平台价格监测系统的设计与实现

通过对网络爬虫的购物平台价格监测系统的业务流程进行梳理可知&#xff0c;网络爬虫的购物平台价格监测系统主要由前台买家模块、后台卖家模块以及管理员模块构成。前台功能包含登录功能、注册功能、系统首页功能、唯品会商品详情浏览、唯品会商品收藏、唯品会商品点赞、唯品会…

9. 内核、文件系统加载工具

内核、文件系统加载工具 内核、文件系统加载工具是嵌入式开发必备的工具 1. 烧写BootLoader 1.1 通过超级终端方式 烧写 Bootloader 可以使用超级终端的“传送” |“发送文件”命令进入发送文件对话框&#xff0c;使用 Xmodem 协议和 Kermit 协议发送 Bootloader 的各个文件…

DenseNet笔记

&#x1f4d2;from ©实现pytorch实现DenseNet&#xff08;CNN经典网络模型详解&#xff09; - 知乎 (zhihu.com) 是什么之 DenseBlock 读图&#xff1a; x0是inputH1的输入是x0 (input)H2的输入是x0和x1 (x1是H1的输出) Summary&#xff1a; 传统卷积网&#xff0c;网…

角蜥优化算法 (Horned Lizard Optimization Algorithm ,HLOA)求解无人机路径优化

一、无人机路径规划模型介绍 无人机三维路径规划是指在三维空间中为无人机规划一条合理的飞行路径,使其能够安全、高效地完成任务。路径规划是无人机自主飞行的关键技术之一,它可以通过算法和模型来确定无人机的航迹,以避开障碍物、优化飞行时间和节省能量消耗。 二、算法介…

python学习笔记------集合(set)

集合定义格式 基本语法&#xff1a; #定义集合字面量 {元素&#xff0c;元素&#xff0c;元素......&#xff0c;元素} #定义集合变量 变量名称{元素&#xff0c;元素&#xff0c;元素......&#xff0c;元素} #定义空集合 变量名称set() #定义集合字面量 {元素&#…

海纳斯Hinas安装Tailscale

海纳斯Hinas安装Tailscale 海纳斯安装Tailscale第一步&#xff0c;注册Tailscale账号&#xff0c;并在手机/电脑安装Tailscale第二步&#xff0c;进入海纳斯后台卸载重装Tailscale 海纳斯安装Tailscale 海纳斯自己安装了Tailscale&#xff0c;但是无法使用&#xff0c;需要卸载…