单机环境下定时任务的基本原理和常见解决方案(二)之时间轮原理分析

news2024/9/29 5:30:43

单机环境下定时任务的基本原理和常见解决方案之时间轮原理分析

  • 时间轮
    • Netty时间轮使用
    • Netty 时间轮 HashedWheelTimer 源码分析
      • 向时间轮里添加task
      • WorkerThread线程执行时间轮任务
  • 多层时间轮
  • 总结

时间轮

生活中的时钟想必大家都不陌生,而时间轮的设计思想就是来源于生活中的时钟,这个从其命名就可以看出。

时间轮是一种环形的数据结构,我们可以将其想象成时钟的样子,时间轮上有许多格子(bucket),每个格子代表一段时间,时间轮的精度取决于一个格子的代表的时间,比如时间轮的格子是一秒跳一次,那么其调度任务的精度就是1秒,小于一秒的任务无法被时间轮调度。

时间轮上的bucket数量是有限的,而任务的数量是可以无限大的(理论上),所以时间轮使用一个链表来存储放在某个格子上的定时任务。

如下图所示 :
假设一个格子是1秒,整个时间轮有10个格子,那走一圈就表示10s,假如当前指针指向1,此时需要调度一个12s后执的任务,应该等指针走完一圈+2格再执行,因此应放入序号为3的格子,同时将round(1)保存到任务中。

指针随着时间一格格转动,走到每个格子,则检查格子中是否有可以执行的任务。此时时间轮指将链表里round=0的任务取出来执行,其他任务的round都减1。
在这里插入图片描述

简单总结一下,时间轮通过数组+链表的形式来存储定时任务,每个任务存放的bucket的计算公式:
(预计时间-时间轮开始时间)/(每一格的时间*时间轮的bucket数) 对应的商就是round,余数就是bucket的下标(本质还是取模)

Netty 需要管理大量的连接,每一个连接都会有很多检测超时任务,比如发送超时、心跳检测间隔等。
它提供了工具类 HashedWheelTimer 来实现延迟任务。该工具类就是采用时间轮原理来实现的。

Netty时间轮使用

后续的源码分析都是基于4.1.80版本的源码

  <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-common</artifactId>
     <version>4.1.80.Final</version>
  </dependency>
    public static void main(String[] args) {
        //创建一个HashedWheelTimer时间轮,有16个格的轮子,每一秒走一个格子
        Timer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 16);
        System.out.println(Calendar.getInstance().getTime() + "开始执行任务...");
        //添加任务到时间轮中
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) {
                System.out.println(Calendar.getInstance().getTime() + ":执行任务1");
            }
        }, 5, TimeUnit.SECONDS);
        timer.newTimeout(timeout ->
                        System.out.println(Calendar.getInstance().getTime() + ":执行任务2"), 8,
                TimeUnit.SECONDS);
    }

在这里插入图片描述

Netty 时间轮 HashedWheelTimer 源码分析

构造方法的三个参数分别代表

  • tickDuration 每一tick的时间,走一格是多久
  • timeUnit tickDuration的时间单位
  • ticksPerWheel 时间轮一共有多个格子,即一圈表示多少个tick。
  public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
        this.worker = new Worker();
        //一个CountDownLatch
        this.startTimeInitialized = new CountDownLatch(1);
        //mpsc队列
        this.timeouts = PlatformDependent.newMpscQueue();
        //mpsc队列
        this.cancelledTimeouts = PlatformDependent.newMpscQueue();
        this.pendingTimeouts = new AtomicLong(0L);
        ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        ObjectUtil.checkNotNull(unit, "unit");
        ObjectUtil.checkPositive(tickDuration, "tickDuration");
        ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");
        //创建时间轮,默认创建512个轮 就是创建一个长度为512的HashedWheelBucket数组
        this.wheel = createWheel(ticksPerWheel);
        this.mask = this.wheel.length - 1;
        //默认tickDuration=100ms 
        long duration = unit.toNanos(tickDuration);
        if (duration >= Long.MAX_VALUE / (long)this.wheel.length) {
            throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / (long)this.wheel.length));
        } else {
            if (duration < MILLISECOND_NANOS) {
                logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
                this.tickDuration = MILLISECOND_NANOS;
            } else {
                this.tickDuration = duration;
            }
            //创建一个线程workerThread,此时未启动(延迟启动,当有任务添加后再启动)
            this.workerThread = threadFactory.newThread(this.worker);
            this.leak = !leakDetection && this.workerThread.isDaemon() ? null : leakDetector.track(this);
            this.maxPendingTimeouts = maxPendingTimeouts;
            if (INSTANCE_COUNTER.incrementAndGet() > 64 && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
                reportTooManyInstances();
            }

        }
    }

