【Netty】netty启动流程源码解析

news2024/9/20 15:40:39

文章目录

  • Netty整体架构
  • 一个启动流程
  • 源码解析
    • new NioEventLoopGroup(1)
      • 构建线程池
      • 基础信息构建
      • 线程选择策略
    • group
    • channel
    • handler
    • childHandler
    • bind
      • initAndRegister
        • 反射创建 NioServerSocketChannel 对象
        • init
      • 注册channel
      • doBind0
  • 流程图
    • 思考

Netty整体架构

在这里插入图片描述
是什么: Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。
解决什么问题: 我们知道NIO编程虽然可以解决网络编程IO问题,但是其繁琐的细节配置,对于开发意义网络组员来说其实是复杂的,而Netty就是在NIO的基础上进一步封装,一个网络编程框架。
核心组件
BootStrap->Channel -> EventLoop&EventLoopGroup -> ChannelPipeline -> ChannelHandler .

在这里插入图片描述

一个启动流程

这是一个简单的服务端HTTP的Demo,我们按照每行代码的方式 ,逐步分析整个启动过程具体是做了哪些工作。


/**
 * @author qxlx
 * @date 2024/7/28 20:52
 */
public class SImpleServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGrpup = new NioEventLoopGroup(1);
        EventLoopGroup wrokerGrpup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGrpup,wrokerGrpup)
                .channel(NioServerSocketChannel.class)
                .handler(new SimpleServerHandler())
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                    }
                });

        ChannelFuture f = bootstrap.bind(8888).sync();

        f.channel().closeFuture().sync();
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }

}

源码解析

new NioEventLoopGroup(1)

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 不指定线程数,则使用默认自定义的线程数
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

可以看到通过调用super父类的方式,判断传入的nThreads的线程数等于0,就使用默认的方式,比如当前是10核,就创建20个线程。否则就采用传入执行的线程数。我们知道对于boss线程来说,其实本身就只有一个线程进行接收客户端传入的线程,所以线程数就是1。

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    // 获取属性对应的值,获取不到 就是CPU的两倍
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        }
    }

构建线程池

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
		// 判断线程数是否小于0                                            
        checkPositive(nThreads, "nThreads");

        // 如果没有构建,使用默认的
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // 产生nThreads个Nio保存在数组中
        // EventExecutor 是excutor的子类
        // 这里因为创建的是1个线程。创数组大小是0
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
               // ⭐️
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 执行失败 将前面创建成功的直接shutDown  下面是关于异常的处理
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
		
		// 遍历线程 给每个线程创建都田间一个关闭的监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

可以看到EventExecutor 其实顶层的父类就是Executor,所以就是一个线程池的实现。
在这里插入图片描述

基础信息构建

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        // 参数传递进来的
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

    // 构造方法进行初始化
    NioEventLoop(参数 忽略) {
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // NIO的封装 // 完成selector的创建
        final SelectorTuple selectorTuple = openSelector();
        // 创建selector
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");

线程选择策略

这里根据不同的选择器 实现不同策略,如果是2的倍数,选择使用& 否则就是 %的算法。

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
	
     public EventExecutor next() {
         return executors[idx.getAndIncrement() & executors.length - 1];
     }
     
     public EventExecutor next() {
         return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
     }

总结下,上述过程就是这样的。
在这里插入图片描述

group

这里其实就是调用父类 AbstractBootstrap 设置主group , 然后在设置 子group

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // 初始化父类 将bossGroup赋值
        super.group(parentGroup);
        // 如果childGroup变量 如果已经有值,抛出异常
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        // childGroup = workerGroup赋值
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

channel

将 NioServerSocketChannel 作为Class对象传入。然后获取构造方法,然后获取NioServerSocketChannel的构造方法,在之后的流程中,其实就是利用反射创建NioServerSocketChannel对象。

    public B channel(Class<? extends C> channelClass) {
        // 直接调用channelFactory工厂,创建一个ReflectiveChannelFactory 工厂方法
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
		// 通过Class对象 获取构造参数
        // 等待后续的利用反射进行创建对象
        this.constructor = clazz.getConstructor();
    }

	// channelFactory赋值
    this.channelFactory = channelFactory;

handler

设置handler , 然后返回self(); 其实就是返回this。这样就可以实现链式编程。

    public B handler(ChannelHandler handler) {
        // 设置handler
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

    private B self() {
        return (B) this;
    }

childHandler

设置childHandler 属性,可以看到很多地方都做了判空的处理逻辑。

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        // 设置childHanlder
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        // this是当前对象的引用
        return this;
    }

其实到这里,都是一些前戏的工作。

bind

initAndRegister

反射创建 NioServerSocketChannel 对象

因为之前channelFactory设置的是 ReflectiveChannelFactory ,所以就是利用构造方法创建NioServerSocketChannel的对象,既然创建对象,那么必定会调用 NioServerSocketChannel的构造方法。

	channel = channelFactory.newChannel();
	// 对象实例化
	return constructor.newInstance();

构造方法里,其实就是 创建了一个NIO的通道对象,ServerSocketChannel的实现 ServerSocketChannelImpl

    public NioServerSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }
	// 创建一个底层变量
	private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

  // 核心是创建一个 ServerSocketChannel
   private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
       try {
           // 创建一个channel
           ServerSocketChannel channel =
                   SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
           // 打开一个服务端套接字通道对象
           // 底层其实就是创建ServerSocketChannel ⭐️ ServerSocketChannelImpl
           return channel == null ? provider.openServerSocketChannel() : channel;
       } catch (IOException e) {
           throw new ChannelException("Failed to open a socket.", e);
       }
   }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 设置为接受事件
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 设置config属性
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp; // OP_ACCEPT
            // 设置当前的serverSocketChannel为非阻塞的
            ch.configureBlocking(false);
    }
