Netty源码解读

news2024/11/20 9:13:32

Netty源码解读

Netty线程模型

在这里插入图片描述
1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue
3、每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
3.1、处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
3.2、将NioSocketChannel注册到某个worker NIOEventLoop上的selector
3.3、runAllTasks处理任务队列TaskQueue的任务
4、 每个worker NioEventLoop线程循环执行的步骤
4.1、轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
4.2、处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
4.3、runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理
4.4、处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler处理器用来处理 channel 中的数据

Netty服务启动示例

// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                //对workerGroup的SocketChannel设置handler处理器
                ch.pipeline().addLast(new NettyServerHandler());
            }
        });
// 启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(9099).sync();

Netty源码分析

从bootstrap.bind作为入口分析启动流程,进入后可以看到会调用AbstractBootstrap#doBind,最终会调用initAndRegister()方法,主要逻辑都在前三步中实现,本次也主要分析这三个步骤

# AbstractBootstrap类
// 1、创建一个服务端Channel,即NioServerSocketChannel
channel = channelFactory.newChannel();
// 2、初始化NioServerSocketChannel,在pipeline中添加一些处理器hander
init(channel);
// 3、进行注册
ChannelFuture regFuture = config().group().register(channel);
// 把NioServerSocketChannel绑定到指定端口
channel.bind(localAddress, promise);

channelFactory.newChannel();

bootstrap.channel(NioServerSocketChannel.class) 会将serverChannel绑定到ReflectiveChannelFactory上

# AbstractBootstrap类

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

channelFactory.newChannel()会调用ReflectiveChannelFactory的newChannel方法,进而调用constructor.newInstance(),而该constructor正好是NioServerSocketChannel类;所以new的对象就是NioServerSocketChannel
服务端NioServerSocketChannel进行初始化
1、设置感兴趣事件为连接事件OP_ACCEPT
2、设置channel为非阻塞
3、初始化服务端pipeline

# NioServerSocketChannel类

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 将感兴趣的事件设置为连接事件OP_ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// 父类初始化方法 ch 即为NioServerSocketChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    // 设置为非阻塞
    ch.configureBlocking(false);
}

// 父类的父类中初始化pipeline,此时只有HeadContext和TailContext
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

init(channel)

调用ServerBootstrap.init方法,向服务端NioServerSocketChannel的pipeline中添加hander处理器ChannelInitializer;此时服务端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrap 类

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    //向 pipeline中添加hander处理器ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

config().group().register(channel)

bootstrap.group(bossGroup, workerGroup)构造时,将group设置为bossGroup,childGroup设置为workerGroup; config().group().register(channel)会调用bossGroup的register方法,从bossGroup的MultithreadEventLoopGroup线程组中取一个线程SingleThreadEventLoop进行调用register方法

register注册逻辑

服务端的NioServerSocketChannel和客户端的NioSocketChannel都会调用此方法进行注册
1、服务启动时,NioServerSocketChannel注册到selector上,对客户端OP_ACCEPT操作感兴趣
2、当有客户端连接时,通过NioServerSocketChannel的accept()得到每个客户端的NioSocketChannel,将其注册到selector上,对客户端OP_READ操作感兴趣

# SingleThreadEventLoop extends SingleThreadEventExecutor 类

public ChannelFuture register(final ChannelPromise promise) {
    promise.channel().unsafe().register(this, promise);
    return promise;
}

调用AbstractChannel的register方法,创建一个注册的task交给EventLoop线程处理

# AbstractChannel 类

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    .......
    AbstractChannel.this.eventLoop = eventLoop;
    .......
    // 1、处理连接事件时,用的是bossGroup里的NioEventLoop
    // 2、处理读写事件时,用的是workGroup里的NioEventLoop
    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
}

private void register0(ChannelPromise promise) {
    doRegister();
    // 1、NioServerSocketChannel 处理逻辑
        // 调用NioServerSocketChannel服务端pipeline中hander的handlerAdded方法
        // 此时会调用到ChannelInitializer的handlerAdded,然后调用其initChannel,该方法中
        // 会向服务端pipeline中加入ServerBootstrapAcceptor
        // 调用服务端pipeline中hander的channelRegistered方法
        // 调用服务端pipeline中hander的ChannelActive方法
    // 2、NioSocketChannel 处理逻辑 调用我们自定义hander中的方法
        // 调用客户端pipeline中hander的handlerAdded方法
        // 调用客户端pipeline中hander的channelRegistered方法
        // 调用客户端pipeline中hander的ChannelActive方法,我们自定义hander的ChannelActive在此调用
    pipeline.invokeHandlerAddedIfNeeded();
    pipeline.fireChannelRegistered();
    pipeline.fireChannelActive();

}
// doRegister()逻辑由子类AbstractNioChannel实现
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 将channel注册到Selector上
            // 1、NioServerSocketChannel注册到Selector上
            // 2、NioSocketChannel注册到Selector上
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
        }
    }
}

