KnowStreaming系列教程第三篇——调度任务模块

news2025/1/23 3:04:11

前一篇文章KnowStreaming系列教程第二篇——项目整体架构分析_诸葛子房_的博客-CSDN博客

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);

    @Override
    public void onApplicationEvent(ClusterPhyAddedEvent event) {
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
        Long now = System.currentTimeMillis();

        // 交由KS自定义的线程池,异步执行任务
        FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
    }

    private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
        ClusterPhy tempClusterPhy = null;

        // 120秒内无加载进来,则直接返回退出
        while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
            tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
            if (tempClusterPhy != null) {
                break;
            }

            BackoffUtils.backoff(1000);
        }

        if (tempClusterPhy == null) {
            return;
        }

        // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
        BackoffUtils.backoff(5000);
        final ClusterPhy clusterPhy = tempClusterPhy;

        // 集群执行集群元信息同步
        List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
        for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }

        // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
        BackoffUtils.backoff(5000);

        // 集群集群指标采集
        List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
        for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
    }
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配
public TaskResult execute(JobContext jobContext) {
        try {
            long triggerTimeUnitMs = System.currentTimeMillis();

            // 获取所有的任务
            List<E> allTaskList = this.listAllTasks();
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }

            // 计算当前机器需要执行的任务
            List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());

            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }

            // 进行任务处理
            TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);

            //组装信息
            TaskResult taskResult = new TaskResult();
            taskResult.setCode(ret.getCode());
            taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));

            return taskResult;

        } catch (Exception e) {
            LOGGER.error("process task failed, taskName:{}", taskName, e);

            return new TaskResult(TaskResult.FAIL_CODE, e.toString());
        }
    }

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/796458.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker-compose简介和部署编排 and Docker 私有仓库Harbor的简介和部署

Docker-compose简介和部署编排 and Docker 私有仓库的简介和部署 一、Docker-compose简介Ⅰ、compose概述Ⅱ、YAML 文件格式及编写注意事项Ⅲ、YAML支持的数据结构 二、compose部署安装Ⅰ、Docker Compose 环境安装Ⅱ、Docker Compose配置常用字段Ⅲ、Docker Compose 常用命令 …

【工具】js字符串扩展格式化方法format 格式化文本

有序的格式化文本&#xff0c;使用{number}做为占位符 通常使用&#xff1a;format("this is {0} for {1}", "a", "b") 》 this is a for b 形参: pattern – 文本格式 arguments – 参数 返回值: 格式化后的文本 /*** 设置字符串format函数* …

汽车分析,随时间变化的燃油效率

简述 今天我们来分析一个汽车数据。 数据集由以下列组成&#xff1a; 名称&#xff1a;每辆汽车的唯一标识符。MPG&#xff1a;燃油效率&#xff0c;以英里/加仑为单位。气缸数&#xff1a;发动机中的气缸数。排量&#xff1a;发动机排量&#xff0c;表示其大小或容量。马力&…

torchsort安装报错:ModuleNotFoundError: No module named ‘torch‘

【问题】 安装torchsort时报错ModuleNotFoundError: No module named torch。 但实际已安装torch。 【解决】 是pip版本的问题&#xff0c;回退至旧版即可。 pip --version # 查看当前pip版本 pip install pip # 查看pip所有版本 pip install pip23.0 # 回退pip至较新版本&…

Vue3 Element-plus分页效果动态数据展示

Vue3 Element-plus实现分页动态数据展示 环境&#xff1a;vue3tsviteelement plus 接着前面的axios封装请求数据&#xff0c;继续 直接贴代码 <template><div class"news"><ul><li v-for"item in state.list">{{item.title}}&l…

蓝牙协议之蓝牙车载BLE-GATT基础知识

蓝牙协议之蓝牙车载BLE-GATT基础知识 一&#xff1a;定义 GATT 的全名是 Generic Atribute Profile &#xff0c;它定义两 BLE 设备通过叫做 Service 和 Characteristic 的东西进行通信。GATT 就是使用了ATT (Atribute Protoo)协议&#xff0c;ATT 协议把 Service,Characterist…

RN输入框默认设置数字键盘

<TextInput keyboardType"numeric"/> keyboardType 决定弹出何种软键盘类型&#xff0c;譬如numeric&#xff08;纯数字键盘&#xff09;。 See screenshots of all the types here. 这些值在所有平台都可用&#xff1a; defaultnumber-paddecimal-padnume…

【电商小知识】7个步骤让你快速了解跨境电商!