init

1.设置channel的option
2.设置channel的attr
3.设置handler的pipeline
4.pipeline添加channelInitializer对象 ,并且使用 ch.eventLoop().execute 线程池 创建了一个专门接受客户端请求的Acceptor对象。添加到pipeline管道对象中

    void init(Channel channel) {
        // 1.设置新介入的channel的option
        setChannelOptions(channel, newOptionsArray(), logger);

        // 2、设置新接入channel的attr
        setAttributes(channel, newAttributesArray());

        // 3、设置handler到pipeline上
        // 获取到通道流水线对象
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
        final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();

        // 4.pipeline 添加一个channelInitializer对象 ServerBootstrapAcceptor
        // 通道初始化对象
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // 这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
                // 看到这里,我们发现其实init只是初始化了一些基本的配置和属性,
                // 以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,
                                extensions));
                    }
                });
            }
        });
        if (!extensions.isEmpty() && channel instanceof ServerChannel) {
            ServerChannel serverChannel = (ServerChannel) channel;
            for (ChannelInitializerExtension extension : extensions) {
                try {
                    extension.postInitializeServerListenerChannel(serverChannel);
                } catch (Exception e) {
                    logger.warn("Exception thrown from postInitializeServerListenerChannel", e);
                }
            }
        }
    }

注册channel

	ChannelFuture regFuture = config().group().register(channel);
	// 启动线程池 
	eventLoop.execute(new Runnable() { // ⭐️
         @Override
         public void run() {
             register0(promise); // 分析
         }
    });
    
	doRegister();

这里其实启动一个线程,一个死循环 接受将channel注册到selector中。 然后通过selector就可以获取有事件的线程进行操作。 ServerSocketChannel注册到Selector中。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // ServerSocketChannel注册到Selector中。
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
	// 在注册完毕之后,调用这个就会触发
    pipeline.invokeHandlerAddedIfNeeded();

在这里插入图片描述

doBind0

因为调用链路比较长,最终 其实就是根据版本的不同 将端口和channel进行绑定。

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // 全连接队列的大小
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

流程图

在这里插入图片描述

思考

对于一个NIO原生程序来说,本质做了几件事情

  • 创建EventLoop对象
  • 完成ServerSocketChannel的创建 初始化过程
  • 以及ServetSocketChannel的注册工作
  • 端口绑定
  • 设置事件
    在这里插入图片描述

根据 我们从源码分析一个常见netty的启动过程的过程本质上,也是完成了对上述过程NIO的封装过程。基础属性的设置,创建 NioServerSocketChannel的实例 本质就是 ServetSocketChannel。以及完成注册到selector的过程。最后完成端口绑定的工作。虽然netty抽象几个概念,但是万变不离其中,本质还是NIO的封装。

