dubbo之时间轮算法分析

news2025/1/11 22:52:38

文章目录

  • 前言
  • 一、参数说明
  • 二、具体实现
    • 1、HashedWheelTimer
    • 2、createWheel
    • 3、newTimeout
    • 4、start
    • 5、run
    • 6、waitForNextTick
    • 7、transferTimeoutsToBuckets
    • 8、expireTimeouts
  • 总结


前言

时间轮(TimingWheel)是一种高效利用线程资源进行批量化调度的算法,广泛应用于各种操作系统的定时任务调度中,如Linux的crontab,以及Java开发中常用的Dubbo、Netty、Kafka等框架。时间轮的核心思想是将时间划分为若干个时间间隔(bucket),每个时间间隔代表一个时间段,并通过一个循环的数据结构(类似于时钟)来管理这些时间间隔。


文章基于3.1.0版本进行分析

		<dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>3.1.0</version>
        </dependency>

一、参数说明

在这里插入图片描述

  • tickDuration:表示一个槽所代表的时间范围 默认100ms
  • ticksPerWheel:表示该时间轮有多少个槽 默认512
  • startTime:表示该时间轮的开始时间
  • interval:时间轮所能表示的时间跨度,也就是 tickDuration * ticksPerWheel
  • currentTime:表示当前时间,也就是时间轮指针指向的时间
  • wheel:表示TimerTaskList的数组,即各个槽,每个bucket都是一个 HashedWheelBucket 。
  • HashedWheelBucket:存储TimerTaskEntry的双向链表
  • HashedWheelTimeout:延迟任务,有两个值 deadline 和 remainingRounds
  • deadline:TimerTask 最后的执行时间
  • remainingRounds:剩余圈数
  • timeouts:用来保存新增的HashedWheelTimeout,每次执行会拿出10W个放入HashedWheelBucket

二、具体实现

1、HashedWheelTimer

时间轮实现类

	public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel,
            long maxPendingTimeouts) {
		// 检查参数
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
		// 创建时间轮
        wheel = createWheel(ticksPerWheel);
		
		// 位运算标识 
		// 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline& mast == deadline % wheel.length    
		// deadline = System.nanoTime() + unit.toNanos(delay) - startTime; TODO
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
		// 时间轮的基本时间跨度,转成最小时间单位Nanos
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
		// 时间跨度限制不能太大,计算会有问题
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
		// 创建时间轮工作线程
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;
		// 延迟任务太多的时间,警告日志
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

参数说明:

  • threadFactory
    线程工厂,创建时间轮线程
  • tickDuration
    每一tick的时间
  • timeUnit
    tickDuration的时间单位
  • ticksPerWheel
    就是轮子一共有多个格子,即要多少个tick才能走完这个wheel一圈。

2、createWheel

创建时间轮

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }
		// 如果不是2^n 则调整为2^n
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
		// 初始化时间轮槽
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

3、newTimeout

添加新任务

	public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
		// 如果任务大于最大量,则不允许继续添加
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }
		// 启动时间轮工作线程
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
		// 如果为负数,那么说明超过了long的最大值
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

4、start

启动时间轮

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
			// 如果是初始化 启动实践论
            case WORKER_STATE_INIT:
				// 保证只启动一次,原子性
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
			// 已经启动过了
            case WORKER_STATE_STARTED:
                break;
			// 时间轮停止了
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
		// 线程启动执行任务是异步的,这里是等待workerThread.start(),线程已经启动了
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

5、run

workerThread.start()启动后,会执行Worker的run方法

