【Netty】Netty时间轮实践与源码解析

news2024/11/8 21:46:26

目录

  • 定时任务
  • JDK定时任务
    • Timer
      • 原理
    • ScheduledThreadPoolExecutor
  • 时间轮算法
    • netty时间轮架构
  • netty时间轮 源码解析
    • 基本使用
    • HashedWheelTimer 初始化
      • createWheel 创建HashedWheelBucket数组
    • newTimeout 添加任务
    • 执行任务
    • 时间轮的优缺点
  • 品一品优秀设计
    • 实际的生产环境选择

定时任务

本篇来聊聊定时与时间轮,对于一个应用来说,其实很多场景需要采用定时任务进行执行,比如每天定时发送统计报表的数据,金融支付中的对账文件推送,除了业务场景中,在客户端服务端常连接的心跳检测、延时任务执行中其实都需要定时任务进行支持。JDK本身提供了几种定时任务,但是netty进一步优化了,那么本篇带着这三个问题聊聊

  • Jdk原生方案有哪些不满足的地方
  • Netty为什么采用时间轮算法来实现
  • netty时间轮算法流程源码解析

其实定时任务有三种方式,固定周期执行 (每天9点执行)、延迟一定时间执行(5S后执行)、执行某个时间执行(1月20号 10点执行)

JDK定时任务

Timer

具体执行就是集成TimerTask任务,覆盖run方法,其实就是另外启动一个线程。

public class TestTimer extends TimerTask{

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new TestTimer(),6000); // 6S后执行
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"\t timer");
    }
}

原理

在这里插入图片描述

在构造方法的时候 创建一个线程。而这个线程会从queue中获取对应的任务执行,

	private final TaskQueue queue = new TaskQueue(); // 小根堆
	private final TimerThread thread = new TimerThread(queue);
    
    public Timer(String name) {
        thread.setName(name);
        thread.start();
    }

TaskQueue是一个数组实现的小根堆,越快deadline执行的任务,越在根部,所以执行一个任务的之间复杂度是O(1) 但是插入一个任务的时间复杂度是O(LogN)

ScheduledThreadPoolExecutor

上面的Timer其实存在一定设计的缺陷

  • Timer是单线程模型,如果一个任务执行太久,可能影响后面任务的执行。
  • TimerTask 如果执行出现异常,Timer 并不会捕获,会导致线程终止,其他任务永远不会执行。
   ScheduledExecutorService exthreadPool = Executors.newScheduledThreadPool(5);
    exthreadPool.scheduleAtFixedRate(() ->
            System.out.println("HelloWorld"),
            1000, // 延迟1S后开始执行
            2000,  // 2S执行一次
            TimeUnit.MILLISECONDS);

在这里插入图片描述
可以发现上述其实都有 执行任务的线程,添加任务的主线程、封装任务的任务。但是由于其本身数据结构的复杂度是O(N) , 所以在海量任务执行的场景中,其实性能和效率并不高。

时间轮算法

时间轮就是一个环形队列,类似于时钟的结构,所以叫时间轮。

在这里插入图片描述
如图所示,将时间轮划分为16个格子,每1S执行一个格子的任务。而每个格子的都是一个链表结构,通过指针进行连接所要执行的任务。

比如当前要执行一个1S后执行的任务,就可以根据当前指针所指的位置,添加到对应的3位置,但是如果要执行的是26S后的位置,那么如何计算的呢。其实很简单就是 26%15 = 11, 也就是一轮之后的第11个位置。

相比于JDK提供的定时任务,时间轮算法增加、删除任务的时间复杂度都是O(1) ,而时间轮算法是一种思想,具体的落地在Netty、Kafka、Dubbo中都有对应的具体实现。

netty时间轮架构

在这里插入图片描述
在这里插入图片描述
整体架构其实由三部分组成

  • HashedWheelBucket 数组 构建成一个时间轮结构
    • 每个格子 其实就是一个HashedWheelBucket对象,内部持有head、tail节点 分别指向 HashedWheelTimeout 具体的任务
  • HashedWheelTimeout 封装具体执行的任务
  • Woker 执行具体的任务

netty时间轮 源码解析

基本使用

public class TestHashWheelTimer {

    public static void main(String[] args) {
        // tickDuration : 时间间隔
        // TimeUnit : 单位
        // ticksPerWheel : 划分多少块  默认512
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 20);
        hashedWheelTimer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println(Thread.currentThread().getName()+"\t hello");
            }
        }, 1, TimeUnit.SECONDS);
    }
}

其实主要就是初始化时间轮、添加任务(之后执行任务)