那么剩下来,要研究的重点是什么呢?

  • eventloop 线程是如何接受任务的。
  • unsafe的写过程
  • channeloutbounBuffer
  • channelPipeline
  • netty常见优化方案
  • fastThreadLocal
  • 内存池
  • 事件轮
  • 编码器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1975017.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【学习方法】高效学习因素 ① ( 开始学习 | 高效学习因素五大因素 | 高效学习公式 - 学习效果 = 时间 x 注意力 x 精力 x 目标 x 策略 )

文章目录 一、高效学习因素1、开始学习2、高效学习因素五大因素3、高效学习公式 - 学习效果 时间 x 注意力 x 精力 x 目标 x 策略 一、高效学习因素 1、开始学习 对于 学习差 , 调皮捣蛋 的学生 , 不要把 学习成绩差 的 原因 归因为 不爱学习 / 没有学习方法 , 可能是 还没有 …

Docker-学习笔记(借助宝塔面板)

ubuntu环境 一、安装 可以参考官网进行或其他博客进行安装 1.进入宝塔面板 进图Docker菜单&#xff0c;查看是否提示安装。 2.查看是否安装 查看版本 docker -v 证明已经安装 二、常用命令 1.查看版本 docker -v 2.启动、停止、重启docker systemctl start docker…

windows C++-通过 C++/WinRT 使用 API(一)

本文介绍如何使用 C/WinRT API&#xff0c;无论它们是 Windows 的一部分、由第三方组件供应商或自行实现。 本文中的代码示例较短&#xff0c;并且很容易试验&#xff0c;可以通过创建新的 Windows 控制台应用程序 (C/WinRT) 项目和复制粘贴代码来重现它们。 但是&#xff0c;…

【Redis】浅谈架构和认识Redis

目录 架构演进 单机架构 应用数据分离架构 应用服务集群架构 读写分离/主从分离架构 冷热分离架构&#xff08;引入缓存&#xff09; 垂直分库 微服务架构 认识Redis Redis的特性 架构演进 单机架构 简单来说就是只有一台服务器&#xff0c;这个服务器用来负责所有…

GlobalMapper方量计算(两期地形对比,提取填挖方区域及每个区域的方量)

0.序 在工程设计中&#xff0c;经常需要根据设计方案和现状地形之间进行方量计算&#xff0c;尤其关注方量变化的区域&#xff0c;哪些区域需要填方&#xff0c;哪些区域需要挖方&#xff0c;并依据此进行方量的平衡。 在流域管理中&#xff0c;尤其是湿地、三角洲等容易淤积或…

详细分析Java中的SPI机制(附Demo)

目录 前言1. 基本知识2. Demo3. 解读源码 前言 相关的Java知识推荐阅读&#xff1a; java框架 零基础从入门到精通的学习路线 附开源项目面经等&#xff08;超全&#xff09;【Java项目】实战CRUD的功能整理&#xff08;持续更新&#xff09; 1. 基本知识 SPI&#xff08;S…

PANDA:免微调提升大型语言模型领域特定能力的新方法

人工智能咨询培训老师叶梓 转载标明出处 大模型虽然在广泛的任务上具有通用性&#xff0c;但在面对特定领域的任务时&#xff0c;它们的性能往往不如专门为这些领域训练的模型。传统的知识蒸馏&#xff08;Knowledge Distillation, KD&#xff09;方法通过资源密集型的微调过程…

怎么在电脑上查找打印机打印过的文件?告别翻箱倒柜!电脑查找已打印文件技巧公示!

在日常办公中&#xff0c;我们经常会使用打印机来输出各种文件&#xff0c;但有时候&#xff0c;我们可能需要回顾或查找之前打印过的文件。然而&#xff0c;这些文件一旦打印完成&#xff0c;往往就离开了我们的电脑屏幕&#xff0c;进入了纸质世界&#xff0c;而电子文件可能…

Tree-of-Traversals:结合知识图谱与大模型,通过树遍历和回溯寻找高置信度推理路径

Tree-of-Traversals&#xff1a;结合知识图谱与大模型&#xff0c;通过树遍历和回溯寻找高置信度推理路径 Tree-of-Traversals算法解析对比 MindMap1. 与知识图谱&#xff08;KGs&#xff09;的整合2. 推理方法3. 灵活性与可扩展性4. 在医学诊断中的应用 速度和准确1. 速度2. 推…