public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // Notify the other threads waiting for the initialization at start().
	// 唤醒被阻塞的start()方法。
    startTimeInitialized.countDown();

    do {
		// 等下一个槽的到达时间,开始执行上一个槽的任务 TODO 不明白这里的设计,哪位大佬知道可以指点一下
        final long deadline = waitForNextTick();
        if (deadline > 0) {
			// 计算时间轮的槽位
            int idx = (int) (tick & mask);
			// 移除取消的了task
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中
			transferTimeoutsToBuckets();
			// 运行目前指针指向的槽中的bucket链表中的任务,执行到期的延时任务
            bucket.expireTimeouts(deadline);
            tick++;
        }
		
    } 
	// 如果Worker_State一只是started状态,就一直循环
	while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
		// 清除时间轮中超时未处理的任务
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (; ; ) {
		// 遍历任务队列,发现如果有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中。
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
	// 再次移除取消的了task
    processCancelledTasks();
}

6、waitForNextTick

一个钟表上的间隔是代表一个单位时间的间隔,那么waitForNextTick就是根据当前时间计算出跳动到下个时间的时间间隔,然后进行sleep,结束后进入下一个时间间隔,下一个间隔到来的时候返回。

	/**
     * 根据startTime和当前槽位计算目标nanoTime,
     * 等待时间到达
     *
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    private long waitForNextTick() {
		// tick槽位,tickDuration表示每个时间格的跨度,所以deadline返回的是下一次时间轮指针跳动的时间
        long deadline = tickDuration * (tick + 1);

        for (; ; ) {
			// 计算当前时间距离启动时间的时间间隔,期间休眠
            final long currentTime = System.nanoTime() - startTime;
			// 计算sleepTimeMs先加999999,应该是不足1ms的,补足1ms
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
			
            // sleepTimeMs小于零表示走到了下一个时间槽位置
			if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
			// Windows 时间换算
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
				// 当前时间距离下一次tick时间还有一段距离,需要sleep
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

7、transferTimeoutsToBuckets

转移任务到时间轮的具体槽中

	// 将延时任务队列中等待添加到时间轮中的延时任务转移到时间轮的指定位置
	private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
        // adds new timeouts in a loop.
		// 循环100000次,也就是每次转移10w个任务
		// 为了防止这个操作销毁太多时间,导致更多的任务时间不准,因此一次最多操作10w个
        for (int i = 0; i < 100000; i++) {
			// 从阻塞队列中获得具体的任务
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.
                continue;
            }

            // 计算tick次数,deadline表示当前任务的延迟时间,tickDuration表示时间槽的间隔,两者相除就可以计算当前任务需要tick几次才能被执行
			long calculated = timeout.deadline / tickDuration;
            // 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期.(被执行)
			timeout.remainingRounds = (calculated - tick) / wheel.length;

            // Ensure we don't schedule for past.
			// 如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行
            final long ticks = Math.max(calculated, tick);
            // 算出任务应该插入的 wheel 的 slot, stopIndex = tick 次数 & mask, mask = wheel.length - 1
			int stopIndex = (int) (ticks & mask);

            // 把timeout任务插入到指定的bucket链中。
			HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);
        }
    }

8、expireTimeouts

当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expireTimeouts来实现

	void expireTimeouts(long deadline) {
		// 双向链表
        HashedWheelTimeout timeout = head;

        // process all timeouts
        // 遍历当前时间槽中的所有任务
		while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
			// 如果当前任务要被执行,那么remainingRounds应该小于或者等于0
            if (timeout.remainingRounds <= 0) {
				// 从bucket链表中移除当前timeout,并返回链表中下一个timeout
                next = remove(timeout);
				// 如果timeout的时间小于当前的时间,那么就调用expire执行task
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    // 不可能发生的情况,就是说round已经为0了,deadline却 > 当前槽的deadline
					throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                next = remove(timeout);
            } else {
				// 因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一
                timeout.remainingRounds--;
            }
			// 把指针放置到下一个timeout
            timeout = next;
        }
    }

总结

时间轮(TimingWheel)在计算机科学中,特别是在任务调度和时间管理方面,具有重要的意义,我们可以结合业务进行使用

  • 节省cpu资源

  • 易于实现和维护

  • 批量化调度模型

  • 高效处理大量定时任务

  • 灵活适应不同应用场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2095911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ffmpeg音频编码

音视频播放的流程 根据我之前的文章 我们已经从解复用&#xff0c;解码得到原始数据&#xff0c;现在我们逆向&#xff0c;将frame转化packet。也就是原始数据转化为压缩后的数据文件。 介绍 PCM样本格式 PCM(Pulse Code Modulation&#xff0c;脉冲编码调制)⾳频数据是未经…

离散数学------关系理论

一、序偶和笛卡尔积 序偶 两个序偶如果相等&#xff0c;那么他们相对应的第一第二元素分别相等 笛卡尔积 笛卡尔积是集合之间的一种运算&#xff0c;运算的结果是个序偶&#xff0c;第一元素来自前面的集合&#xff0c;第二元素来自后面的集合。 两集合进行笛卡尔积运算后集合…

UE5学习笔记20-给游戏添加声音

一、准备音频资源 1.Jump文件夹中有跳跃的音频资源wav文件夹中是SoundCue的音波资源 2.音乐衰减文件&#xff0c;右键->音频->音效衰减 二、 在对应的动画资源处将音频添加 1.找到对应的动画帧 2.在对应的行右键添加通知->播放音效 3、选中添加的音效选择对应的音频资…

拦截通信助理,拦截小秘书技术

有人叫做空号识别&#xff0c;有人称为彩铃识别&#xff0c;磐石云通过嵌入软交换进行实时识别前期媒体 案例&#xff1a; 王总公司有20坐席的员工回访用户服务满意度业务&#xff0c;由于用户开通了语音秘书和通信助理&#xff0c;漏话提醒等等&#xff0c;坐席拨打时对方由…

【HTML】开源模拟输入框动画

代码地址&#xff1a; https://uiverse.io/eslam-hany/strange-goose-48代码地址&#xff1a; https://uiverse.io/vnuny/moody-swan-60代码地址&#xff1a; https://uiverse.io/boryanakrasteva/hard-pig-16代码地址&#xff1a; https://uiverse.io/Harsha2lucky/lovely…

TCP和UDP的主要区别以及应用场景

目录 1.主要区别 2.应用场景 1.主要区别 TCP&#xff08;Transmission Control Protocol&#xff09;&#xff1a;有连接&#xff0c;可靠传输&#xff0c;面向字节流&#xff0c;全双工通讯&#xff1b; UDP&#xff08;User Datagram Protocol&#xff09;&#xff1a;无连…

树莓派的启动

我的板子是树莓派3B。 [ 0.000000] Booting Linux on physical CPU 0x0 [ 0.000000] Linux version 6.6.31rpt-rpi-v7 (sergeraspberrypi.com) (gcc-12 (Raspbian 12.2.0-14rpi1) 12.2.0, GNU ld (GNU Binutils for Raspbian) 2.40) #1 SMP Raspbian 1:6.6.31-1rpt1 (202…

【C++ Primer Plus习题】8.6

问题: 解答: #include <iostream> using namespace std;template <typename T> T maxn(T arr[], int len)//通用 {T max 0;for (int i 0; i < len; i){if (max < arr[i]){max arr[i];}}return max; }template<> const char* maxn<const char*&g…

SpringBoot+Vue实现大文件上传(断点续传-后端控制(一))

SpringBootVue实现大文件上传&#xff08;断点续传&#xff09; 1 环境 SpringBoot 3.2.1&#xff0c;Vue 2&#xff0c;ElementUI&#xff0c;spark-md5 2 问题 在前一篇文章&#xff0c;我们写了通过在前端控制的断点续传&#xff0c;但是有两个问题&#xff0c;第一个问题&…

怎么在Windows操作系统部署阿里开源版通义千问(Qwen2)

怎么在Windows操作系统部署阿里开源版通义千问&#xff08;Qwen2&#xff09; | 原创作者/编辑&#xff1a;凯哥Java | 分类&#xff1a;人工智能学习系列教程 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; GitHub上qwen2截图 随着人工智能技术的不断…

【华为】轻松get!eNSP登录无线AC Web界面的新姿势

【华为】轻松get&#xff01;eNSP登录无线AC Web界面的新姿势 无线AC&#xff1a;web界面实验准备华为云配置01 拉取设备02添加UDP端口03再添加VMnet1(VMnet8 也行)网段连接AC的端口04最后设置端口映射 无线AC配置01拉取AC设备和连接华为云02配置AC的g0/0/1端口&#xff08;SVI…

AI时代,需要什么样的服务器操作系统?

文&#xff5c;刘俊宏 编&#xff5c;王一粟 AI时代&#xff0c;中国的服务器系统正在面临一场双重挑战。 今年6月底&#xff0c;最主流的开源服务器操作系统CentOS正式停服&#xff0c;找一个合适的操作系统进行迁移成为了必选项。同时&#xff0c;AI时代的到来&#xff0c…

笔记:《利用Python进行数据分析》之数据聚合

观前提示&#xff1a;这节内容不多&#xff0c;但难度较大&#xff0c;尤其是要能熟练运用时很不容易的 数据聚合 聚合指的是任何能够从数组产生标量值的数据转换过程。之前的例子已经用过一些&#xff0c;比如mean、count、min以及sum等。你可能想知道在GroupBy对象上调用me…

网络原理 - 初识

文章目录 局域网(LAN)广域网(WAN)网络设备IP地址格式 端口号格式 认识网络协议协议分层 OSI七层模型(只是理论,没有实际运用)TCP/IP五层&#xff08;或四层&#xff09;模型网络设备所在分层 封装和分用 计算机之间通过网络来传输数据&#xff0c;也称为网络通信。 根据网络互连…

AI问答:.NET核心组成概要、程序运行步骤和查询SDK版本的方法

.NET三大组成 ①Runtime (运行时)&#xff1a; CLR&#xff1a;公共语言运行时&#xff0c;执行程序、内存管理、垃圾回收&#xff08;GC&#xff09;、安全性检查、异常处理&#xff0c;是跨平台的关键要素。 JIT&#xff1a;实时编译器&#xff0c;将中间语言…

JDBC与数据库之间的操作(增删改查、获取主键、业务逻辑分离、属性文件配置)

参考视频哔哩哔哩 1、Service和Servicelmpl的概念 java中service和servicelmpl是常见的代码组织方式 Service是指业务逻辑的接口&#xff0c;定义了系统对外提供的功能。Servicelmpl是Service接口的具体实现&#xff0c;实现了具体的业务逻辑。 Service和Servicelmpl的好处…

Android自定义View实现不同朝向字体变色

实现效果&#xff1a; 1.一个文字两种颜色 2.实现不同朝向 3.结合ViewPager 思路&#xff1a;TextView可行&#xff1f;系统提供的只能够显示一种颜色&#xff0c;需要自定义View extends TextView&#xff1a;onMeasure()不需要实现 textColor颜色&#xff0c;textSize字体大小…

OpenAI Whisper API (InvalidRequestError)

题意: OpenAI Whisper API&#xff08;无效请求错误&#xff09; 问题背景&#xff1a; Im trying to use OpenAI Whisper API to transcribe my audio files. When I run it by opening my local audio files from disk, it worked perfectly. Now Im developing a FastAPI e…

学习WebGl基础知识(二)

学习目标&#xff1a; 掌握WebGl基础知识 学习内容&#xff1a; 创建一个Webgl程序 创建三维上下文对象创建顶点着色器和片元着色器创建和编译顶点着色器和片元着色器创建着色器程序对象绘制图元 创建一个Webgl程序 1.第一步获取画布&#xff0c;创建三维上下文对象 <ca…

一些硬件知识(十七)

电源芯片选型&#xff1a; 1.考虑拓扑结构 2.考虑功率&#xff0c;从而决定自行搭建电路还是选择芯片 3.对于低功耗产品&#xff0c;静态电流是非常重要的因素&#xff0c;一定重要考虑&#xff1a; 同步buck省去了续流二极管&#xff0c;效率比异步的高。 如果真的比耐压值…