向时间轮里添加task

  public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(task, "task");
        ObjectUtil.checkNotNull(unit, "unit");
        long pendingTimeoutsCount = this.pendingTimeouts.incrementAndGet();
        //如果maxPendingTimeouts>0,则表示对于存储的任务有上限,默认无限制
        if (this.maxPendingTimeouts > 0L && pendingTimeoutsCount > this.maxPendingTimeouts) {
            this.pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending timeouts (" + this.maxPendingTimeouts + ")");
        } else {
            //启动workerThread线程
            this.start();
            //判断当前任务还要多长时间执行(这里的startTime就是workerThread的启动时间,执行到这的时候startTime一定有值,否则this.start()会阻塞)
            long deadline = System.nanoTime() + unit.toNanos(delay) - this.startTime;
            if (delay > 0L && deadline < 0L) {
                deadline = Long.MAX_VALUE;
            }
            //封装成HashedWheelTimeout,并将其加入到MpscQueue(timeouts队列)
            //MPSC: Multi producer, Single consumer FIFO
            HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
            this.timeouts.add(timeout);
            return timeout;
        }
    }

workerThread线程延迟启动

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case 0:
                //通过CAS保证线程安全,workThread线程只会启动一次
                if (WORKER_STATE_UPDATER.compareAndSet(this, 0, 1)) {
                    this.workerThread.start();
                }
            case 1:
                break;
            case 2:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }
        //当startTime==0L,表示workThread线程还没有启动,通过CountDownLatch阻塞在这,直到workThread线程启动
        while(this.startTime == 0L) {
            try {
                this.startTimeInitialized.await();
            } catch (InterruptedException var2) {
            }
        }

    }

WorkerThread线程执行时间轮任务

  public void run() {
            //设置startTime
            HashedWheelTimer.this.startTime = System.nanoTime();
            if (HashedWheelTimer.this.startTime == 0L) {
                HashedWheelTimer.this.startTime = 1L;
            }
            HashedWheelTimer.this.startTimeInitialized.countDown();

            int idx;
            HashedWheelBucket bucket;
            //自旋直到wheelTimer被关闭
            do {
                //计算时间轮走到下一个tick的时间点(如果没有到时间则通过sleep休眠等待),这里返回的deadline是当前时间距离时间轮启动经过的时间(deadline小于0说明数据异常,不执行操作)
                long deadline = this.waitForNextTick();
                if (deadline > 0L) {
                    //根据tick与轮的大小取模 得到当前tick所在的bucket的下标
                    idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
                    //处理已经取消的task(将取消队列里的任务从bucket丢弃,如果已经放入到bucket里的话)
                    this.processCancelledTasks();
                    bucket = HashedWheelTimer.this.wheel[idx];
                    //将timeouts队列中缓存的任务取出加入到时间轮中
                    this.transferTimeoutsToBuckets();
                    //处理当前bucket所有的到期任务
                    bucket.expireTimeouts(deadline);
                    ++this.tick;
                }
            } while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
            //wheelTimer被关闭后的处理,取出每一个bucket里还没被执行的任务,放到unprocessedTimeouts中
            HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
            int var2 = var5.length;

            for(idx = 0; idx < var2; ++idx) {
                bucket = var5[idx];
                bucket.clearTimeouts(this.unprocessedTimeouts);
            }

            while(true) {
                HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
                //处理所有被取消的任务
                if (timeout == null) {
                    this.processCancelledTasks();
                    return;
                }
                //将加到timeouts队列里的任务添加到把未处理的unprocessedTimeouts队列中
                if (!timeout.isCancelled()) {
                    this.unprocessedTimeouts.add(timeout);
                }
            }
        }

主要流程:

1.如果HashedWheelTimer未关闭,则等待到达下一个tick的时间(未到则sleep)
2.到达下一tick时间后

  • 1)将已取消的任务丢弃
  • 2)然后将timeouts队列里的任务迁移到bucket对应的位置上
  • 3)获取当前tick对应的bucket,执行其中已经到达执行时间的任务