数据结构第九讲:二叉树

数据结构第九讲&#xff1a;二叉树 1.实现链式结构二叉树1.1二叉树的节点结构1.2创建二叉树节点1.3前中后序遍历1.3.1前序遍历1.3.2中序遍历1.3.3后序遍历1.3.4总结 1.4二叉树结点的个数1.4.1错误示范1.4.2实现方法 1.5二叉树叶子结点的个数1.6二叉树第k层结点的个数1.7二叉树的…

看门狗应用编程-I.MX6U嵌入式Linux C应用编程学习笔记基于正点原子阿尔法开发板

看门狗应用编程 看门狗应用编程介绍 看门狗定时器的基本概念 看门狗是一个可以在一定时间内被复位/重置的计数器 如果在规定时间内没有复位&#xff0c;看门狗计时器溢出会对CPU产生复位信号使系统重启 有些看门狗可以只产生中断信号而不会使系统复位 I.MX6UL/I.MX6ULL So…

如何减少内存碎片的产生——页

文章目录 1 页的设计目的2 进程块和主存块的对应关系3 页、页框、页表3.1 页&#xff08;Page&#xff09;3.2 页框&#xff08;Page Frame&#xff09;3.3 页表&#xff08;Page Table&#xff09; 4 逻辑地址到物理地址的转换4.1 转换过程4.2 具体示例4.3 图示 参考资料封面 …

C语言程序设计25

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 习题2.2 分析下面程序的运行结果&#xff0c;然后上机验证。 代码&#xff1a; //《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 //习题2.2 分析下面程序的运行结果&#xff0c;然后上机验证。#inc…

【C语言篇】操作符详解(下篇)

文章目录 操作符详解&#xff08;下篇&#xff09;前言条件操作符移位操作符左移操作符右移操作符 位操作符下标引用操作符函数调用操作符 操作符的属性&#xff1a;优先级和结合性优先级结合性表达式求值整形提升算术转换 操作符详解&#xff08;下篇&#xff09; 前言 操作…

JavaScript基础——JavaScript常见语句(判断语句、循环语句、控制流语句)

JavaScript提供了丰富的语句来控制程序的执行流程&#xff0c;包括用于条件判断的if、switch和三元运算符&#xff0c;以及用于循环的for、while、do...while、for...in和for...of。此外&#xff0c;还有控制流语句如break、continue和return。 判断语句 if 语句 if 语句&…

C/C++开发,opencv轮廓提取实现

一、cv::findContours轮廓提取函数 1.1 cv::findContours函数简介 cv::findContours 函数是用于从二值图像&#xff08;灰度图&#xff09;中检索轮廓。这个函数在OpenCV的不同版本中参数可能有所不同&#xff0c;但基本概念保持一致。特别是在OpenCV 3.x和4.x版本中&#xff…

贪吃蛇(使用QT)

贪吃蛇小游戏 一.项目介绍**[贪吃蛇项目地址](https://gitee.com/strandingzy/QT/tree/zyy/snake)**界面一&#xff1a;游戏大厅界面二&#xff1a;关卡选择界面界面三&#xff1a;游戏界面 二.项目实现2.1 游戏大厅2.2关卡选择界面2.3 游戏房间2.3.1 封装贪吃蛇数据结构2.3.2 …

如何通过谷歌外链快速增加网站流量?

利用谷歌外链提升流量的方法非常直接&#xff0c;但实际上&#xff0c;外链影响的是关键词排名&#xff0c;关键词排名提升了&#xff0c;自然就会有流量&#xff0c;所以谷歌外链不是直接能提升网站流量&#xff0c;而是间接的&#xff0c;下面&#xff0c;我会详细介绍几种有…

验收测试:确保软件符合业务需求和合同要求

目录 前言1. 验收测试的概念1.1 用户验收测试&#xff08;UAT&#xff09;1.2 操作验收测试&#xff08;OAT&#xff09; 2. 验收测试的主要作用2.1 确认业务需求的满足2.2 验证合同要求的实现2.3 提升用户信心 3. 验收测试在整个测试中的地位3.1 测试的最后一道关卡3.2 用户与…

niushop逻辑漏洞

niushop逻辑漏洞 安装环境 这里控制不了 抓包在数据包中可以改数据