通过netty源码带你一步步剖析NioEventLoop 的任务队列原理

news2025/1/12 17:22:49

NioEventLoop 的异步任务队列成员:
在这里插入图片描述

NioEventLoop 中对newTaskQueue 接口的实现,返回的是JCTools工具包Mpsc队列(多生产者单一消费者无锁队列,(无界和有界都有实现)


private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // newMpscQueue 无界对列,newMpscQueue(maxPendingTasks) 有界队列
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

任务的提交

Task任务的提交有3种典型使用场景,具体如下:

  • 用户提交的普通任务:
  • 用户提交的定时任务
  • 非Reactor线程调用Channel的各种方法,例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景,最终的Write 会提交到任务队列中后被异步消费.

用户提交的普通任务

通过ChannelHandlerContext 获取channel,通过channel 获取eventLoop,然后调用execute方法即可放入到任务队列中,代码如下:

Channel channel =ctx.channel();
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        try{
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

在AbstractChannel.AbstractUnsafe.register中,有一个eventLoop.execute()方法调用启动Eventlopo线程的入口,

 @Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            //此处处理的是用户提交的普通任务
            //如果线程没有启动,启动线程
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // 通道注册到选择器,向流水线发送通道激活事件
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           ...
        }
    }
}


如果线程没有启动,register0会作为一个Runable实例封装起来通过eventLoop.execute()方法提交到任务队列中.

用户提交的定时任务

Netty 提供了一些添加定时任务的接口,NioEventLoop的父类AbstractScheduledEventExecutor的schedule方法,通过ChannelHandlerContext获取channel,通过channel 获取eventLoop,然后调用schedule方法即放入到任务队列,代码如下:

 public  void heartBeat(ChannelHandlerContext ctx, ProtoMsg heartbeatMsg){
       ctx.executor().schedule(()->{
           if (ctx.channel().isActive()) {
               
               ctx.writeAndFlush(heartbeatMsg);
               heartBeat(ctx, heartbeatMsg);
           }
       },HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
   }
}

schedule 第一个参数和普通任务一样,传入一个线程即可,第二个参数是延时时间,第三个参数是延时单位,此处使用的是秒.

非Reactor线程调用Channel的各种方法

非反应器线程的消息发送操作,当用户线程(业务线程)发起write操作时,Netty 会进行判断,如果发现不是NioEventLoop 线程(反应器线程),则将发送消息封装封成WriteTask,放入NioEventLoop 的任务队列,由NioEventLoop 线程后续去执行.
在这里插入图片描述

用户线程发起write操作时的入口为io.netty.channel.AbstractChannelHandlerContext#write(final Object msg, final ChannelPromise promise),其源代码如下:

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
   write(msg, true, promise);
   return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
   ObjectUtil.checkNotNull(msg, "msg");
   try {
       if (isNotValidPromise(promise, true)) {
           ReferenceCountUtil.release(msg);
           // cancelled
           return;
       }
   } catch (RuntimeException e) {
       ReferenceCountUtil.release(msg);
       throw e;
   }

   final AbstractChannelHandlerContext next = findContextOutbound(flush ?
           (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
   final Object m = pipeline.touch(msg, next);
   EventExecutor executor = next.executor();
   if (executor.inEventLoop()) {
       if (flush) {
           next.invokeWriteAndFlush(m, promise);
       } else {
           next.invokeWrite(m, promise);
       }
   } else {
       final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
       if (!safeExecute(executor, task, promise, m, !flush)) {
          
           task.cancel();
       }
   }
}

private static boolean safeExecute(EventExecutor executor, Runnable runnable,
       ChannelPromise promise, Object msg, boolean lazy) {
   try {
       if (lazy && executor instanceof AbstractEventExecutor) {
           ((AbstractEventExecutor) executor).lazyExecute(runnable);
       } else {
           //executor执行的是netty自己实现的SingleThreadEventExecutor#execute方法
           executor.execute(runnable);
       }
       return true;
   } catch (Throwable cause) {
       try {
           if (msg != null) {
               ReferenceCountUtil.release(msg);
           }
       } finally {
           promise.setFailure(cause);
       }
       return false;
   }
}

任务的处理

任务处理的时序图如下:

这里safeExecute执行的task就是前面write写入时包装的AbstractWriteTask,

io.netty.channel.AbstractChannelHandlerContext.WriteTask#run
WriteTask的run经过一些系统处理操作,最终会调用io.netty.channel.ChannelOutboundBuffer#addMessage方法,将发送消息加入发送队列(链表)

io.netty.channel.nio.NioEventLoop#run

    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    // select 策略选择
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        // 1.1 非阻塞的select策略,即重试IO循环
                    case SelectStrategy.CONTINUE:
                        continue;
                        //1.2非阻塞的新事件IO循环
                    case SelectStrategy.BUSY_WAIT:

                        //1.3 阻塞的select策略
                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            ...
                        }
                        // 1.4 不需要select,目前已经有可以执行的任务了
                    default:
                    }
                } catch (IOException e) {
                    ...
                }

                selectCnt++;
                //2.执行网络IO事件和任务调度
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            //2.1 处理网络IO事件,分发入口
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        //2.2 处理系统Task和自定义Task
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    //根据ioRatio 计算非IO最多执行的时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        ...
                    }
                } else {
                    //处理队列中的task任务
                    ranTasks = runAllTasks(0); 
                }

                ...
            } catch (CancelledKeyException e) {
                ...
            } 
            ...
        }
    }