3.如果HashedWheelTimer已关闭,则将bucket里还没被执行的任务和timeouts队列里未取消的任务,统一放到unprocessedTimeouts队列中。
然后统一处理取消队列里的任务(processCancelledTasks) 也就是说已取消的任务在取消操作时只是放入到取消队列里,并没有从timeouts队列或者bucket里移除

  private long waitForNextTick() {
            //计算下一个tick的时间点,该时间是相对时间轮启动时间的相对时间
            long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);
            //自旋
            while(true) {
                //计算时间轮启动后经过的时间
                long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
                //判断需要休眠的时间
                long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
                if (sleepTimeMs <= 0L) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -9223372036854775807L;
                    }
                    //如果当前时间大于下一个tick的时间,则直接返回(说明到执行任务的时间了),否则sleep
                    return currentTime;
                }

                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10L * 10L;
                    if (sleepTimeMs == 0L) {
                        sleepTimeMs = 1L;
                    }
                }

                try {
                    //休眠对应的时间
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException var8) {
                    if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }
 private void transferTimeoutsToBuckets() {
     //从timeouts队列中获取任务,每次最多只能获取10万个任务
            for(int i = 0; i < 100000; ++i) {
                HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
                //timeout==null说明timeouts队列已经空了
                if (timeout == null) {
                    break;
                }
                //计算执行该任务需要放到哪个bucket下并且对应的round为多少
                if (timeout.state() != 1) {
                    //计算任务的执行时间
                    //这里的deadline是任务执行时间相对时间轮开始时间的时间,也就是计算从时间轮的开始时间算起,需要经过多少次tick
                    long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
                    timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
                    //calculated和tick去较大者,就是说如果当前任务的执行时间已过期,则让其在当前tick执行
                    long ticks = Math.max(calculated, this.tick);
                    //计算该任务要在哪个bucket下执行
                    int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
                    HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
                    //HashedWheelBucket底层是一个HashedWheelTimeout的链表 
                    bucket.addTimeout(timeout);
                }
            }

        }

处理已到执行时间的任务

// 这里的deadline是当前时间距离时间轮启动经过的时间
public void expireTimeouts(long deadline) {
            HashedWheelTimeout next;
            //从头遍历HashedWheelTimeout链表
            for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
                next = timeout.next;
                if (timeout.remainingRounds <= 0L) {
                    next = this.remove(timeout);
                    //说明当前任务的执行时间大于deadline,中间可能哪里出现故障,抛出异常
                    if (timeout.deadline > deadline) {
                        throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                    //调用timeout.expire方法,执行task,通过ImmediateExecutor线程池执行任务,实际就是调用task的run方法
                    timeout.expire();
                } else if (timeout.isCancelled()) {
                    next = this.remove(timeout);
                } else {
                    //未到执行时间,remainingRounds-1
                    --timeout.remainingRounds;
                }
            }

        }

多层时间轮

当时间跨度很大时,提升单层时间轮的 tickDuration 可以减少空转次数,但会导致时间精度变低,使用多层时间轮既可以避免精度降低,也能减少空转次数。

如果有时间跨度较长的定时任务,则可以交给多层级时间轮去调度。

假设有一个设置为5天14 小时40 分30 秒后执行的定时任务,在 tickDuration = 1s 的单层时间轮中,需要经过:5x24x60x60+14x60x60+40x60+30 数十万次tick才能被执行。
但在 wheel1 tickDuration = 1 天,wheel2 tickDuration = 1 小时,wheel3 tickDuration = 1 分,wheel4 tickDuration = 1 秒 的四层时间轮中,只需要经过 5+14+40+30 次tick就可以了。

总结

while+sleepTimerScheduledThreadPoolExecutorHashedWheelTimer
实现方式while+sleep最小堆最小堆基于时间轮
写入效率-O(logN)O(logN)类HashMap,近似O(1)
查询效率-O(1)O(1)近似O(1)
优点实现简单 O(1)可以对大量定时任务进行统一调度线程池执行,有异常捕获机制写入性能高
缺点对于大量定时任务不便于管理单线程执行;没有异常捕获机制写入效率较低,在需要大量添加定时任务的场景下会影响性能单线程执行;没有异常捕捉机制

