手撕netty源码(一)- NioEventLoopGroup

news2025/2/26 14:36:08

文章目录

  • 前言
  • 一、NIO 与 netty
  • 二、NioEventLoopGroup 对象的创建过程
    • 2.1 创建流程图
    • 2.2 EventExecutorChooser 的创建


前言

processOn文档跳转
本文是手撕netty源码系列的开篇文章,会先介绍一下netty对NIO关键代码的封装位置,主要介绍 NioEventLoopGroup 对象的创建过程,看看new一个对象可以做哪些事情。


一、NIO 与 netty

平时使用NIO的主要步骤:

/*创建选择器的实例*/
Selector selector = Selector.open();
/*创建ServerSocketChannel的实例*/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

/*设置通道为非阻塞模式*/
serverSocketChannel.configureBlocking(false);
/*绑定端口*/
serverSocketChannel.socket().bind(new InetSocketAddress(port));
/*注册事件,表示关心客户端连接*/
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while(true){
	/*获取当前有哪些事件*/
    selector.select(1000);
    /*获取事件的集合*/
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while(iterator.hasNext()){
         SelectionKey key = iterator.next();
         /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
         如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
         的键出现,这会导致我们尝试再次处理它。*/
         iterator.remove();
         handleInput(key);
    }
}

/*处理事件的发生*/
private void handleInput(SelectionKey key) throws IOException {
     if(key.isValid()){
         /*处理新接入的客户端的请求*/
         if(key.isAcceptable()){
             /*获取关心当前事件的Channel*/
             ServerSocketChannel ssc
                     = (ServerSocketChannel) key.channel();
             /*接受连接*/
             SocketChannel sc = ssc.accept();
             System.out.println("==========建立连接=========");
             sc.configureBlocking(false);
             /*关注读事件*/
             sc.register(selector,SelectionKey.OP_READ);
         }
         /*处理对端的发送的数据*/
         if(key.isReadable()){
             SocketChannel sc = (SocketChannel) key.channel();
             /*创建ByteBuffer,开辟一个缓冲区*/
             ByteBuffer buffer = ByteBuffer.allocate(1024);
             /*从通道里读取数据,然后写入buffer*/
             int readBytes = sc.read(buffer);
             if(readBytes>0){
                 /*将缓冲区当前的limit设置为position,position=0,
                 用于后续对缓冲区的读取操作*/
                 buffer.flip();
                 /*根据缓冲区可读字节数创建字节数组*/
                 byte[] bytes = new byte[buffer.remaining()];
                 /*将缓冲区可读字节数组复制到新建的数组中*/
                 buffer.get(bytes);
                 String message = new String(bytes,"UTF-8");
                 System.out.println("服务器收到消息:"+message);
                 /*处理数据*/
                 String result = Const.response(message);
                 、、、、、

             }else if(readBytes<0){
                 /*取消特定的注册关系*/
                 key.cancel();
                 /*关闭通道*/
                 sc.close();
             }
         }
         、、、、
     }
 }

平时使用netty的主要步骤:

// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
	.channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 1024)
    .childHandler(new ServerInit());

// 绑定端口,同步等待成功
b.bind(NettyConstant.SERVER_PORT).sync();

那么,netty 对 NIO 的封装具体体现在哪里呢?先揭晓答案,后续一点点细嚼慢咽

  1. 创建选择器的实例
    io/netty/channel/nio/NioEventLoop.java
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
  1. 创建ServerSocketChannel的实例
    io/netty/channel/socket/nio/NioServerSocketChannel.java
private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
    try {
        ServerSocketChannel channel =
                SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
        return channel == null ? provider.openServerSocketChannel() : channel;
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}
  1. 设置通道为非阻塞模式
    io/netty/channel/nio/AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
  1. 绑定端口
    io/netty/bootstrap/AbstractBootstrap.java
private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
  1. 注册事件,表示关心客户端连接
    io/netty/channel/nio/AbstractNioChannel.java
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
  1. 获取当前事件的集合
  2. 处理事件
    io/netty/channel/nio/NioEventLoop.java
private int select(long deadlineNanos) throws IOException {
   if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

其实,netty 也不难对吧
学习netty,主要学习它的设计思想和对性能优化的巧妙处理,当工作需要时,能够灵活运用

二、NioEventLoopGroup 对象的创建过程

2.1 创建流程图

在这里插入图片描述
可以看到,其实我们传的线程数量实际控制的是NioEventLoop对象创建的数量,而每个 NioEventLoop 其实是一个Executor执行器,那么至此,我们只是相当于创建了两个 NioEventLoopGroup 对象,他们分别有自己的children执行器 NioEventLoop 数组,同一个数组内的 NioEventLoop 共享一个ThreadPerTaskExecutor执行器,但是现在这个执行器后续如何处理事件和如何调度还不知道,后续会讲到,本文先看看创建NioEventLoopGroup对象都做了什么
在这里插入图片描述

2.2 EventExecutorChooser 的创建

// io/netty/util/concurrent/MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

// io/netty/util/concurrent/MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            、、、
        }
    }
	// *********关键代码********
    chooser = chooserFactory.newChooser(children);

    、、、
}