eventLoop.execute就是调用SingleThreadEventExecutor#execute

# SingleThreadEventExecutor 类
@Override
public void execute(Runnable task) {
    // 将注册register0逻辑加入队列taskQueue
    addTask(task);
    // 开启线程循环监听事件,会调用SingleThreadEventExecutor.run方法
	// 最终调用子类NioEventLoop的run()方法
    startThread();
}

死循环执行 selector.select方法,直到监听到事件或者超时,才会执行processSelectedKeys逻辑
1、服务端启动后,NioServerSocketChannel若监听到客户端OP_ACCEPT操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
2、客户端连接成功后,NioSocketChannel若监听到客户端OP_READ操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环

# NioEventLoop 类
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    ....
                    case SelectStrategy.SELECT:
                        // 该方法监听到事件(OP_ACCEPT|OP_READ)时才会返回
                        select(wakenUp.getAndSet(false));
                    default:
                }
            } catch (IOException e) {
            	.....
            }
            // 监听到事件执行
            try {
                // 1、获取SelectionKey处理事件
                processSelectedKeys();
            } finally {
                // 2、执行taskQueue中其他的注册方法register0
                runAllTasks();
            }
            
        }
   }
}   

private void select(boolean oldWakenUp) throws IOException {
    // 一直循环遍历
    int selectCnt = 0;
    for (;;) {
        // 根据注册的定时任务,获取本次select的阻塞时间
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    	// 没有监听到事件或没有超时,则一直阻塞(会让出cpu资源)
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
	        // 正常场景
            // 当有连接|读写操作或者selector被唤醒了,则直接返回
            break;
        }

        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // 正常场景
            // 说明没有监听到事件,而是超时了,则重置selectCnt
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // 异常场景  select 空轮询bug修复
            // 若空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD配置
            // 则关闭老的select,建立新的select
            selector = selectRebuildSelector(selectCnt);
            selectCnt = 1;
            break;
        }
        currentTimeNanos = time;
    }
}

private void processSelectedKeysOptimized() {
    // 遍历所有的selectedKeys进行处理
    for (int i = 0; i < selectedKeys.size; ++i) {
         processSelectedKey(k, (AbstractNioChannel) a);
    }
}
            
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        // 连接|读写操作会调用该方法
        // 1、连接操作调用NioMessageUnsafe的read方法
        // 2、读写操作调用NioByteUnsafe的read方法
        unsafe.read();
    }
}

OP_ACCEPT连接操作处理
1、为每个客户端创建NioSocketChannel,并进行初始化
1.1、设置感兴趣事件为OP_READ
1.2、设置channel为非阻塞
1.3、初始化客户端pipeline
2、调用服务端NioServerSocketChannel的pipeline,将客户端的NioSocketChannel作为参数传过去,最终会调用到ServerBootstrapAcceptor,将NioSocketChannel注册到workGroup上

# NioMessageUnsafe 类
public void read() {
    final ChannelPipeline pipeline = pipeline();
    // 创建每个客户端的NioSocketChannel
    doReadMessages(readBuf);
    int size = readBuf.size();
    // readBuf为NioSocketChannel,遍历客户端所有的NioSocketChannel
    // 执行服务端NioServerSocketChannel的pipeline,循环执行fireChannelRead,
    // 最终会调用服务端hander的ChannelRead方法,此处会调用到ServerBootstrapAcceptor的ChannelRead方法
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    // 调用服务端pipeline的读完成方法
    pipeline.fireChannelReadComplete();

}

protected abstract int doReadMessages(List<Object> buf) throws Exception;
// 调用子类NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
    // 获取客户端的连接得到SocketChannel,每个客户端在服务端都有一个对应的SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // NioSocketChannel处理方式同NioServerSocketChannel
            // 1、设置感兴趣事件为连接事件OP_READ
            // 2、设置channel为非阻塞
            // 3、初始化客户端NioSocketChannel的pipeline
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {

    }
    return 0;
}

将我们自定义的hander添加到NioSocketChannel的pipeline上,然后将NioSocketChannel注册到workGroup上,此时客户端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrapAcceptor 类

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 传过来的NioSocketChannel
    final Channel child = (Channel) msg;
    // 将我们手动添加的Hander添加到pipeline
    child.pipeline().addLast(childHandler);
    try {
        // 将NioSocketChannel注册workGroup的一个线程的selector上,
        // 方式同NioServerSocketChannel,执行register注册逻辑
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                .....
            }
        });
    } catch (Throwable t) {
    }
}