注意下,HashedWheelTimer 的写入和查询效率都是近似O(1),由于链表的存在,如果要执行任务的存放在长链表的末尾,那他的查询性能可能会很差,HashMap通过扰动函数来将减少hash冲突,时间轮也可以通过设置合适的时间精度,来减少hash冲突

Netty对时间轮的实现是基于他的使用场景,我们可以根据不同的业务场景对时间轮进行优化

  1. 将所有的任务交给线程池执行,避免单个任务的执行耗时较长影响下一个任务的执行
  2. 可以给每个bucket设置一个线程池来执行这个bucket的任务
  3. 假设需要在同一时刻,执行大量比较耗时的任务,那么可以通过MQ解耦,然后使用消费者并发执行任务,提高性能
    。。。。

选择哪一种方式来实现定时/延迟任务取决于各自的业务场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/803692.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【3】-使用@task设置测试用例执行的权重

多个测试链路压测使测试任务按预想的比例执行 locust的task装饰器提供了入参weight&#xff0c;locust执行测试任务时&#xff0c;会根据weight的比例进行分配用户数 from locust import task, HttpUserclass MyTestUser(HttpUser):# test_01 : test_02 3 : 1task(3)def wei…

数据结构:分块查找

分块查找&#xff0c;也叫索引顺序查找&#xff0c;算法实现除了需要查找表本身之外&#xff0c;还需要根据查找表建立一个索引表。例如图 1&#xff0c;给定一个查找表&#xff0c;其对应的索引表如图所示&#xff1a; 图 1 查找表及其对应的索引表 图 1 中&#xff0c;查找表…

安全测试国家标准解读——资源管理和内存管理

下面的系列文章主要围绕《GB/T 38674—2020 信息安全技术 应用软件安全编程指南》进行讲解&#xff0c;该标准是2020年4月28日&#xff0c;由国家市场监督管理总局、国家标准化管理委员会发布&#xff0c;2020年11月01日开始实施。我们对该标准中一些常见的漏洞进行了梳理&…

c++学习(哈希)[21]

哈希 哈希表&#xff08;Hash Table&#xff09;&#xff0c;也称为散列表&#xff0c;是一种常用的数据结构&#xff0c;用于实现键值对的存储和查找。它通过将键映射到一个索引位置来快速地访问和操作数据。 哈希表的基本思想是使用一个哈希函数将键映射到一个固定范围的整…

自定义 View(六) 自定义评分星星

先看看效果图 1.自定义 View 的基本流程 创建 View Class创建 attr 属性文件&#xff0c;确定属性View Class 绑定 attr 属性onMeasure 测量onDraw 绘制onTouchEvent ( 用户交互需要处理 ) 1.1 创建 View Class package com.example.view_day05_ratingbar;import android.…

LabVIEW实现三相异步电机磁通模型

LabVIEW实现三相异步电机磁通模型 三相异步电动机由于经济和出色的机电坚固性而广泛用于工业化应用。这台机器的设计和驱动非常简单&#xff0c;但在控制扭矩和速度方面&#xff0c;它隐藏了相当大的功能复杂性。通过数学建模&#xff0c;可以理解机器动力学。 基于微分方程的…

day44-Custom Range Slider(自定义范围滑块)

50 天学习 50 个项目 - HTMLCSS and JavaScript day44-Custom Range Slider&#xff08;自定义范围滑块&#xff09; 效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewp…

vue中tab隐藏display:none(v-show无效,v-if有效)

目录 背景 原因&#xff1a;display: table-cell>display:none 解决&#xff1a; 方法A.获取元素设置display&#xff08;适用于 简单场景&#xff09; 方法B.自定义tabs​​​​​​​ &#xff08;适用于 复杂场景&#xff09; 背景 内联样式(style“ ”) /this.$…

JVM简述

JDK&JRE&JVMJVM运行时内存结构图方法区堆区栈区程序计数器本地方法栈 JVM 的主要组成部分及其作用 JDK&JRE&JVM JVM就是java虚拟机&#xff0c;一台虚拟的机器&#xff0c;用来运行java代码 但并不是只有这台机器就可以的&#xff0c;java程序在运行时需要依赖…

Linux权限提升:自动化信息收集

