Netty源码剖析之HashedWheelTimer时间轮

news2025/1/5 15:16:36

版本信息:

JDK1.8

Netty-all:4.1.38.Final

时间轮的介绍

我们知道钟表分为很多块,每时钟滴答一次就往前走一个块,而时间轮就是使用这个思想。如下图

上图总共分为8块,每过100ms就往前走一块,然后周而复始。此时,我们能不能在每一块上挂载任务呢,然后每过100ms就执行块上的任务,实现类似于Scheduled延迟调度任务的功能。

下面使用一个案例+画图介绍一下时间轮。

此时,每块的间隔是100ms,时间轮的current指针已经执行了400ms,此时插入一个延迟200ms调度的任务进来。

而插入之需要算出时间轮的current指针的时间,然后加上本次调度的时间,就可以直接往哪一块添加任务,所以插入的效率是O1时间复杂度,不过在冲突的情况下需要使用链表链起来。而解决冲突的最好办法就是把块增多减少碰撞(HashMap同样的思想)

如果某个节点发生了碰撞,存在3个任务都在一个块,当current执行到哪一块的时候,就会串行化执行3个任务,如果任务中存在耗时任务,那么其他任务就会延迟执行,超过预期的执行时间,也会影响到整体的current前进,导致整体的时间对不上。所以使用时间轮的任务需要对时延的准确性低,并且尽量保证任务本身精简不携带耗时操作~

时间轮和小顶堆的区别

PriorityQueue优先级队列icon-default.png?t=N7T8https://blog.csdn.net/qq_43799161/article/details/132734047?spm=1001.2014.3001.5502

在上篇文章中讲述了PriorityQueue优先级队列,它底层由小顶堆实现(完全二叉树),在插入元素的时候需要向上调整(siftUp),在取出元素的时候需要向下调整(siftDown),调整的过程是非常浪费性能,尤其是数据量过多的时候。

而时间轮通过O1的时间复杂度直接定位在哪一块上,如果有冲突就使用链表把定位在同一块的任务链起来,不需要任何的调整,整体效率比小顶堆高,尤其是数据量大的时候差距就更加的明显~

HashedWheelTimer源码分析

直接从构造方法入手~

/**
 * @param threadFactory         线程工厂
 * @param tickDuration          间隔时间,默认是100
 * @param unit                  时间单位,默认是毫秒ms
 * @param ticksPerWheel         总块数,默认是512块
 * @param leakDetection         是否泄漏检测,默认为true
 * @param maxPendingTimeouts    最大任务数,默认为-1,-1代表无限数量。
 * */
public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
    long maxPendingTimeouts) {

    // 创建HashedWheelBucket数组。数组大小为ticksPerWheel,默认512快。(会优化成2的指数倍数,因为要hash运算)
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;    // hash运算掩码

    // 把用户传入的间隔单位转换成纳秒。
    long duration = unit.toNanos(tickDuration);

    // 时间间隔小于默认的最小值(最小值为1毫秒)
    if (duration < MILLISECOND_NANOS) {
        // 赋值为默认的最小值
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        // 正常赋值
        this.tickDuration = duration;
    }

    // 创建时间轮的工作线程
    workerThread = threadFactory.newThread(worker);

    // 默认为-1,也即为无限大
    // 当然这个值用户可以自行传入。
    this.maxPendingTimeouts = maxPendingTimeouts;
}

对构造方法做一个总结:

  1. 创建HashedWheelBucket数组,这个数组就是时间轮的块,默认有512块。所以也尽可能的减少碰撞
  2. 把用户传入的间隔时间,默认为100ms,转换成纳秒,因为纳秒计算保证了精准性
  3. 创建时间轮的工作线程,此工作线程的指责是每次的100ms滴答,执行每个块的任务
  4. 赋值总任务量,默认为-1,也即默认无限多。

构造方法把一切初始化好了,创建了线程,所以需要找到线程在那里开启,线程的执行代码~

既然已经初始化好了,那么就看到创建延迟调度任务的方法,此方法中启动了时间轮工作线程

/**
    * @param task  任务
    * @param delay 延迟时间
    * @param unit  时间单位
    * @author liha
    * 
*/
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // 限流。
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }

    // 开启线程。
    // 内部使用状态机+unsafe保证只会有一个线程启动
    start();

    // 当前时间 + 本次延迟调度的时间 - 时间轮开始的时间 = 本次调度的绝对时间。
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // 创建任务,往时间轮的工作线程队列投递
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);      
    return timeout;
}