OP_READ操作处理
进行数据读写,并执行pipeline中自定义的hander

# NioByteUnsafe类

// 接受到客户端OP_READ事件时调用
public void read() {
    // 获取客户端NioSocketChannel的pipeline
    final ChannelPipeline pipeline = pipeline();
    do {
        // 数据读写
        // 调用pipeline.fireChannelRead时会依次调用pipeline中hander的ChannelRead方法
        // 我们自定义的hander的ChannelRead方法就会在此处调用
        byteBuf = allocHandle.allocate(allocator);
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        pipeline.fireChannelRead(byteBuf);
    } while (allocHandle.continueReading());
    allocHandle.readComplete();
    // 调用pipeline的读完成方法
    pipeline.fireChannelReadComplete();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/450888.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI新产品层出不穷,学不过来怎么办。

最近各个互联网巨头和创业新贵发布的AI工具&#xff0c;AI模型层出不穷&#xff0c;相关自媒体的热度也都很高&#xff0c;当然&#xff0c;各种大佬的隔空喊话也是非常吸引眼球&#xff0c;那么很多人就会觉得&#xff0c;要看的东西太多了&#xff0c;要学的东西太多了&#…

【数据结构】顺序表详解(附leetcode练习题)

☃️个人主页&#xff1a;fighting小泽 &#x1f338;作者简介&#xff1a;目前正在学习C语言和数据结构 &#x1f33c;博客专栏&#xff1a;数据结构 &#x1f3f5;️欢迎关注&#xff1a;评论&#x1f44a;&#x1f3fb;点赞&#x1f44d;&#x1f3fb;留言&#x1f4aa;&…

Java编译器插件Manifold(流形)

流形 文天祥正气歌中有云&#xff1a;“天地有正气&#xff0c;杂然赋流形”。 流形是一种抽象而又具体的事务&#xff0c;要研究一个事务就要格物&#xff0c;不格物就不能知道事物的具体描绘形式。流形大多数情况下是一种数学计算方式&#xff0c;可以将一个复杂的模型抽象…

Matplotlib Pyplot

Pyplot 是 Matplotlib 的子库&#xff0c;提供了和 MATLAB 类似的绘图 API。 Pyplot 是常用的绘图模块&#xff0c;能很方便让用户绘制 2D 图表。 Pyplot 包含一系列绘图函数的相关函数&#xff0c;每个函数会对当前的图像进行一些修改&#xff0c;例如&#xff1a;给图像加上…

ChatGPT | 申请与使用new bing的实用教程

1. 教程参考&#xff1a; https://juejin.cn/post/7199557716998078522 2.在参考上述教程遇到的问题与解决 2.1 下载dev浏览器的网址打不开 egde dev下载地址&#xff08;上面网站上的&#xff09;我电脑打不开 换用下面的网址即可 https://www.microsoftedgeinsider.com/z…

给定一个正整数字符串,使用Python正则表达式在其千分位上添加逗号

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 楼阁玲珑五云起&#xff0c;其中绰约多仙子。 大家好&#xff0c;我是皮皮。 一、前言 前几天在Python黄金青铜群【沐】问了一个Python正则表达式的问题…

MySQL开发工具评测,包含了Navicat、DBeaver、SQL Studio等12种

面对五花八门的MySQL客户端,开发者该如何选择,今天我整理了12种MySQL开发工具,从产品体验,功能完整度,云适配,计费模式,OS先容性等多个角度进行评估与分析,大家可根据自己的实际情况选择![在这里插入图片描述](https://img-blog.csdnimg.cn/56bdfc89afe743b9b87477d7c0521023.p…

SAP KANBAN 从入门到放弃系列之调拨模式

之前已经有三篇文章写了后台配置相关的介绍&#xff0c;这里不赘述。详见&#xff1a; PP-KANBAN-看板概述 SAP KANBAN 从入门到放弃系列之生产补货模式 SAP KANBAN 从入门到放弃系列之采购补货模式 第一步&#xff1a;补货策略-转库。不同的补充策略的控制类型有不同的作用…

【vue2 pc端】下拉滑动加载更多 vue-data-loading

官网地址 页面项目中使用 <template><!-- 空数据时显示 --><div class"nonono"><img src"/assets/img/404_cloud.png" alt"" v-if"goodslist.length < 0" class"nonnonoimg"></div>&…

燃气管道定位83KHZ地下电子标识器探测仪ED-8000操作指南

1、电子标识器探测工作 燃气管道定位83KHZ地下电子标识器探测仪ED-8000&#xff0c;探测时周边 3 米范围内不能有其他探测仪&#xff0c;保持探测仪垂直向 下&#xff0c;探测仪的末端距离地面 5~10cm 左右&#xff0c;延估计的埋地管线走向水平移动探测仪。当发现持续信号且信…

反射-Class类分析

反射相关的主要类 java.lang.Class&#xff1a;代表一个类&#xff0c;Class对象表示某个类加载后在堆中的对象java.lang.reflect.Method&#xff1a;代表类的方法&#xff0c;Method对象表示某个类的方法java.lang.reflect.Field&#xff1a;代表类的成员变量&#xff0c;Fie…

有手就行——基础XGBoost实战以 iris 数据集为例

基础 XGBoost 实战以 iris 数据集为例 1、导入数据2、数据预处理3、分训练集和测试集4、训练模型构建5、测试集预测准确度6、构建混淆矩阵7、特征重要性 对于很多只是小小使用机器学习&#xff0c;而不是深入了解的人来说&#xff0c;了解各种原理可能是十分痛苦的&#xff0c;…

Gnuplot绘图入门2 快捷方式

Gnuplot绘图入门2——根据多列文本数据绘制图形 Gnuplot绘图入门1以绘制sin(x)的函数图形为例&#xff0c;对Gnuplot进行了简要介绍。这个教程将介绍如何使用Gnuplot对保存在文本文件&#xff08;.txt、.dat文件&#xff09;中的数据进行可视化。 将下面的数据复制下了&#…

Linux安装Nginx,源码安装及创建软连接

前言 Nginx是一个功能强大、高性能、可扩展、易用和安全的Web服务器和反向代理服务器&#xff0c;被广泛应用于企业级和互联网领域 可扩展性&#xff1a;Nginx可以通过添加各种模块和插件来扩展其功能&#xff0c;包括HTTP流控制、SSL加密、压缩和解压缩、访问控制等。 高可靠…

项目设计:迷宫游戏设计day3

一、界面制作 我用的easyx实现图形化界面&#xff0c;在制作第一个界面的时候&#xff0c;第一个界面的功能有开始游戏&#xff0c;退出游戏&#xff0c;排行榜&#xff08;虽然还没实现&#xff09; 那么首先还是得用一个图片贴在上面&#xff0c;这个图片是我自己画的&#…

20230419 生物基础学习- 氨基酸-密码子-突变

文章目录 名称形状密码子-氨基酸对应表简明中文英文和简称突变类型生物化学课程笔记特殊氨基酸的性质缬氨酸和甲硫氨酸 染色体 - RNA - 蛋白质 名称 Phenylalanine,Leucine,Isoleucine,Methionine,Valine,Serine,Proline,Threonine,Alanine,Tyrosine,Histidine,Glutamine,Aspa…

「 计算机网络 」HTTP和RPC的区别与联系

「 计算机网络 」HTTP和RPC的区别与联系 参考&鸣谢 HTTP 和 RPC 的区别 小十七_ Http协议和Rpc协议有什么区别&#xff1f; 俗人杂念 为什么要自研RPC框架&#xff1f;HTTP和RPC的区别 starine 既然有HTTP协议&#xff0c;为什么还要有RPC 小白debug 文章目录 「 计算机网络…

工装识别工装检测系统 yolov7

工装识别工装检测系统通过yolov7python网络模型算法智能分析技术&#xff0c;工装识别工装检测系统对现场人员是否穿戴的进行实时分析&#xff0c;发现现场画面人员未按要求着装&#xff0c;系统会自动抓拍发出警报并讲违规图片视频保存下来&#xff0c;同步回传后台提醒监理人…

uniapp调用七牛云api实现文件上传-node.js向外提供uploadToken的接口-客户端不用下载七牛云的包和SDK-发起网络请求直接上传

uniapp调用七牛云api实现文件上传 实现思路&#xff1a; 1.使用node.js向客户端提供uploadToken&#xff0c;客户端获取uploadToken后使用七牛云的api接口发起网络请求&#xff0c;上传文件&#xff1b; node.js向外提供uploadToken的接口-客户端不用下载七牛云的包和SDK-&…

macOS电脑

UNIX操作系统有一个规范&#xff0c;名叫《单一UNIX规范》&#xff08;Single UNIX Specification&#xff09;。凡是符合这个规范的操作系统都可以叫UNIX操作系统&#xff0c;并且可以通过UNIX官方认证。 UNIX商标认证官网是&#xff1a;The Register of UNIX Certified Prod…