【图解IO与Netty系列】Netty源码解析——事件循环

news2025/1/12 0:55:11

Netty源码解析——事件循环

  • Netty事件循环
  • 源码解析
    • select()
    • processSelectedKeys()
      • NioMessageUnsafe#read()
      • NioByteUnsafe#read()
    • runAllTasks()

Netty事件循环

当Netty服务端启动起来以后,就可以接受客户端发送的请求,接收到客户端发来的请求后就会有事件就绪,有事件就绪就会在Netty的事件循环中被监听到并处理,我们下面看看Netty事件循环的逻辑。

在这里插入图片描述

Netty中的NIO事件循环的代码位于NioEventLoop的run()方法内部,这个方法总体是一个for(;;)死循环,然后for循环里头依次执行的三个重要方法分别是:

  1. select():调用Selector#select()方法监听注册到Selector上的Channel,等待Channel关注的事件就绪。
  2. processSelectedKeys():处理就绪事件,会调用ChannelPipeline处理,ChannelPipeline又会通过责任链模式调用里面的ChannelHandler处理。
  3. runAllTasks():处理NioEventLoop中的taskQueue的异步任务。

这就是Netty事件循环的大体逻辑,下面我们进入代码解析。

源码解析

select()

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

在NioEventLoop的事件循环中,首先是select()方法的调用,select()方法里面就是调用NioEventLoop对应的Selector的select()方法,阻塞当前线程,监听注册到Selector的Channel,等待事件就绪。

在这里插入图片描述

当有事件就绪时,当前线程就会解阻塞,然后调用processSelectedKeys()方法处理就绪事件。

processSelectedKeys()

NioEventLoop的 processSelectedKeys()方法会进入processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            ...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (...) {...}
    }

无论是accept事件,还是read事件时,都是调用unsafe.read()方法。

在这里插入图片描述

只是这里的Unsafe的类型有可能不一样,如果是NioServerSocketChannel的话,那么Unsafe的类型就是NioMessageUnsafe,是在创建NioServerSocketChannel时就创建好的,我们看一下NioMessageUnsafe的read()方法。

NioMessageUnsafe#read()

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            	...
                doReadMessages(readBuf);
				...
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    ...
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                ...
        }
    }

重要方法就两个doReadMessages(readBuf)和pipeline.fireChannelRead(readBuf.get(i)),其他代码全部省略不看。

在这里插入图片描述

我们先看doReadMessages(readBuf)

@Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
		...
                buf.add(new NioSocketChannel(this, ch));
				...
    }

SocketUtils.accept(javaChannel())里面是调用ServerSocketChannel的accept()方法,返回一个SocketChannel。

然后new NioSocketChannel(this, ch)把返回的SocketChannel包装成NioSocketChannel,放入buf中,这个buf就是外面read()方法的readBuf。

在这里插入图片描述

NioSocketChannel的构造方法调用父类AbstractNioByteChannel的构造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

可以看到指定关注的事件类型是read事件,再看看AbstractNioByteChannel的父类AbstractNioChannel的构造方法

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (...) {...}
    }

继续调用父类的构造方法,然后保存了NioSocketChannel和关注的事件类型OP_READ,设置NioSocketChannel为非阻塞。

再看AbstractNioChannel的父类AbstractChannel的构造方法

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

创建一个Unsafe,然后初始化了ChannelPipeline。newUnsafe()方法会进入NioSocketChannel的newUnsafe()方法,创建的时NioSocketChannelUnsafe类型的Unsafe,因此NioSocketChannel的Unsafe类型就是NioSocketChannelUnsafe。

    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

NioSocketChannel的构造方法总结起来就是做了5件事:

  1. 创建并保存NioSocketChannelUnsafe
  2. 创建并保存ChannelPipeline
  3. 保存SocketChannel
  4. 保存关注的事件类型OP_READ
  5. 设置SocketChannel为非阻塞

在这里插入图片描述

然后回到unsafe.read()方法,接下来执行的代码pipeline.fireChannelRead(readBuf.get(i))就是调用触发就绪事件的Channel对应的ChannelPipeline的fireChannelRead(…)方法,触发ChannelPipeline中每个处理入站事件的ChannelHandler的入站事件处理。

ChannelPipeline的fireChannelRead(…)方法会从头到尾以责任链的处理方式调用每个ChannelInboundHandler类型的channelRead(…)方法。NioServerSocketChannel的ChannelPipeline中的ChannelHandler是ServerBootstrapAcceptor。我们看看ServerBootstrapAcceptor的channelRead(…)方法。

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {...});
            } catch (...) {...}
        }

这里的参数msg的类型是NioSocketChannel,就是上面放入buf中的NioSocketChannel,调用pipeline.fireChannelRead(readBuf.get(i))方法时readBuf.get(i)就是中buf中取出NioSocketChannel作为参数传进来。

然后child.pipeline().addLast(childHandler)这一行这里的childHandler是我们定义的ChannelInitializer,这里放入NioSocketChannel的ChannelPipeline中,当NioSocketChannel里面的SocketChannel被注册到Selector之后,会触发ChannelInitializer的调用,初始化ChannelPipeline。