// io/netty/util/concurrent/DefaultEventExecutorChooserFactory.java
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

这段代码很简单但是有需要我们学习的地方,从类名和方法名可以看出来,这个工厂类是创建事件执行者选择器的,并且是通过我们创建NioEventLoopGroup时指定的线程数来创建不同的选择器:

  • 当数量是2的次幂时,创建PowerOfTwoEventExecutorChooser
  • 否则,创建GenericEventExecutorChooser

(val & -val) == val
netty 使用这种方法来判断一个数是不是2的倍数,稍微讲一下,& 是"与"运算,只有1&1才得1,那么一个数的负数用二进制是怎么表示的呢?答案是“补码”,也就是对这个数的二进制取反+1,举例:
8的二进制是0000 1000,取反之后是 1111 0111,加1之后是 1111 1000,所以-8的二进制就是 1111 1000

0000 1000 & 1111 1000 = 0000 1000
学到了吧,以后有人问你如何判断一个数是不是2的次幂时,就可以用这个方法,因为二进制与或运算比加减运算更加高效

在这里插入图片描述

从这个工厂类的注释看,无论使用哪个选择器,策略都是轮询,那么为什么还涉及两个选择器呢?来看看具体实现:

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
   private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
    // The 64-bit long solves this by placing the overflow so far into the future, that no system
    // will encounter this in practice.
    private final AtomicLong idx = new AtomicLong();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

PowerOfTwoEventExecutorChooser 中定义了一个AtomicInteger idx,选择执行器的算法是“idx.getAndIncrement() & executors.length - 1”,举例说明:如果executors.length是2的次幂,那么二进制就是1000…,那么减1之后就是 01111…,和任何数做“按位与”运算,结果都只会是0到executors.length - 1之间,只要这个数递增的,那么就会在0到executors.length - 1之间轮询,达到轮询的目的,很巧妙吧,又学到了~

GenericEventExecutorChooser 的算法就很普通了,对executors.length取余

所以,在创建NioEventLoopGroup的时候,知道如何指定线程数了吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1628073.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用PyCharm开发工具创建工程

一. 简介 前面文章实现了开发 python程序使用的 开发工具PyCharm&#xff0c;本文来学习使用 PyCharm开发工具创建一个 python工程。 二. 使用PyCharm开发工具创建工程 1. 首先&#xff0c;打开 PyCharm开发工具&#xff0c;打开 "New project" 选项&#xff1a; …

hive启动beeline报错

问题一在zpark启动集群报错 出现上面的问题执行以下代码 chmod 777 /opt/apps/hadoop-3.2.1/logs 问题二启动beeline报错 执行 cd /opt/apps/hadoop-3.2.1 bin/hadoop dfsadmin -safemode leave 问题三执行查询语句报错 执行 set hive.exec.mode.local.autotrue;

java接口加密解密

这里写目录标题 controller加解密工具类加密&#xff08;本质是对ResponseBody加密&#xff09;解密&#xff08;本质是对RequestBody传参解密&#xff09;注解 controller Controller public class PathVariableController {GetMapping(value "/test")ResponseBod…

Redis缓存问题:穿透,击穿,雪崩等

Redis缓存问题:穿透,击穿,雪崩等 在高并发场景下,数据库往往是最薄弱的环节,我们通常选择使用redis来进行缓存,以起到缓冲作用,来降低数据库的压力,但是一旦缓存出现问题,也会导致数据库瞬间压力过大甚至崩溃,从而导致整个系统崩溃.今天就聊聊常见的redis缓存问题. 缓存击穿 …

相关分析方法

目录 1.什么是相关分析方法 2.相关系数 3.常见的相关分析方法 3.1.皮尔逊相关系数 3.2.斯皮尔曼等级相关 ​​​​​​​3.3.肯德尔等级相关 ​​​​​​​3.4.其它 4.应用 5.注意事项 6.结语 1.什么是相关分析方法 相关分析是数据分析中的一种统计方法&#xff0c…

[C++基础学习]----02-C++运算符详解

前言 C中的运算符用于执行各种数学或逻辑运算。下面是一些常见的C运算符及其详细说明&#xff1a;下面详细解释一些常见的C运算符类型&#xff0c;包括其原理和使用方法。 正文 01-运算符简介 算术运算符&#xff1a; a、加法运算符&#xff08;&#xff09;&#xff1a;对两个…