对创建延迟调度任务的方法做一个总结:

  1. 限流操作,是否达到了用户设置的总任务数的阈值
  2. 启动线程,内部使用状态机+unsafe保证只会有一个线程启动
  3. 算出本次调度的绝对时间。而绝对时间是从工作线程启动的时候开始算的,为什么要这么算?因为工作线程启动时钟就开始滴答,也即current指针开始移动。所以我们有必要把这些时间算进去 ,再加上本次延迟调度的时间 ,就等于最终调度的绝对时间。
  4. 创建出HashedWheelTimeout对象,此对象就是延迟调度任务
  5. 多线程之间的传输任务肯定是使用队列,所以使用队列将HashedWheelTimeout投递到工作线程中

所以,我们接下来看到时间轮工作线程。

// 线程执行点。
@Override
public void run() {
    // 当前线程的启动时间
    startTime = System.nanoTime();         

    // 唤醒阻塞在等待此线程启动的线程。
    startTimeInitialized.countDown();           

    do {
        // 使用休眠模拟滴答。
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // 算出本次滴答执行的任务在那个索引位置。
            int idx = (int) (tick & mask);

            // 处理取消的任务
            processCancelledTasks();

            // 取出当前滴答索引对应的HashedWheelBucket
            HashedWheelBucket bucket =
            wheel[idx];

            // 从队列中取出其他线程投递的HashedWheelTimeout调度任务。
            transferTimeoutsToBuckets();

            // 处理当前批次的。
            bucket.expireTimeouts(deadline);
            
            // 为下次滴答+1。
            tick++;         
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // 跳出do while循环代表已经处于stop状态,所以需要做收尾工作。
    // 把队列中还没有处理的任务返回给用户自行去处理
    for (HashedWheelBucket bucket: wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
  1. 获取到当前线程启动的时间
  2. 唤醒等待当前线程启动的线程
  3. waitForNextTick方法使用休眠模拟时钟滴答
  4. 算出本次滴答后需要执行的块的索引
  5. 处理取消的任务
  6. 通过块的索引拿到HashedWheelBucket
  7. 从队列中取出其他线程投递的HashedWheelTimeout调度任务。
  8. 处理当前HashedWheelBucket中的任务
  9. 为下次滴答+1。
  10. 当工作线程进入到stop状态后,会把没有执行的任务打包,当用户调用stop方法会拿到这些没处理的任务,交给用户自行处理。

这里我们看到waitForNextTick方法如何使用休眠模拟时钟滴答

private long waitForNextTick() {
    // tick是总滴答的次数。
    // 滴答间隔 * 总滴答的次数+1 = 本次滴答完后的总滴答时间
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        // 得到工作线程从启动开始总共运行的时间(这是一个相对时间)
        final long currentTime = System.nanoTime() - startTime;

        /**
         * 这里拿本次应该滴答后达到的时间 - 工作线程从启动开始总共运行的时间 = 本次睡眠的时间
         * 注意:这里可能已经是负数了,因为执行之前的调度任务需要时间
         * + 999999 / 1000000 是为了四舍五入,并且把纳秒转换成毫秒,因为sleep方法需要毫秒
         * */
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        // 小于0直接返回,代表达到时间了。
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        try {
            // 睡眠
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

这里比较简单,就是算出本次滴答后的绝对时间 - 当前工作线程总共执行的时间 = 本次应该休眠的时间,然后去Thread.sleep 睡眠模拟时钟滴答。

我们继续看到transferTimeoutsToBuckets方法是如何接受队列的HashedWheelTimeout调度任务

private void transferTimeoutsToBuckets() {
    // 尝试10000次,如果10000次还没有队列来就下一轮再处理,因为我们不能在这里浪费过多的时间影响到精准度
    for (int i = 0; i < 100000; i++) {
        // 从队列中尝试取出。
        HashedWheelTimeout timeout = timeouts.poll();

        // timeout.deadline是拿到用户传入的调度时间
        // tickDuration 这个是一次滴答的时间。
        // 所以这里算出调度需要多少次滴答
        long calculated = timeout.deadline / tickDuration;
        
        // 计算出多少轮可以调度。
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        final long ticks = Math.max(calculated, tick); 

        // hash运算,得到HashedWheelBucket数组的索引。
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // 添加到HashedWheelBucket,其中使用双向链表,等待被调度。
        bucket.addTimeout(timeout);
    }
}

这里for循环尝试10w次,因为不能尝试太多次,不然会影响到调度的精准度。

每次尝试从队列中获取到调度任务,计算出当前任务需要多少个滴答,最后hash运算添加到对应的HashedWheelBucket中,等待被调度。

在本文的最后看一下,如何调用任务,看到HashedWheelBucket的expireTimeouts方法

public void expireTimeouts(long deadline) {

    HashedWheelTimeout timeout = head;

    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // 小于等于0 代表可以被调度了,要不然就-1
        if (timeout.remainingRounds <= 0) {  
            // 要被调度的任务就从链表中移除。   
            next = remove(timeout); 
            if (timeout.deadline <= deadline) {
                // 执行任务。
                timeout.expire();
            } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            // 被取消了,所以从队列中移除
            next = remove(timeout);
        } else {        
            // 整整大了N个周期,所以-1,等到remainingRounds为0的时候就是需要被调度
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

这里就非常的容易了,直接遍历双向链表,串行化的执行HashedWheelTimeout的expire方法,在expire方法中会调用用户传入的业务逻辑 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/988256.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于faust勒索病毒,这些您该了解,勒索病毒解密,数据恢复

faust勒索病毒是一种近年来流行的恶意软件&#xff0c;它通过对计算机系统进行加密并要求支付赎金来达到释放系统的目的。faust勒索病毒的流行程度和危害性不断上升&#xff0c;给企业和个人带来了严重的安全威胁。接下来云天数据恢复中心将从faust勒索病毒的背景、特点、危害、…

paddlespeech on centos7

概述 paddlespeech是百度飞桨平台的开源工具包&#xff0c;主要用于语音和音频的分析处理&#xff0c;其中包含多个可选模型&#xff0c;提供语音识别、语音合成、说话人验证、关键词识别、音频分类和语音翻译等功能。 paddlespeech整体是比较简单易用的&#xff0c;但是安装…

开学季!ChatGPT遭多国高校“封杀”!

美国OpenAI公司研发的聊天机器人ChatGPT&#xff0c;在过去的几个月席卷全球。 OpenAI的创始人之一马斯克&#xff08;Elon Musk&#xff09;曾在社交媒体平台上称赞ChatGPT&#xff1a;“这是全新的世界。和家庭作业说再见吧&#xff01;”&#xff08;Its a new world. Good…

轮播图横向和纵向同时滚动

轮播图横向和纵向同时滚动: <view><swiper v-if"noticeList.length > 0" style"margin-left: 20rpx;position: fixed;margin-top: 10rpx;z-index: 9999;width: 710rpx;border-radius: 20rpx !important;overflow: hidden;transform: translateY(0)…

js-13-Js中的事件模型

事件模型分为&#xff1a;事件与事件流、原始事件模型、标准事件模型和IE事件模型 1、事件与事件流 js中的事件&#xff0c;可以理解为是在HTML中文档或者浏览器中发生的一种交互操作&#xff0c;使得网页具备互动性&#xff0c;常见的有加载事件、鼠标事件、自定义事件等。 由…

CocosCreator3.8研究笔记(十)CocosCreator 图像资源的理解

一、图像资源导入 Cocos Creator 可使用图像文件格式&#xff0c;支持 JPG、PNG、BMP、TGA、HDR、WEBBP、PSD、TIFF 等。 将图像资源直接拖拽到 资源管理器 即可将其导入 二、图像资源的类型 在 属性检查器 面板中便可根据需要设置图像资源的使用类型&#xff1a;raw 、 textu…

window和linux下载ffmpeg

window 进入官方 进入 download 页面并选择 window 版本 下载 zip 压缩包 解压压缩包 解压压缩包之后&#xff0c;在 bin 目录下有三个文件&#xff0c;我们要下载的 window 版 ffmpeg.exe 就在其中&#xff0c;后续你可以添加系统环境变量或者在每次执行 ffmpeg.exe 都带上路…

leetcode386. 字典序排数(java)

字典序排数 题目描述递归法迭代 题目描述 难度 - 中等 leetcode386. 字典序排数 给你一个整数 n &#xff0c;按字典序返回范围 [1, n] 内所有整数。 你必须设计一个时间复杂度为 O(n) 且使用 O(1) 额外空间的算法。 示例 1&#xff1a; 输入&#xff1a;n 13 输出&#xff1a…

学习Jetpack Compose的反思,总结及新的开始(无干货,纯叙事)

前言及个人简介 我是一名90后安卓开发者&#xff0c;我是从去年五月四日开始学习 Jetpack Compose的&#xff0c;出于对前沿安卓知识的渴望&#xff0c;我点开了Jetpack Compose官网的网页&#xff0c;开始了我的学习之旅&#xff0c;那时候国内的相关文档还没有现在多&#x…

小程序中使用分包

前言 小程序在未使用的分包的情况下仅支持大小为2M,如果图片等资源过多的情况下可以使用分包功能&#xff0c;使用分包的情况下单个分包大小不能超过2M,总大小不能超过20M&#xff0c;分包有两种情况&#xff1a;普通分包和独立分包&#xff0c;下面介绍的是普通分包。官方文档…

Linux常用命令——cp命令

在线Linux命令查询工具 cp 将源文件或目录复制到目标文件或目录中 补充说明 cp命令用来将一个或多个源文件或者目录复制到指定的目的文件或目录。它可以将单个源文件复制成一个指定文件名的具体的文件或一个已经存在的目录下。cp命令还支持同时复制多个文件&#xff0c;当一…

大佬带飞,代码分享不会用?玩转Git,跟上大佬节奏!

一、安装 Git 客户端 这里为大家提供了windows版的Git客户端以及安装图文详解文档。百度网盘&#xff1a; https://pan.baidu.com/s/1CDu0Ke199pt3Ysv-QtWObA 提取码&#xff1a;8888 如果过期了请留言联系我。 二、注册码云账号 打开码云网站&#xff1a;https://gitee.co…

分布式 - 服务器Nginx:基础系列之Nginx静态资源配置优化sendfile | tcp_nopush | tcp_nodelay

文章目录 1. sendfile 指令2. tcp_nopush 指令3. tcp_nodelay 指令 1. sendfile 指令 请求静态资源的过程&#xff1a;客户端通过网络接口向服务端发送请求&#xff0c;操作系统将这些客户端的请求传递给服务器端应用程序&#xff0c;服务器端应用程序会处理这些请求&#xff…

ARM+Codesys标准通用型控制器

整机工业级设计&#xff0c;通讯外设经过隔离保护 电源宽电压设计(9~36V DC ) 丰富的通讯接口&#xff0c;满足多种场合控制和通讯需求 四核工业级处理器&#xff0c;高性能&#xff0c;低功耗&#xff0c;高可靠性 机身无风扇设计&#xff0c;外壳小巧 搭载内核 100% 自主…

【面试高频题】二叉树“神级遍历“入门

题目描述 这是 LeetCode 上的 「99. 恢复二叉搜索树」 &#xff0c;难度为 「中等」。 Tag : 「二叉树」、「树的搜索」、「递归」、「迭代」、「中序遍历」、「Morris 遍历」 给你二叉搜索树的根节点 root&#xff0c;该树中的 恰好 两个节点的值被错误地交换。请在不改变其结…

HBuilderX安装+配置教程

HbuilderX是Hbuilder的升级版。它是是DCloud&#xff08;数字天堂&#xff09;推出为前端开发者服务的通用IDE&#xff0c;或者称为编辑器。 目录 一 下载 二 安装 三 创建桌面快捷方式 四 使用 1.【新建项目】&#xff1a; 2 运行 一 下载 官网&#xff1a;DCloud - …

JavaScript的面向对象

一、认识对象 1.概述 对象&#xff08;object&#xff09;是 JavaScript 语言的核心概念&#xff0c;也是最重要的数据类型。 什么是对象&#xff1f;简单说&#xff0c;对象就是一组“键值对”&#xff08;key-value&#xff09;的集合&#xff0c;是一种无序的复合数据集合…

vscode调试程序设置

主要设置和json内容如下&#xff1a; cpp_properties.json内容&#xff1a; {"configurations": [ //C intellisense插件需要这个文件&#xff0c;主要是用于函数变量等符号的只能解析{"name": "Win32","includePath": ["${work…

Discourse 如何访问运行数据库

在需要了解 Discourse 如何访问数据库之前我们需要了解的是 Discourse 的所有软件都使用的是 Docker 容器。 因此我们必须要进入到 Docker 容器后才能访问 Discourse 内部的东西。 进入 Discourse 容器 进入 Discourse 容器的命令是 cd /var/discourse/ ./launcher enter a…

Container容器

Container继承体系 Winow是可以独立存在的顶级窗口,默认使用BorderLayout管理其内部组件布局;Panel可以容纳其他组件&#xff0c;但不能独立存在&#xff0c;它必须内嵌其他容器中使用&#xff0c;默认使用FlowLayout管理其内部组件布局&#xff1b;ScrollPane 是 一个带滚动条…