然后childGroup.register(child)就是把这个NioSocketChannel注册到workerGroup中的其中一个NioEventLoop上,也就是把NioSocketChannel中的SocketChannel注册到workerGroup中的其中一个NioEventLoop的Selector上。

在这里插入图片描述

这里NioSocketChannel注册的细节跟NioServerSocketChannel的注册是一样的,上一篇文章已经分析过,这里就不重复了。

NioSocketChannel注册好之后,就可以接收客户端发来的数据,于是又有read事件触发,此时调用的unsafe.read(),就是NioSocketChannelUnsafe的父类NioByteUnsafe的read()方法了。

NioByteUnsafe#read()

        @Override
        public final void read() {
            		...
                    byteBuf = allocHandle.allocate(allocator);
                    doReadBytes(byteBuf);
					...
                    pipeline.fireChannelRead(byteBuf);
					...
    	}

就是通过allocator分配一个ByteBuf,然后把Channel中的数据读取到ByteBuf中,然后调用pipeline.fireChannelRead(byteBuf)触发ChannelPipeline的处理,然后ChannelPipeline中的ChannelHandler就会处理byteBuf中的数据,这里的ChannelPipeline中的ChannelHandler就是我们定义的ChannelInitializer组装到ChannelPipeline中的ChannelHandler了。

在这里插入图片描述

runAllTasks()

runAlllTasks方法其实就是从NioEventLoop的taskQueue中不停的取出task并执行。

    protected boolean runAllTasks(long timeoutNanos) {
        ...
        Runnable task = pollTask();
        ...
        for (;;) {
            safeExecute(task);
			...
            task = pollTask();
            if (task == null) {
                ...
                break;
            }
        }
		...
    }

可以看到,就是在一个for循环中,pollTask()方法取出task,然后在下一轮循环中调用safeExecute(task)去执行,safeExecute(task)里面就是调用task.run()方法直接执行,没什么好看的。如果pollTask()方法取出的task为null,那么就break结束循环。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1849377.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据学习-大数据介绍

意义 从海量的数据中分析出海量数据背后的价值 需要分析海量的数据&#xff0c;就需要存储、计算和分析 那就需要分布式多台计算机合适的工具来处理数据 工具 特点 大数据的核心工作&#xff1a;从海量的、高增长的、多类别的、信息密度低的数据中挖掘出高质量的结果 数据存储…

RedHat9 | Web服务配置与管理(Apache)

一、实验环境 1、Apache服务介绍 Apache服务&#xff0c;也称为Apache HTTP Server&#xff0c;是一个功能强大且广泛使用的Web服务器软件。 起源和背景 Apache起源于NCSA httpd服务器&#xff0c;经过多次修改和发展&#xff0c;逐渐成为世界上最流行的Web服务器软件之一。…

C++ STL ③

sort排序 #include <iostream> #include <algorithm> using namespace std;int main() {int a[5],i;cout<<"请输入数组元素:"<<endl;for(i0;i<5;i){cin>>a[i];}sort(a,a5,greater<int>());for(i0;i<5;i){cout<<a[i…

系统架构师考点--数据库系统

大家好。今天我来总结一下数据库系统的相关考点。本考点一般情况下上午场考试占3-5分&#xff0c;下午场案例分析题也会出现。 一、数据库系统 数据&#xff1a;数据库中存储的基本对象&#xff0c;是描述事物的符号记录。数据的种类:文本、图形、图像、音频、视频、学生的档…

基于Redis和openresty实现高并发缓存架构

目录 概述缓存架构设计实践代码路由业务封装redis 效果 概述 本文是对项目中 QPS 高并发相关问题的一种解决方案&#xff0c;利用 Nginx 与 Redis 的高并发、超低延迟响应&#xff0c;结合 Canal 进行实现。 openrestry官网 当程序需要提供较高的并发访问时&#xff0c;往往需…

状态压缩DP——AcWing 291. 蒙德里安的梦想

状态压缩DP 定义 状态压缩DP是一种利用二进制数来表示状态的动态规划算法。它通过将状态压缩成一个整数&#xff0c;从而减少状态数量&#xff0c;提高算法效率。 运用情况 状态压缩DP通常用于解决具有状态转移和最优解性质的问题&#xff0c;例如组合优化、图论、游戏等问…

UDS服务——TransferData (0x36)

诊断协议那些事儿 诊断协议那些事儿专栏系列文章,本文介绍TransferData (0x36)—— 数据传输,用于下载/上传数据时用的,数据的传输方向由不同的服务控制:0x34服务表示下载,0x35服务表示上传。通过阅读本文,希望能对你有所帮助。 文章目录 诊断协议那些事儿传输数据服务…

Redis-事务-watch-unwatch

文章目录 1、监视key2、提交事务 1、监视key 打开两个窗口&#xff0c;第一个窗口先监视key&#xff0c;然后开始事务&#xff0c;然后再打开第二个窗口&#xff0c;修改balance为0 2、提交事务 此时事务被打断