在本文中&#xff0c;我们将介绍在基于Linux的设备上进行初始访问后&#xff0c;可用于后渗透阶段漏洞利用和枚举的一些自动化脚本。 ** 介绍** 大多数时候&#xff0c;当攻击者攻击Linux操作系统时&#xff0c;他们将获得基本的Shell&#xff0c;可以将其转换为TTY Shell或m…

apple pencil值不值得购买?便宜的电容笔推荐

如今&#xff0c;对ipad使用者而言&#xff0c;苹果原装的Pencil系列无疑是最佳的电容笔。只是不过这款电容笔的售价&#xff0c;实在是太高了&#xff0c;一般的用户都无法入手。因此&#xff0c;在具体的使用过程中&#xff0c;如何选用一种性能优良、价格低廉的电容笔是非常…

Jmeter+验证json结果是否正确小技巧

前言&#xff1a; 通过sql语句或者返回的参数&#xff0c;可以在查看结果树返回的结果中&#xff0c;用方法先跑一下验证是否取到自己想要的值 步骤&#xff1a; 1、添加查看结果树 2、跑出结果 3、在查看结果树中 text改成选Json Path Tester 返回的值如果是列表里面的字符…

英码“深元”智慧工厂解决方案,提升管理效率,开启生产新时代!

智慧工厂&#xff0c;作为数字化和智能化的代表&#xff0c;深度融合了边缘计算、物联网、大数据分析和人工智能等技术&#xff0c;为传统工厂管理和生产带来了深刻的影响。英码“深元”智慧工厂解决方案&#xff0c;利用智能终端——“深元”AI工作站实时采集传输现场视频&…

8个特别好用的矢量图软件,一定要收藏

在设计工作中&#xff0c;矢量图软件能帮助设计师绘制出具有更高质量&#xff0c;更高清晰度的图画作品。本文整理了市面上8个好用的矢量图软件&#xff0c;一起来看看吧&#xff01; 1、即时灵感 即时灵感是基于云端运行的矢量图软件&#xff0c;完全可以满足运营、产品经理…

荣登央视,智慧集中供冷,未来空调技术的新趋势

我们上了央视了&#xff01;这不是开玩笑也不是蹭热度&#xff0c;最近我们做的一个项目被作为正向报导了&#xff0c;可以说是一件引以为傲的事情&#xff0c;具体涉及的项目&#xff0c;就是作为未来空调技术的集中供冷系统。 今年夏天&#xff0c;想必大家也都感觉到了&…

【phpstudy】Apache切换Nginx报错nginx: [emerg] CreateFile()

【phpstudy】Apache切换Nginx报错nginx: [emerg] CreateFile() 报错内容如下&#xff1a; nginx: [emerg] CreateFile() “D:/phpstudy_pro/WWW/www.xxx.com/nginx.htaccess” failed (2: The system cannot find the file specified) in D:\phpstudy_pro\Extensions\Nginx1.…

【WebGIS实例】(10)Cesium开场效果(场景、相机旋转,自定义图片底图)

效果 漫游效果视频&#xff1a; 【WebGIS实例】&#xff08;10&#xff09;Cesium开场效果&#xff08;场景、相机 点击鼠标后将停止旋转并正常加载影像底图&#xff1a; 代码 可以直接看代码&#xff0c;注释写得应该比较清楚了&#xff1a; /** Date: 2023-07-28 16:21…

接口自动化如何做?接口自动化测试- 正则用例参数化(实例)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 我们在做接口自动…

ElasticSearch基本使用--ElasticSearch文章一

文章目录 官网学习必要性elasticsearch/kibana安装版本数据结构说明7.x版本说明ElasticSearch kibana工具测试后续我们会一起分析 官网 https://www.elastic.co/cn/ 学习必要性 1、在当前软件行业中&#xff0c;搜索是一个软件系统或平台的基本功能&#xff0c; 学习Elastic…

树莓派微型web服务器——阶段设计报告

文章目录 1. 需求分析1.1 功能需求1.1.1 访问需求1.1.2 自定义域名需求1.1.3 下载公共文件需求1.1.4 用户体验需求 1.2 技术需求1.2.1 操作系统指令1.2.2 技术栈1.2.3 内网穿透 1.3 性能需求1.3.1 处理能力1.3.2 内存1.3.3 存储空间 2. 可行性分析2.1 硬件方面2.2 软件方面 3. …