HashedWheelTimer 初始化

    1. 先进行参数的有效检查
    1. 创建时间轮,以及计算 & 运算的掩码
    1. 创建执行任务的线程
    1. 异常边界处理
	// threadFactory 线程工厂 构建woker线程的
    // tickDuration 时间间隔
    // ticksPerWheel 时间轮的槽
    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts, Executor taskExecutor) {

        // 有效性检查
        checkNotNull(threadFactory, "threadFactory");
        checkNotNull(unit, "unit");
        checkPositive(tickDuration, "tickDuration");
        checkPositive(ticksPerWheel, "ticksPerWheel");
        this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
        
        // 核心 ⭐️
        wheel = createWheel(ticksPerWheel);
        // 掩码 计算任务  应该存放在时间轮的具体位置 & 替换 % 运算
        mask = wheel.length - 1;

        // 转换成纳秒 10^6 毫秒 秒
        long duration = unit.toNanos(tickDuration);

        // 创建woker线程
        workerThread = threadFactory.newThread(worker);

        // 指定最大延时任务有多少
        this.maxPendingTimeouts = maxPendingTimeouts;

        // 控制HashedWheelTimer的实例 不能超过64
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

createWheel 创建HashedWheelBucket数组

  • 1.这里其实就是判断传入的值,如果是2的幂次 直接使用,否则找到第一个大于该数的2的次幂,比如当前传入的是15,那么就会转换成16,至于为什么要进行这样处理,这样的方式 可以利用 i & (length - 1) 进行高效的位运算。而前提必须是2的次幂。
  • 2.创建时间轮数组 并且初始化每个单独时间轮,需要注意的是 HashedWheelBucket中包含头和尾部节点,通过双向链表的方式
    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        // 调整成2的次幂
        // 目的: 位运算 替换 %的 运算 效率高
        ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);

        // 初始化数组 HashedWheelBucket
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

在这里插入图片描述

newTimeout 添加任务

  • 1.先判断任务和时间单位是否为空
  • 2.延时任务+1 以及判断是否超过阈值
  • 3.启动执行任务的核心线程
  • 4.将任务封装进 HashedWheelTimeout 并且添加到timeouts队列中
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        // 判断处理
        checkNotNull(task, "task");
        checkNotNull(unit, "unit");

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        // 超越延时任务的最大值
        // 控制一个HashedWheelTimer中的 延时任务的个数
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        // 核心 ⭐️ 启动woker线程
        start();

        // 当前时间+延时时间 - 开启时间
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 创建任务 封装 HashedWheelTimeout
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        // 添加到队列中 不会立即加入到Bucked 加入到一个Mpsc的队列中 无所队列 并发效率比较高
        timeouts.add(timeout);
        return timeout;
    }

执行任务