银河麒麟V10 SP1.1操作系统 离线安装 nginx1.21.5、redis 服务

银河麒麟官网地址&#xff1a;国产操作系统、麒麟操作系统——麒麟软件官方网站 一、查看系统版本 命令&#xff1a;nkvers 我的是 release V10 (SP1)&#xff0c;根据这个版本去官网找对应的rpm包 银河麒麟操作系统的rpm包必须从官方找&#xff0c; 要是随便找个Centos的rp…

1.SG90

目录 一.实物图 二.原理图 三.简介 四.工作原理 一.实物图 二.原理图 三.简介 舵机&#xff08;英文叫Servo&#xff09;&#xff0c;是伺服电机的一种&#xff0c;伺服电机就是带有反馈环节的电机&#xff0c;这种电机可以进行精确的位置控制或者输出较高的扭矩。舵机…

基于深度学习的图像识别技术与应用是如何?

基于深度学习的图像识别技术与应用在当今社会中扮演着越来越重要的角色。以下是对该技术与应用的详细解析&#xff1a; 一、技术原理 深度学习是一种模拟人脑处理和解析数据的方式的技术和方法论。在图像识别领域&#xff0c;深度学习主要通过深度神经网络&#xff08;如卷积…

我理解的文本表示模型

词袋模型与N-grams模型 1 词袋模型 (Bag of Words)1.1 one-hot 取值 (Binary)1.2 Term Frequency 取值 (TF)普通频数 r a w t f raw_{tf} rawtf​频率范数归一化对数频数 1.3 Inverse document frequency (IDF)1.4 TF-IDF scores 取值 N-Gram 最简单的文本建模场景&#xff1a…

定义多个类对象,分别输入和输出各对象中的时间(时:分:秒)

在前面的文章中&#xff0c;类中只有公用数据而无成员函数&#xff0c;而且只有1个对象。可以直接在主函数中进行输入和输出。若有多个对象&#xff0c;需要分别引用多个对象中的数据成员&#xff0c;可以写出如下程序&#xff1a; &#xff08;1&#xff09;编写程序&#xff…

stata17中java installation not found或java not recognozed的问题

此问题在于stata不知道去哪里找java,因此需要手动的告诉他 方法1&#xff1a; 1.你得保证已经安装并配置好java环境 2.在stata中输入以下内容并重启stata即可 set java_home "D:\Develope\JDk17" 其中java_home后面的""里面的内容是你的jdk安装路径 我的…

【Java算法】滑动窗口 上

&#x1f525;个人主页&#xff1a; 中草药 &#x1f525;专栏&#xff1a;【算法工作坊】算法实战揭秘 &#x1f456;一. 长度最小的子数组 题目链接&#xff1a;209.长度最小的子数组 算法原理 滑动窗口 滑动窗口算法常用于处理数组/字符串等序列问题&#xff0c;通过定义一…

IKVM.net调用Jar包实现SM4解密

近期&#xff0c;我深入学习了如何使用IKVM.net来调用Jar包&#xff0c;这次的学习经历让我对Java与.NET之间的互操作性有了更深刻的理解。IKVM.net作为一款强大的工具&#xff0c;为我们打通了Java与.NET之间的桥梁&#xff0c;使得在.NET环境中调用Java库变得简单而高效。 在…

生产环境安装odoo

odoo可以在多平台运行&#xff0c;但是在生产环境下官方不建议在Windows平台部署。在Windows下可能不能很好的支持一服务多worker的形式&#xff0c;更推荐在Linux下部署。 常见的Linux如Ubuntu、Debian等Debian系或Redhat系都能执行官网的包安装。 地址&#xff1a;Download |…

使用Jetpack Compose为Android App创建自定义页面指示器

使用Jetpack Compose为Android App创建自定义页面指示器 在现代移动应用中&#xff0c;页面指示器在提供视觉导航提示方面发挥着重要作用&#xff0c;帮助用户理解其在应用内容中的当前位置。页面指示器特别适用于顺序展示内容的场景&#xff0c;如图片轮播、图像库、幻灯片放…

Python3简单实现与Java的Hutool库SM2的加解密互通

1、背景&#xff1a; 因业务需求&#xff0c;需要与某平台接口对接。平台是Java基于Hutool库实现的SM2加密解密&#xff0c;研究了下SM2的加解密算法&#xff0c;网上找的资料&#xff0c;都是说SM2【椭圆曲线】 公钥长【x,y分量 64字节】&#xff0c;私钥短【32字节】&#x…

ChatTTS增强版V3【已开源】,长文本修复,中英混读,导入音色,批量SRT、TXT

ChatTTS增强版V3来啦&#xff01;本次更新增加支持导入SRT、导入音色等功能。结合上次大家反馈的问题&#xff0c;修复了长文本、中英混读等问题。 项目已开源(https://github.com/CCmahua/ChatTTS-Enhanced) 项目介绍 V3 ChatTTS增强版V3&#xff0c;长文本修复&#xff0c…