io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks()


 protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // 聚合到期的定时任务
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {  //执行任务
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = getCurrentTimeNanos();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}


private boolean fetchFromScheduledTaskQueue() {
   //定时任务队列为空的时候返回
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = getCurrentTimeNanos();
    for (;;) {
        //从定时任务队列中抓取第一个定时任务,寻找截止时间为nanoTime任务
        Runnable scheduledTask = pollScheduledTask(nanoTime);
       //只有该定时任务队列不为空,才会塞到任务队列里面
        if (scheduledTask == null) {
            return true;
        }
        //如果添加到普通任务队列过程中失败
        if (!taskQueue.offer(scheduledTask)) {
           //则重新添加到定时任务队列中
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
} 
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        //重点代码:安全执行消息队列中的任务
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}

其中定时任务队列scheduledTaskQueue 定义在AbstractScheduledEventExecutor中,

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

io.netty.util.concurrent.AbstractEventExecutor#safeExecute

protected static void safeExecute(Runnable task) {
    try {
        //直接调用run方法执行
        runTask(task);
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}
protected static void runTask(@Execute Runnable task) {
    task.run();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690165.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

10万元存款是年轻人的一个“坎”?存款超过10万就会超过53.7%的人?不要焦虑,以过来人的身份帮你分析分析!

&#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Micro麦可乐的博客 &#x1f425;《Docker实操教程》专栏以最新的Centos版本为基础进行Docker实操教程&#xff0c;入门到实战 &#x1f33a;《RabbitMQ》…

ChatGPT最新版实现多样化聚合文章的批量生成文章

随着人工智能技术的不断发展&#xff0c;ChatGPT最新版在多样化聚合文章的批量生成方面取得了重要突破。本文将从随机选取的8个方面&#xff0c;对ChatGPT最新版的构建思想进行详细阐述。这些方面包括&#xff1a;自然语言处理、大规模数据集、迁移学习、多模态输入、生成模型优…

JS将图片转pdf,jspdf的使用

Hi I’m Shendi 最近做转换工具&#xff0c;需要将图片转pdf&#xff0c;这里记录下来 JS将图片转pdf&#xff0c;jspdf的使用 简介 A library to generate PDFs in JavaScript. 一个用JavaScript生成PDF的库。 下载 在网站或github下载 https://parall.ax/products/jspdf …

图像增强之图像锐化(边缘增强)之sobel算子

note matx (-1,0,1;-2,0,2;-1,0,1) maty (-1,-2,-1;0,0,0;1,2,1) code // 图像增强之图像锐化(边缘增强)之sobel算子 void GetSobelMat(Mat& sobelX, Mat& sobelY) {sobelX (Mat_<int>(3,3) << -1,0,1,-2,0,2,-1,0,1);sobelY (Mat_<int>(3,3…

【面试】数据仓库

数据分层 维度建模 (0) 什么是维度建模&#xff1f; 维度建模以分析决策的需求出发构建模型&#xff0c;构建的数据模型为分析需求&#xff08;也就是我们通常所说的数据分析&#xff09;服务。它重点解决如何更快速完成分析需求&#xff0c;同时还有较好的大规模复杂查询的响…

品达通用权限系统-Day01

文章目录 1. 项目概述1.1 项目介绍1.2 业务架构1.3 技术架构1.4 环境要求 2. Spring Boot starter2.1 starter介绍2.2 starter原理2.2.1 起步依赖2.2.2 自动配置2.2.2.1 基于Java代码的Bean配置2.2.2.2 自动配置条件依赖2.2.2.3 Bean参数获取2.2.2.4 Bean的发现2.2.2.5 Bean的加…

NXP i.MX 8M Plus工业开发板规格书(四核ARM Cortex-A53 + 单核ARM Cortex-M7,主频1.6GHz)

1 评估板简介 创龙科技TLIMX8MP-EVM是一款基于NXP i.MX 8M Plus的四核ARM Cortex-A53 单核ARM Cortex-M7异构多核处理器设计的高性能工业评估板&#xff0c;由核心板和评估底板组成。ARM Cortex-A53(64-bit)主处理单元主频高达1.6GHz&#xff0c;ARM Cortex-M7实时处理单元主…

【Java】如何在 Java 中使用条件运算符

本文仅供学习参考&#xff01; 相关教程地址&#xff1a; http://c.biancheng.net/view/792.html https://www.cnblogs.com/bmbm/archive/2012/01/16/2342239.html 在软件开发中&#xff0c;运算符处理表达式中的一个或多个操作数。Java 编程语言支持以下类型的运算符&#xff…

HTML5、JS实现元素拖拽排序

先介绍一下html5的drag属性,拖放&#xff08;Drag 和 drop&#xff09;是 HTML5 标准的组成部分。想要启用drag&#xff0c;只要给元素加上draggable"true"就行了&#xff08;Safari 5.1.2除外&#xff09;。 拖动事件 事件分为两类&#xff0c;当前拖动的元素上的事…

【Makefile】解析Makefile:驾驭自动编译的力量

Makefile简介 一个工程中的源文件不计其数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的规则来指定&#xff0c;哪些文件需要先编译&#xff0c;哪些文件需要后编译&#xff0c;哪些文件需要重新编译&#xff0c;甚至于进行更复杂的…

你如何理解 JS 的继承?

在JavaScript中&#xff0c;继承是一种机制&#xff0c;允许一个对象&#xff08;子类&#xff09;从另一个对象&#xff08;父类&#xff09;继承属性和方法。这使得子类可以共享父类的功能&#xff0c;并有能∧自身定义新的功能。 JavaScript中的继承通过原型链实现。 具体来…

JavaWeb开发(前端Web开发)

文章目录 前言一、初识Web1.Web开发-介绍2.初识Web前端3.Web标准 二、HTML1.HTML快速入门2.VS Code开发工具3.基础标签&样式4.表格标签5.表单标签 三、JavaScript1.JS-介绍2.JS-引入方式3.JS-基础语法3.1.JS-基础语法-书写语法3.2.JS-基础语法-变量3.2.JS-基础语法-数据类型…

面向Java开发者的ChatGPT提示词工程(7)

在如今信息爆炸的时代&#xff0c;我们面临着海量的文字信息&#xff0c;很难抽出足够的时间去阅读所有感兴趣的内容。然而&#xff0c;大语言模型为我们提供了一种有效的解决方案&#xff1a;通过自动文本摘要&#xff0c;帮助我们快速获取文章的核心内容。这种技术已经被广泛…

vue2 h5开发前进刷新后退缓存实现

vue2 h5开发前进刷新后退缓存实现 在store定义变量 const state {includedComponents: [] }const mutations {includedComponents (state, data) {state.includedComponents data} }在app.vue&#xff08;我这里主要在layout.vue修改&#xff09;使用 keep-alive :include…

使用Microsoft.Office.Interop.PowerPoin遥控PPT

Microsoft.Office.Interop.PowerPoin操作PPT 主窗体&#xff0c;填写ppt路径&#xff0c;打开ppt打开ppt后&#xff0c;可用代码操作ppt可获取每页PPT截图&#xff0c;并获取对应小节名称&#xff0c;备注等代码下载地址联系qq 主窗体&#xff0c;填写ppt路径&#xff0c;打开p…

四.图像处理与光学之3A的 AE

五.图像处理与光学之3A的 AE 3A 是Camera ISP 控制算法的一个重要组成部分,通常分为自动曝光(AE)、自动聚焦(AF)、自动白平衡(AWB)三个组件。 5.0 概述自动曝光(Auto Exposure) 自动曝光算法可以理解为一个伺服系统,它不断监控ISP生成的每一帧图像的曝光状态,如果发现采…

C# Winform小程序:局域网设置NTP服务器、实现时间同步

设置NTP服务器&#xff1a; NTP是网络时间协议(Network Time Protocol)&#xff0c;它是用来同步网络中各个计算机的时间的协议。 局域网不能连接Internet&#xff0c;可以设置一台计算机为NTP服务器。 依次点击&#xff1a;开始---运行---regedit&#xff0c;进入注册表&am…

晶体管放大器结构原理图解

功率放大器的作用是将来自前置放大器的信号放大到足够能推动相应扬声器系统所需的功率。就其功率来说远比前置放大器简单&#xff0c;就其消耗的电功率来说远比前置放大器为大&#xff0c;因为功率放大器的本质就是将交流电能"转化"为音频信号&#xff0c;当然其中不…

JAVA 正则表达式 及 案例

JAVA 正则表达式 及 案例 目录 JAVA 正则表达式 及 案例1.正则表达式Regex1.1 概述1.2 常见语法1.3 String提供了支持正则表达式的方法1.4 练习&#xff1a;测试输入身份证号 1.正则表达式Regex 1.1 概述 正确的字符串格式规则。 常用来判断用户输入的内容是否符合格式的要求…

chatgpt赋能python:为什么在写Python代码时需要注意空格的使用

为什么在写Python代码时需要注意空格的使用 作为一门高效、易读且简单的编程语言&#xff0c;Python在软件开发领域中得到了越来越广泛的应用。然而&#xff0c;在Python的编码过程中&#xff0c;空格的使用可能会带来一些难以预料到的问题。为了避免这些问题&#xff0c;我们…