Kafka 深入服务端 — 时间轮

news2025/1/31 2:48:27

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {

    public static void main(String[] args) {
        Timer timer = new Timer();

        Date startTime = new Date();

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                long dis = (new Date().getTime() - startTime.getTime()) / 1_000;
                System.out.println("task1执行,距离开始时间:" + dis + "s");
            }
        };

        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                long dis = (new Date().getTime() - startTime.getTime()) / 1_000;
                System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("task2休眠结束");
            }
        };

        TimerTask task3 = new TimerTask() {
            @Override
            public void run() {
                long dis = (new Date().getTime() - startTime.getTime()) / 1_000;
                System.out.println("task3执行,距离开始时间:" + dis + "s");
            }
        };

        timer.schedule(task1,1000); // 1s后执行
        timer.schedule(task2,2000); // 2s后执行
        timer.schedule(task3,3000); // 3s后执行
    }
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {

    private static class DelayQueueTask implements Delayed {

        private final String taskName;
        private final long delayTime;

        private DelayQueueTask(String taskName, long delayTime) {
            this.taskName = taskName;
            this.delayTime = delayTime + System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) { // 返回剩余延迟
            long diff = delayTime - System.currentTimeMillis();
            return unit.convert(diff,TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.delayTime < ((DelayQueueTask) o).delayTime) {
                return -1;
            }
            if (this.delayTime > ((DelayQueueTask) o).delayTime) {
                return 1;
            }
            return 0;
        }

        @Override
        public String toString() {
            return "DelayQueueTask{" +
                    "taskName='" + taskName + '\'' +
                    ", delayTime=" + delayTime +
                    '}';
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();

        delayQueue.offer(new DelayQueueTask("task1",5000));
        delayQueue.offer(new DelayQueueTask("task2",2000));
        delayQueue.offer(new DelayQueueTask("task3",4000));

        System.out.println("开始执行delayQueue任务");
        while (!delayQueue.isEmpty()) {
            DelayQueueTask task = delayQueue.take();
            System.out.println("任务:" + task);
        }
        System.out.println("delayQueue任务 任务执行完毕");
    }
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2284958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文掌握ADB的安装及使用

文章目录 一、什么是ADB&#xff1f;二、 安装ADB2.1 下载ADB2.2 配置环境变量 三、连接Android设备四、 常用ADB命令五、ADB高级功能5.1 屏幕截图和录制5.2 模拟按键输入5.3 文件管理5.4 系统设置管理5.5 系统操作指令5.6 日志操作指令5.7 APK操作指令5.8 设备重启和恢复 六、…

Linux系统下速通stm32的clion开发环境配置

陆陆续续搞这个已经很久了。 因为自己新电脑是linux系统无法使用keil&#xff0c;一开始想使用vscode里的eide但感觉不太好用&#xff1b;后面想直接使用cudeide但又不想妥协&#xff0c;想趁着这个机会把linux上的其他单片机开发配置也搞明白&#xff1b;而且非常想搞懂cmake…

OpenCSG月度更新2025.1

1月的OpenCSG取得了一些亮眼的成绩 在2025年1月&#xff0c;OpenCSG在产品和社区方面继续取得了显著进展。产品方面&#xff0c;推出了AutoHub浏览器自动化助手&#xff0c;帮助用户提升浏览体验&#xff1b;CSGHub企业版功能全面升级&#xff0c;现已开放试用申请&#xff0c…

AWTK 骨骼动画控件发布

Spine 是一款广泛使用的 2D 骨骼动画工具&#xff0c;专为游戏开发和动态图形设计设计。它通过基于骨骼的动画系统&#xff0c;帮助开发者创建流畅、高效的角色动画。本项目是基于 Spine 实现的 AWTK 骨骼动画控件。 代码&#xff1a;https://gitee.com/zlgopen/awtk-widget-s…

go到底是什么意思:对go的猜测或断言

go这个单词&#xff0c;简单地讲&#xff0c;表示“走或去”的意思&#xff1a; go v.去&#xff1b;走 认真想想&#xff0c;go是一个非常神秘的单词&#xff0c;g-和o-这两个字母&#xff0c;为什么就会表达“去&#xff1b;走”的意思呢&#xff1f;它的字面义或本质&…

学习数据结构(2)空间复杂度+顺序表

1.空间复杂度 &#xff08;1&#xff09;概念 空间复杂度也是一个数学表达式&#xff0c;表示一个算法在运行过程中根据算法的需要额外临时开辟的空间。 空间复杂度不是指程序占用了多少bytes的空间&#xff0c;因为常规情况每个对象大小差异不会很大&#xff0c;所以空间复杂…

DeepSeek--通向通用人工智能的深度探索者

一、词源与全称 “DeepSeek"由"Deep”&#xff08;深度&#xff09;与"Seek"&#xff08;探索&#xff09;组合而成&#xff0c;中文译名为"深度求索"。其全称为"深度求索人工智能基础技术研究有限公司"&#xff0c;英文对应"De…

Unity游戏(Assault空对地打击)开发(1) 创建项目和选择插件

目录 前言 创建项目 插件导入 地形插件 前言 这是游戏开发第一篇&#xff0c;进行开发准备。 创作不易&#xff0c;欢迎支持。 我的编辑器布局是【Tall】&#xff0c;建议调整为该布局&#xff0c;如下。 创建项目 首先创建一个项目&#xff0c;过程略&#xff0c;名字请勿…

(三)Session和Cookie讲解

目录 一、前备知识点 &#xff08;1&#xff09;静态网页 &#xff08;2&#xff09;动态网页 &#xff08;3&#xff09;无状态HTTP 二、Session和Cookie 三、Session 四、Cookie &#xff08;1&#xff09;维持过程 &#xff08;2&#xff09;结构 正式开始说 Sessi…

1.Template Method 模式

模式定义 定义一个操作中的算法的骨架&#xff08;稳定&#xff09;&#xff0c;而将一些步骤延迟&#xff08;变化)到子类中。Template Method 使得子类可以不改变&#xff08;复用&#xff09;一个算法的结构即可重定义&#xff08;override 重写&#xff09;该算法的某些特…

【PyTorch】5.张量索引操作

目录 1. 简单行、列索引 2. 列表索引 3. 范围索引 4. 布尔索引 5. 多维索引 个人主页&#xff1a;Icomi 在深度学习蓬勃发展的当下&#xff0c;PyTorch 是不可或缺的工具。它作为强大的深度学习框架&#xff0c;为构建和训练神经网络提供了高效且灵活的平台。神经网络作为…

[EAI-023] FAST: Efficient Action Tokenization for Vision-Language-Action Models

Paper Card 论文标题&#xff1a;FAST: Efficient Action Tokenization for Vision-Language-Action Models 论文作者&#xff1a;Karl Pertsch, Kyle Stachowicz, Brian Ichter, Danny Driess, Suraj Nair, Quan Vuong, Oier Mees, Chelsea Finn, Sergey Levine 论文链接&…

2025年AI手机集中上市,三星Galaxy S25系列上市

2025年被认为是AI手机集中爆发的一年&#xff0c;各大厂商都会推出搭载人工智能的智能手机。三星Galaxy S25系列全球上市了。 三星Galaxy S25系列包含S25、S25和S25 Ultra三款机型&#xff0c;起售价为800美元&#xff08;约合人民币5800元&#xff09;。全系搭载骁龙8 Elite芯…

在虚拟机里运行frida-server以实现对虚拟机目标软件的监测和修改参数(一)(android Google Api 35高版本版)

frida-server下载路径 我这里选择较高版本的frida-server-16.6.6-android-x86_64 以root身份启动adb 或 直接在android studio中打开 adb root 如果使用android studio打开的话&#xff0c;最好选择google api的虚拟机&#xff0c;默认以root模式开启 跳转到下载的frida-se…

如何实现滑动删除功能

文章目录 1 概念介绍2 使用方法3 示例代码 我们在上一章回中介绍了GestureDetector Widget相关的内容,本章回中将介绍Dismissible Widget.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1 概念介绍 我们在这里介绍的Dismissible是一个事件响应Widget,它和GestureDetector类…

golang通过AutoMigrate方法自动创建table详解

一.AutoMigrate介绍 1.介绍 在 Go 语言中&#xff0c;GORM支持Migration特性&#xff0c;支持根据Go Struct结构自动生成对应的表结构,使用 GORM ORM 库的 AutoMigrate 方法可以自动创建数据库表&#xff0c;确保数据库结构与定义的模型结构一致。AutoMigrate 方法非常方便&am…

JAVA:利用 Content Negotiation 实现多样式响应格式的技术指南

1、简述 Content Negotiation&#xff08;内容协商&#xff09; 是 RESTful 服务的重要特性&#xff0c;允许客户端和服务器根据请求的不同特性动态选择适合的响应格式。它是一种在 HTTP 协议中实现的机制&#xff0c;通过它&#xff0c;服务器能够根据客户端需求返回适合的内…

Effective Objective-C 2.0 读书笔记—— objc_msgSend

Effective Objective-C 2.0 读书笔记—— objc_msgSend 文章目录 Effective Objective-C 2.0 读书笔记—— objc_msgSend引入——静态绑定和动态绑定OC之中动态绑定的实现方法签名方法列表 其他方法objc_msgSend_stretobjc_msgSend_fpretobjc_msgSendSuper 尾调用优化总结参考文…

使用EVE-NG-锐捷实现OSPF

一、OSPF基础知识 Open shortest Path First(OSPF)开放式最短路径优先协议 1.OSPF的关系状态 (1)邻居关系(TWO-WAY) 只发送hello包不发送LSA包(链路状态通告包) (2)邻接关系(FULL) OSPF设备与设备之间相互建立OSPF关系&#xff0c;初始为邻居关系(TWO-WAY)状态&#xff0…

Baklib赋能下的内容中台智能化推荐系统解析与展望

内容概要 在数字化时代&#xff0c;内容中台的智能化推荐系统正逐渐成为各类企业提升用户体验与运营效率的重要工具。该系统通过集成和分析大量用户数据及内容信息&#xff0c;能够实现精准的个性化推荐&#xff0c;为用户提供最相关的内容。 以下是内容中台智能化推荐系统的…