RabbitMQ(高级)笔记

一、生产者可靠性 &#xff08;1&#xff09;生产者重连&#xff08;不建议使用&#xff09; logging:pattern:dateformat: MM-dd HH:mm:ss:SSSspring:rabbitmq:virtual-host: /hamllport: 5672host: 192.168.92.136username: hmallpassword: 123listener:simple:prefetch: 1c…

AWTK 开源串口屏开发(17) - 通过 MODBUS 访问数组数据

在 AWTK 串口屏中&#xff0c;内置了 MODBUS Client Channel 的模型&#xff0c;不用编写代码即可实现在 ListView 中显示数组数据。 MODBUS 协议一次只能读取 125 个 WORD&#xff0c;AWTK-MODBUS Client Channel 支持长数据&#xff0c;自动分成多个请求访问。 1. 功能 不用…

浏览器的同源策略与解决跨域

同源策略&#xff08;协议、域名、端口&#xff09; 同源策略&#xff08;Same-Origin Policy&#xff09;是一个在浏览器安全模型中被实施的重要安全机制。它是基于域名、协议和端口号的限制&#xff0c;用于防止不同源的网页间的恶意行为和信息泄露。 根据同源策略&#xf…

【Diffusion实战】训练一个diffusion模型生成蝴蝶图像(Pytorch代码详解)

上一篇Diffusion实战是确确实实一步一步走的公式&#xff0c;这回采用一个更方便的库&#xff1a;diffusers&#xff0c;来实现Diffusion模型训练。 Diffusion实战篇&#xff1a;   【Diffusion实战】训练一个diffusion模型生成S曲线&#xff08;Pytorch代码详解&#xff09;…

【Linux学习】​​学习Linux的准备工作和Linux的基本指令

˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN 如…

并并并并·病查坤

P1、什么是并查集 引用自百度百科&#xff1a; 并查集&#xff0c;在一些有N个元素的集合应用问题中&#xff0c;我们通常是在开始时让每个元素构成一个单元素的集合&#xff0c;然后按一定顺序将属于同一组的元素所在的集合合并&#xff0c;其间要反复查找一个元素在哪个集合…

【数据标注】使用LabelImg标注YOLO格式的数据(案例演示)

文章目录 LabelImg介绍LabelImg安装LabelImg界面标注常用的快捷键标注前的一些设置案例演示检查YOLO标签中的标注信息是否正确参考文章 LabelImg介绍 LabelImg是目标检测数据标注工具&#xff0c;可以标注两种格式&#xff1a; VOC标签格式&#xff0c;标注的标签存储在xml文…

insightface 环境配置

首先创建续集环境&#xff1a; conda create -n insightface3 python3.8 然后打开此虚拟环境&#xff1a;conda activate insightface3 然后安装&#xff1a; pip install insightface 再安装&#xff1a;pip install onnxruntime-gpu 就可以了

零基础俄语培训哪家好,柯桥俄语培训

1、Мощный дух спасает расслабленное тело. 强大的精神可以拯救孱弱的肉体。 2、Единственное правило в жизни, по которому нужно жить — оставаться человеком в лю…

物联网的基本功能及五大核心技术——青创智通

工业物联网解决方案-工业IOT-青创智通 物联网基本功能 物联网的最基本功能特征是提供“无处不在的连接和在线服务”&#xff0c;其具备十大基本功能。 &#xff08;1&#xff09;在线监测&#xff1a;这是物联网最基本的功能&#xff0c;物联网业务一般以集中监测为主、控制为…

qml和c++结合使用

目录 文章简介1. 创建qml工程2. 创建一个类和qml文件&#xff0c;修改main函数3. 函数说明&#xff1a;4. qml 文件间的调用5. 界面布局6. 代码举例 文章简介 初学qml用来记录qml的学习过程&#xff0c;方便后面归纳总结整理。 1. 创建qml工程 如下图&#xff0c;我使用的是…

本地Mysql开启远程访问(图文)

目录 1. 问题所示2. 原理分析3. 解决方法 1. 问题所示 事因是访问同事的数据库时&#xff0c;出现无法访问 出现1130 - Host ‘IT07’ is not allowed to connect to this MySQL server截图如下&#xff1a; 2. 原理分析 如果账号密码地址都正常的情况下&#xff0c;这是没开…

SQLite尽如此轻量

众所周知&#xff0c;SQLite是个轻量级数据库&#xff0c;适用于中小型服务应用等&#xff0c;在我真正使用的时候才发现&#xff0c;它虽然轻量&#xff0c;但不知道它却如此轻量。 下载 官网&#xff1a; SQLite Download Page 安装 1、将下载好的两个压缩包同时解压到一个…