启动的时候,其实先启动woker线程,然后会判断启动时间starTime =0 会使用CountDownLatch进行Wait()

    public void start() {
        // 线程启动
        // 判断线程状态
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    // 启动线程 只有init才能启动
                    workerThread.start();
                }
                break;
        }
        
        // 线程不一定马上启动  会进行等待
        while (startTime == 0) {
            try {
                // 线程启动完成后, woker线程进行等待唤醒
                // 后续会进行唤醒
                startTimeInitialized.await();
            } 
        }
    }
  • 1.启动线程,然后赋值startTime的值 countDown() 上面的步骤就可以执行了。
  • 2.等到下一批次要执行的时间,如果不到就sleep
  • 3.计算当前任务轮下标,取出对应时间轮的链表
  • 4.从队列中获取10W个任务,然后添加到对应的位置
  • 5.从链表中获取任务进行执行。
        public void run() {
            startTime = System.nanoTime();
            if (startTime == 0) {
                 startTime = 1;
            }

            startTimeInitialized.countDown();

            do {
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    // % 操作
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    // 获取对应时间轮中的链表
                    HashedWheelBucket bucket =
                            wheel[idx];
                    transferTimeoutsToBuckets();
                    //
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
                // worker 如果一直启动 就一直循环
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

        private void transferTimeoutsToBuckets() {
            // 先取10W个任务 不关心任务在时间轮的格子
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    continue;
                }

                // 结束时间 / 时间轮的个数  还有几格需要执行
                long calculated = timeout.deadline / tickDuration;
                // 获取几轮执行
                timeout.remainingRounds = (calculated - tick) / wheel.length;

                final long ticks = Math.max(calculated, tick);
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                // 存储在对应的格子中 链表结构
                bucket.addTimeout(timeout);
            }
        }
        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    // 保证当前链表中,过期任务也可以执行
                    // deadline 当前时间 远远大于timeOut的deadline
                    // 这个timerout已经过期了 会执行
                    if (timeout.deadline <= deadline) {
                        // 执行
                        timeout.expire();
                    } else {
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

在这里插入图片描述

时间轮的优缺点

优点

  • 高效的插入和过期检查:插入的时间复杂度O(1) 因为是直接定位到对应的格子以及链表操作
  • 可配置的时间粒度:时间轮的槽数量可以可配置
  • 处理大量定时任务:比较适合处理大量定时任务的场景,超时监测。

缺点

  • 任务延迟执行:执行的时间可能不精确,会延后。
  • 极端情况的空推进,A任务1S后执行,B任务6小时后执行,中间这段时间就是空推进。这里Kafka使用了多级时间轮的方式进行解决。

品一品优秀设计

1.利用CountDownLatch锁
巧妙使用CountDownLatch 进行等待线程启动后 才执行后面的任务添加。

2.MPSC的应用
性能优化&线程安全,使用MPSC可以多生产者安全添加任务,单消费者消费,既减少了并发竞争也提高的了性能。

实际的生产环境选择

ScheduledThreadPoolExecutor和HashedWheelTimer 各有优劣,需要根据使用场景进行权衡

  • 关注任务调度的及时性:选择ScheduledThreadPoolExecutor
  • 存在大量调度任务:选择HashedWheelTimer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2072649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

css中块,行内块,行内元素转换

参考 元素作用范围 块元素 会在显示时自动换行&#xff0c;例如p标签div等 行内元素 例如span&#xff0c;可以认为是一个不换行的块&#xff0c;其他还有label等 行内块元素 例如img标签显示图片&#xff0c;但不换行 区别 块元素可以设置宽高&#xff0c;但行元素不…

C/C++语言基础--结构体知识详解(包括:结构体数组、字节对齐、位段等内容)

本专栏目的 更新C/C的基础语法&#xff0c;包括C的一些新特性 前言 C语言地结构体是核心内容之一&#xff0c;他运行自定义数据类型&#xff0c;可以将不同地数据类型当作成一个整体&#xff0c;变成一个数据类型&#xff0c;运用及其广泛欢迎点赞 收藏 关注&#xff0c;本…

UE管理内容 —— Alembic File Importer

目录 从Maya导出ABC缓存 导入ABC到UE 导入为静态网格体 导入为几何体缓存 导入为Skeletal Alembic文件格式(.abc)是一个开放的计算机图形交换框架&#xff0c;将复杂的动画化场景浓缩成一组非过程式的、与应用程序无关的烘焙几何结果&#xff1b;可以在外部自由地创建复杂…

如何查看ubuntu版本

在当前的技术环境中&#xff0c;了解操作系统的具体版本对于用户来说至关重要。这不仅能确保软件兼容性&#xff0c;还有助于进行系统管理和故障排查。对于使用Ubuntu系统的用户来说&#xff0c;有几种不同的方法可以查看当前系统的版本。下面将详细介绍如何查看您的Ubuntu系统…

CSS文本样式(二)

一、水平对齐文本 1、text-align属性 text-align​属性指定元素中文本的​水平对齐方式​。 默认情况下&#xff0c;您网站上的文字左对齐。 但是&#xff0c;有时您可能需要不同的对齐方式。 文本对齐属性值如下&#xff1a;​left​&#xff0c;​right​&#xff0c;​cen…

数据结构(Java实现):链表与LinkedList

文章目录 1. 单向链表1.1 链表的概念及结构1.2 链表的实现1.2.1 单向链表类和节点1.2.2 打印每个节点的值1.2.3 计算链表长度1.2.4 头插节点1.2.5 尾插节点1.2.6 在指定下标插入新节点1.2.7 判断是否存在某个节点1.2.8 移除某个节点1.2.9 移除所有指定节点1.2.10 清空链表1.2.1…

【Linux:管道】

进程间通信背景&#xff1a; 每一个进程想要访问物理内存&#xff0c;都是通过访问进程虚拟地址空间当中的虚拟地址进行访问&#xff0c;访问时&#xff0c;通过各自的页表结构&#xff0c;造成了每一个进程和每一个进程的数据独立&#xff0c;由于进程独立性的存在&#xff0c…

Java | Leetcode Java题解之第373题查找和最小的K对数字

题目&#xff1a; 题解&#xff1a; class Solution {public List<List<Integer>> kSmallestPairs(int[] nums1, int[] nums2, int k) {int m nums1.length;int n nums2.length;/*二分查找第 k 小的数对和的大小*/int left nums1[0] nums2[0];int right nums…

Github 2024-08-25 php开源项目日报 Top10

根据Github Trendings的统计,今日(2024-08-25统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目10Blade项目1Laravel: 以优雅语法简化Web开发 创建周期:4028 天开发语言:PHP协议类型:MIT LicenseStar数量:30824 个Fork数量:1052…

windows安装wsl,出现错误WslRegisterDistribution failed with error: 0x8007019e的解决方案

错误WslRegisterDistribution failed with error: 0x8007019e 笔者直接从Microsoft Store 安装 WSL后&#xff0c;没有其他操作&#xff0c;直接打开WSL&#xff0c;结果出现Error: 0x8007019e错误提示&#xff1a; Error 0x8007019e 解决方案 &#xff08;1&#xff09;Win…

滑块自动化分析

大家好!我是炒青椒不放辣,关注我,收看每期的编程干货。 滑块分析是爬虫工程师进阶必备技能,当我们遇到一个问题时可能会有多种解决途径,而如何做出高效的抉择和完善的解决流程又需要经验的积累。本期文章将以实战的方式,带你使用 playwright 进行滑块分析,不仅会告诉你应…

iPhone抹掉数据后能恢复吗?详解数据恢复的可能性与方法

在使用iPhone的过程中&#xff0c;有时候我们会因为各种原因选择“抹掉所有内容和设置”&#xff0c;以期望将手机恢复到出厂状态。然而&#xff0c;一旦执行了这个操作&#xff0c;很多用户就会开始担心&#xff1a;iPhone抹掉数据后&#xff0c;这些数据还能恢复吗&#xff1…

VMware安装Ubuntu20.04

1. 下载 整理的镜像链接 阿里网盘&#xff1a;阿里云盘快传 2. 新建虚拟机向导 选择自定义&#xff0c;然后下一步。 默认配置&#xff0c;下一步。 选择稍后安装操作系统&#xff0c;下一步。 选择操作系统Linux&#xff0c;版本Ubuntu64位&#xff0c;下一步。 给虚拟机命名…

2534. 乘方 [CSP-J 2022]

代码 #include<bits/stdc.h> using namespace std; int main() {long long n,m,i,sum1;cin>>n>>m;for(i1;i<m;i){sum*n;if(sum>1000000000){cout<<-1;return 0;;}}cout<<sum;return 0; } 记得点赞关注收藏&#xff01;&#xff01;&…

根据股票列表获取资金流入情况

获取股票列表 作为演示&#xff0c;以创业板为例&#xff08;数据不多&#xff09;&#xff0c;我们通过自编的 get_stock_list 方法获取股票列表&#xff1a; import pandas from bad import BigAData from tqdm.notebook import tqdmplate cyb bad BigAData() json bad.…

180页某项目可视化智能停车场系统技术解决方案WORD

今天分享的是一份《180页某项目可视化智能停车场系统技术解决方案WORD》&#xff0c;资料详细完整的描述了关于数智化停车场的建设方案&#xff0c;参考价值很高。 传统停车场存在进出场效率低、找车位难、找车难、管理难、管理成本高等诸多问题&#xff0c;本次建设的XX项目将…

四、控制结构

文章目录 引言一、顺序控制二、分支控制&#xff08;if&#xff0c;else&#xff0c;switch&#xff09;2.1 if 单分支2.2 if 双分支2.3 if 多分支2.4 if 嵌套分支2.5 switch分支结构2.6 switch和if的比较 三、循环控制&#xff08;for&#xff0c;while&#xff0c;dowhile&am…

[Linux#47][网络] 网络协议 | TCP/IP模型 | 以太网通信

目录 1.网络协议 2.协议分层 2.1 OSI七层模型 2.2TCP/IP五层(四层)模型 2.3 以太网通信 1.网络协议 "协议"本质就是一种约定 计算机之间的传输媒介是光信号和电信号. 通过 "频率" 和 "强弱" 来表示 0 和 1 这样的 信息. 要想传递各种不同…

全志616系统启动和登录

一、系统启动 刷完机烧入镜像&#xff0c;直接用MobaXterm软件串口登陆 约定固定的波特率115200。 默认登录&#xff1a; 用户&#xff1a;orangepi 密码&#xff1a;orangepi 或用户&#xff1a;root 密码&#xff1a;orangepi 在输入密码时…

SEO之网站结构优化(十三-网站地图)

** 初创企业搭建网站的朋友看1号文章&#xff1b;想学习云计算&#xff0c;怎么入门看2号文章谢谢支持&#xff1a; ** 1、我给不会敲代码又想搭建网站的人建议 2、“新手上云”能够为你开启探索云世界的第一步 博客&#xff1a;阿幸SEO~探索搜索排名之道 网站无论大小&…