近几年来&#xff0c;随着互联网的发展&#xff0c;国内外的商业贸易越来越流畅&#xff0c;直播电商的火爆也带动着一大批相关的产业链发展&#xff0c;其中跨境电商就是尤为突出的一个。尽管在国内做跨境电商的企业数量非常之多&#xff0c;但仍有许多新人争相入局&#xff0…

无涯教程-jQuery - Ajax Tutorial函数

AJAX是用于创建交互式Web应用程序的Web开发技术。如果您了解JavaScript,HTML,CSS和XML,则只需花费一个小时即可开始使用AJAX。 为什么要学习Ajax? AJAX代表 A 同步 Ja vaScript和 X ML。 AJAX是一项新技术,可借助XML,HTML,CSS和Java Script创建更好,更快,更具交互性的Web应用…

【Java】JUC并发编程-进程线程

目录 一、什么是JUC二、进程和线程1、进程2、线程 三、线程的六种状态四、wait与sleep的区别五、并发与并行1、串行模式2、并行模式3、并发模式4、管程 六、用户线程与守护线程1、用户线程&#xff08;自定义线程&#xff09;2、守护线程&#xff08;比如垃圾回收&#xff09; …

怎么来说?学习HashSet类

与这道题相关 》 345. 反转字符串中的元音字母 目录 一、HashSet是什么&#xff1f; 二、使用步骤 总结 一、HashSet是什么&#xff1f; HashSet是基于HashMap来实现的&#xff0c;实现了Set接口&#xff0c;同时还实现了序列化和可克隆化。而集合&#xff08;Set&#xff…

为什么程序员不喜欢写注释?

现在的项目开发里&#xff0c;代码注释就像程序员的头发&#xff0c;越来越少。 尤其是国内&#xff0c;这种现象不仅是在小公司小团队中司空见惯&#xff0c;就算在大公司&#xff0c;以及大团队中的开源项目里&#xff0c;也是屡见不鲜。 上图是我在阿里的 Druid 项目源码里…

SQL-每日一题【626.换座位】

题目 表: Seat 编写SQL查询来交换每两个连续的学生的座位号。如果学生的数量是奇数&#xff0c;则最后一个学生的id不交换。 按 id 升序 返回结果表。 查询结果格式如下所示。 示例 1: 解题思路 前置知识 MySQL 的 MOD() 函数是取模运算的函数&#xff0c;它返回两个数相除…

el-popover 的content内容换行

需求&#xff1a;把el-popover的content内容进行换行 <el-popoverplacement"bottom"width"450"trigger"click"><div class"custom-content">示例&#xff1a;如果您在 2 个托盘上运输 40 箱纸&#xff08;每个托盘上20 箱…

基于SpringBoot+Vue的“智慧食堂”系统设计与实现(源码+LW+部署文档等)

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

三、Web安全相关知识

请勿用于非法用途 文章目录 一、Web源码框架二、目录结构1、静态资源2、WEB-INF&#xff08;1&#xff09;classes&#xff08;2&#xff09;lib&#xff08;3&#xff09;web.xml 二、web脚本语言1、脚本种类&#xff08;1&#xff09;ASP&#xff08;2&#xff09;ASP.NET&am…

Vue的下载以及MVVM分析

&#x1f600;前言本片文章是vue系列第一篇整理了vue的基础和发展史 &#x1f3e0;个人主页&#xff1a;尘觉主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是尘觉&#xff0c;希望我的文章可以帮助到大家&#xff0c;您的满意是我的动力&#x1f609;&#x1f6…

服务器——Nginx安装及静态配置、部署

目录 Nginx 安装Nginx步骤 安装yum-utils 配置nginx.repo源 安装nginx 系统启动nginx服务器 nginx.conf配置 关闭nginx服务器 配置文件启动nginx服务 配置文件编写 启动nginx服务 关闭nginx服务 服务器——Nginx安装及静态配置、部署。 直接在云服务器中启动项目&…

【C# 数据结构】Heap 堆

【C# 数据结构】Heap 堆 先看看C#中有那些常用的结构堆的介绍完全二叉树最大堆 Heap对类进行排序实现 IComparable<T> 接口 对CompareTo的一点解释 参考资料 先看看C#中有那些常用的结构 作为 数据结构系类文章 的开篇文章&#xff0c;我们先了解一下C# 有哪些常用的数据…

Android开发之Fragment动态添加与管理

文章目录 主界面布局资源两个工具Fragment主程序 主界面布局资源 在activity_main.xml中&#xff0c;声明两个按钮备用&#xff0c;再加入一个帧布局&#xff0c;待会儿用来展示Fragment。 <?xml version"1.0" encoding"utf-8"?> <LinearLayo…