海豚调度监控:使用图关系网络解决核心链路告警,减轻任务运维负担!

news2024/10/24 10:23:50

💡 本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。
祝开卷有益。
大数据学习指南

大家好,我是小陶,之前分享了如果自动检测依赖缺失:文章在这,今天依然是有关依赖关系的分享。DolphinScheduler 在使用过程中,肯定会有任务出现失败的情况,那么问题来了:调度任务的告警是需要人为配置的,在生产环境中,面对海量的任务,如何找到重要的任务,并且在失败的时候,第一时间告警呢?

先思考一下。

先看思路

本文提供一个思路,接着往下看吧。

不卖关子了。

本质是路径查找,本文这里使用了图数据库,或者你也可以自己使用Java实现路径查找。

下面是需要实现的目标,看一组任务的关系,如下图所示,存在 A/B/C/D/E 五个任务,E 任务被配置为核心任务,当 B 任务报错时,检测到 B 和 E 之前存在路径,则需要电话告警。
截屏2024-06-20 13.39.53.png

所以在配置核心链路告警的时候,我们只需要配置叶子节点,在实际生产中,一般是应用层的任务,比如报表、标签、接口数据等任务。

清洗依赖数据

核心逻辑就是把所有工作流内部、跨工作流以及跨项目的依赖全部清洗出来,生成一张关系表。具体清洗逻辑,可以看:文章在这
image.png
最终生成了
t_ds_task_node_base_data 任务基础表,后续会用于 Nebula Graph,这个后面会讲。
t_ds_dag_task_relation_data_df 关系最终表,后续会用于 Nebula Graph,这个后面会讲。

t_ds_dag_task_relation_data_df 这个表结构如下:
截屏2024-06-20 13.53.00.png

关系导入图数据库

这里用的国产图数据库 Nebula Graph,当然你也可以自己使用 Java 实现路径查找

为什么我们一定要引入图数据库呢?有下面几方面考虑:

  • 可以减轻调度系统Mysql的压力,把负责的路径计算放在图数据库里面。
  • 探索更多调度任务数据治理和运维的可能性,比如任务权重,影响分析等。(不久的将来我也会分享这一块的实践。)

截屏2024-06-20 13.59.23.png
用到的组件是 Nebula Graph,最关键的函数是 find path 查询最短链路
① 用到的语法是:FIND SHORTEST PATH需要注意的是,注意查询步长,UPTO <N> {STEP|STEPS}:路径的最大跳数。默认值为5。
② 3.3.0 开始,子图支持了边的条件限制了,查询的时候只拿最新的一批关系。

创建图空间

CREATE SPACE s_schedule_job (partition_num = 225, replica_factor = 3, vid_type = FIXED_STRING(180)) COMMENT = "大数据平台调度系统任务的血缘关系";

创建边和点

## 任务标签
DROP tag if exists t_task;
CREATE tag if not exists t_task(  id string NULL COMMENT "project_code,dag_code,task_code,拼接,",  project_name string NULL COMMENT "project_name",  project_code string NULL COMMENT "project_code",  dag_name string NULL COMMENT "dag_name",  dag_code string NULL COMMENT "dag_code",  dag_version string NULL COMMENT "dag_version",  task_code string NULL COMMENT "task_code",  task_version string NULL COMMENT "task_version",  task_name string NULL COMMENT "task_name",  task_type string NULL COMMENT "task_type",  create_time string NULL COMMENT "时间戳") comment='调度任务节点';

## 调度任务关系
drop edge if exists e_task;
create edge if not exists e_task(  pre_project_name string NULL COMMENT "project_name",  pre_project_code string NULL COMMENT "project_code",  pre_dag_name string NULL COMMENT "dag_name",  pre_dag_code string NULL COMMENT "dag_code",  pre_dag_version string NULL COMMENT "dag_version",  pre_task_code string NULL COMMENT "task_code",  pre_task_version string NULL COMMENT "task_version",  pre_task_name string NULL COMMENT "task_name",  pre_task_type string NULL COMMENT "task_type",  post_project_name string NULL COMMENT "project_name",  post_project_code string NULL COMMENT "project_code",  post_dag_name string NULL COMMENT "dag_name",  post_dag_code string NULL COMMENT "dag_code",  post_dag_version string NULL COMMENT "dag_version",  post_task_code string NULL COMMENT "task_code",  post_task_version string NULL COMMENT "task_version",  post_task_name string NULL COMMENT "task_name",  post_task_type string NULL COMMENT "task_type",  create_time string NULL COMMENT "时间戳") comment='调度任务关系';

导入数据

同步点:

{
  spark: {
    app: {
      name: Nebula_Exchange_t_task
    }
    driver: {
      cores: 2
      maxResultSize: 5G
    }
  }

  nebula: {
    address:{
      graph:["10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx3:9669","10.1.x.xx:9669"]
      meta:["10.1.x.xx:9559","10.1.x.xx:9559","10.1.x.xx:9559"]
    }
    user: root
    pswd: "nebula密码"
    space: s_schedule_job
    connection {
      timeout: 60000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors/t_task
    }
    rate: {
      limit: 1024
      timeout: 10000
    }
  }
  tags: [

    {
      name: t_task
      type: {
        source: mysql
        sink: client
      }
      host:"调度系统MYSQL数据库IP"
      port:3307
      database:"调度系统MYSQL数据库"
      table:"t_ds_task_node_base_data"
      user:"调度系统MYSQL用户"
      password:"调度系统MYSQL用户密码"
      sentence:"SELECT concat(project_code,'_',dag_code,'_',task_code) as id,project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time FROM t_ds_task_node_base_data"
      fields: [project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time]
      nebula.fields: [project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time]
      vertex:{
        field:id
      }
      batch: 256
      partition: 32
    }

  ]


}

同步边:

{
  spark: {
    app: {
      name: Nebula_Exchange_e_task
    }
    driver: {
      cores: 2
      maxResultSize: 5G
    }
  }

  nebula: {
    address:{
      graph:["10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx3:9669","10.1.x.xx:9669"]
      meta:["10.1.x.xx:9559","10.1.x.xx:9559","10.1.x.xx:9559"]
    }
    user: root
    pswd: "aD@VX2018#"
    space: s_schedule_job
    connection {
      timeout: 60000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors/e_task
    }
    rate: {
      limit: 1024
      timeout: 10000
    }
  }
  edges: [

    {
      name: e_task
      type: {
        source: mysql
        sink: client
      }
      host:"调度系统MYSQL数据库IP"
      port:3307
      database:"调度系统MYSQL数据库"
      table:"t_ds_task_node_base_data"
      user:"调度系统MYSQL用户"
      password:"调度系统MYSQL用户密码"
      sentence:"SELECT concat(pre_project_code,'_',pre_dag_code,'_',pre_task_code) as from_id,concat(post_project_code,'_',post_dag_code,'_',post_task_code) as to_id,pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time FROM t_ds_dag_task_relation_data_df"
      fields: [pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time]
      nebula.fields: [pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time]
      source: {
        field: from_id
      }
      target: {
        field: to_id
      }
      batch: 256
      partition: 225
    }

  ]

}

定时脚本: 使用 Nebula Graph 社区提供的 exchange 工具把数据从 mysql 导入 Nebula Graph。

#!/bin/bash
# 作业参数
basepath='/opt/vcredit-graph-db/s_schedule_job/exchange'
tmpdir='/tmp/nebula/s_schedule_job'
mkdir -p $tmpdir
sourcefile=${basepath}/${jobname}.conf
targetfile=${tmpdir}/${jobname}_${vardate}.conf
cat ${sourcefile} > ${targetfile}
sed -i "s/vardate/${vardate}/g" ${targetfile}
sed -i "s/varhivetable/${varhivetable}/g" ${targetfile}

# 运行环境
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
spark_submit="/opt/spark-2.4.8-bin-hadoop2.7/bin/spark-submit"
# 开始运行
${spark_submit} \
--principal hive@VCREDIT.COM \
--keytab /etc/security/hive.keytab \
--master "local[*]" \
--class com.vesoft.nebula.exchange.Exchange /opt/nebula/nebula-exchange_spark_2.4-3.0.0.jar  -c ${targetfile} -h

Java 服务

/**
 * 判断这个任务是否会影响核心任务
 * @param projectName
 * @param dagName
 * @param taskName
 * @return
 */
@ApiOperation(value = "dolphinTaskIsOnCall", notes = "判断这个任务是否会影响核心任务,是 1 ,否 0")
@ApiImplicitParams({
        @ApiImplicitParam(name = "projectName", value = "T-1", required = false, dataType = "String", example = "BigData"),
        @ApiImplicitParam(name = "dagName", value = "T-1", required = false, dataType = "String", example = "公共和自定义域(pub)_daily"),
        @ApiImplicitParam(name = "taskName", value = "T-1", required = false, dataType = "String", example = "dwd_pub_screen_zxd_cust_df")
})
@GetMapping("/dolphinTaskIsOnCall")
@ResponseBody
public DataResult dolphinTaskIsOnCall(
        @RequestParam(value = "projectName", required = true) String projectName,
        @RequestParam(value = "dagName", required = true) String dagName,
        @RequestParam(value = "taskName", required = true) String taskName) throws GraphDatabaseException, UnsupportedEncodingException {

    HashMap<String,Object> res = dolphinService.dolphinTaskIsOnCall(projectName, dagName, taskName);
    return DataResult.ok(res);
}

核心代码,在第 17 行:

@Override
public HashMap<String, Object> dolphinTaskIsOnCall(String projectName, String dagName, String taskName) throws GraphDatabaseException, UnsupportedEncodingException {
    HashMap<String,Object> resMap = new HashMap<>();
    // 查询该任务 codes
    HashMap<String,Object> task = dolphinTaskInstanceMapper.getTaskCode(projectName,dagName,taskName);
    if (task == null){
        resMap.put("res","任务不存在!");
        return resMap;
    }
    String fromCodes = task.get("project_code") + "_" + task.get("dag_code") + "_" + task.get("task_code");
    // 查询核心任务 codes
    List<HashMap<String,Object>> tasks = dolphinTaskInstanceMapper.getOnCallTasks();
    // 查询最短链路
    for (HashMap<String,Object> t : tasks){
        String toCodes = t.get("project_code") + "_" + t.get("dag_code") + "_" + t.get("task_code");
        // 查询Nebula
        String NgSql = "FIND SHORTEST PATH with PROP FROM \"" + fromCodes + "\" TO \"" + toCodes + "\" OVER * WHERE e_task.create_time > '" + DateUtils.dayToString(DateUtils.getSomeDay(new Date(), -1)) + "' UPTO 100 STEPS  YIELD path AS p;";
        int res = nebulaService.isOnCallTask("s_schedule_job",NgSql);
        if (res > 0){
            resMap.put("res",res);
            return resMap;
        }
    }
    resMap.put("res",0);
    return resMap;
}

返回值说明:

① 影响核心任务,需要打电话
{“data”:{“res”:1},“code”:0,“msg”:“success”}
② 不影响核心任务,不需要打电话
{“data”:{“res”:0},“code”:0,“msg”:“success”}
③ 任务不存在,忽略
{“data”:{“res”:“任务不存在!”},“code”:0,“msg”:“success”}
④ code 不等于 0 ,接口异常,忽略。

封装好接口之后,任务失败的程序调这个接口,判断失败任务是否影响核心任务,如果影响就打电话。

钉钉告警样式:
image.png
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
电话告警,直接给对应负责人打电话。

至此,我们减少了很多任务告警的配置工作,只需要关注核心的叶子节点是什么,也就是核心的应用任务是什么,大大提高了任务告警的配置效率!!!

注意:清洗数据 和 导入图数据库,在每天的 23:30 分进行,一天初始化一次,确保凌晨的任务关系是最新的,主要是用于凌晨告警。

以上就使用图关系网络解决核心链路告警的全部内容,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。


**大数据学习指南 **专注于大数据技术分享与交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1842911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python | Leetcode Python题解之第155题最小栈

题目&#xff1a; 题解&#xff1a; class MinStack:def __init__(self):self.stack []self.min_stack [math.inf]def push(self, x: int) -> None:self.stack.append(x)self.min_stack.append(min(x, self.min_stack[-1]))def pop(self) -> None:self.stack.pop()sel…

Java8 --- Gradle安装及测试使用

目录 一、Gradle 1.1、简介 1.2、安装 1.2.1、注意事项 1.2.2、安装流程 1.2.3、配置共享仓库 1.3、构建项目 1.4、常用指令 1.4.1、gradle classes 1.4.2、gradle clean 1.4.3、gradle test 1.4.4、gradle build 1.5、修改Maven下载源 1.6、Wrapper包装器 1.…

蓝牙模块在智能城市构建中的创新应用

随着科技的飞速发展&#xff0c;智能城市的概念已经逐渐从理论走向实践。物联网技术作为智能城市构建的核心驱动力&#xff0c;正在推动着城市基础设施、交通管理、环境监测等领域的深刻变革。蓝牙模块&#xff0c;作为物联网技术的重要组成部分&#xff0c;以其低功耗、低成本…

数据结构与算法-差分数组及应用

差分数组 差分数组&#xff1a; 其实差分数组是创建一个一个辅助数组&#xff0c;用来表示给定数组的变化&#xff0c;一般用来对数组进行区间修改的操作。 频繁操作数组区间的问题 假设我们要对一个数组进行区间操作。数组为 a {10,10, 20,20,50,… 100}。数组数据比较多。 对…

中新赛克两款数据安全产品成功获得“可信数安”评估测试证书

6月19日&#xff0c;2024数据智能大会在北京盛大召开。 会上&#xff0c;中国2024年上半年度“可信数安”评估测试证书正式颁发。中新赛克两款参评产品凭借过硬的技术水准和卓越的应用效果&#xff0c;成功获得专项测试证书。 2024年上半年度“可信数安”评估测试通过名单 中新…

SpringBoot集成logback初始化源码解析(部分)

一.SpringBoot配置扩展点 SpringBoot日志模块使用监听的方式进行初始化&#xff0c;在SpringBoot项目启动后&#xff0c;会通知日志监听器 在日志监听器中ApplicationStartingEvent事件用来确定到底使用哪个日志系统&#xff0c;logback log4j等 在日志监听器中ApplicationEn…

PFA氟树脂烧杯耐热无接缝带基准刻度量杯30/50/100ml

PFA量杯&#xff1a;可溶性聚四氟乙烯量杯、特氟龙量杯。主要用于痕量超痕量分析、同位素分析等实验室&#xff0c;是国内外洁净的实验室分析器皿。 量杯是上大下小的圆台形&#xff0c;底座宽台设计&#xff0c;保证稳定性&#xff0c;可在实验室中作为定量量取液体的量具&am…

HNU-计算机系统(CSAPP)实验三 BombLab

前言 BombLab来自《深入理解计算机系统》&#xff08;CSAPP&#xff09;一书的第三章“程序的机器级表示”的配套实验&#xff0c;该实验的目的是通过反汇编可执行程序bomb&#xff0c;来反推出程序执行内容&#xff0c;进而能够正确破解“密码”&#xff0c;拆除“炸弹”。 …

四川赤橙宏海商务信息咨询有限公司引领抖音电商潮流

在当今数字化浪潮下&#xff0c;电商行业蓬勃发展&#xff0c;抖音电商作为新兴力量&#xff0c;正以其独特的魅力吸引着越来越多的商家和消费者。四川赤橙宏海商务信息咨询有限公司&#xff0c;作为抖音电商服务领域的佼佼者&#xff0c;凭借其专业的团队和丰富的经验&#xf…

番外篇 | 基于YOLOv5-RCS的明火烟雾检测 | 源于RCS-YOLO

前言:Hello大家好,我是小哥谈。RCS-YOLO是一种目标检测算法,它是基于YOLOv3算法的改进版本。通过查看RCS-YOLO的整体架构可知,其中包括RCS-OSA模块。RCS-OSA模块在模型中用于堆叠RCS模块,以确保特征的复用并加强不同层之间的信息流动。本文就给大家详细介绍如何将RCS-YOLO…

新疆旅游创新产品迎活力 伊吾胡杨文旅品牌发布

2024年,随着政策的引领、经济的形态、社会需求的多样化以及新媒体流量的赋能,我国旅游业的主基调将从“快速复苏”转向“理性繁荣”,文旅产业正呈现出前所未有的活力和潜力。6月14日,“千年敦煌万年胡杨”敦煌至伊吾踩线采风行暨伊吾胡杨产品发布大会圆满落幕。会上,景区、旅行…

一名女DBA的感谢信,到底发生了什么?

昨日我们收到这样一通来电 “早上九点刚上班便收到业务投诉电话&#xff0c;系统卡顿&#xff0c;接口失败率大增&#xff0c;怀疑数据库问题。打开运维平台发现是国产库&#xff0c;生无可恋&#xff0c;第一次生产环境遇到国产库性能问题&#xff0c;没什么排查经验&#xf…

什么是Amazon Relational Database Service(Amazon RDS)及实践体验

目录 前言亚马逊云服务免费体验中心三种优惠类型 Amazon RDS什么是Amazon RDS为什么选择 Amazon RDS&#xff1f;Amazon RDS 的优势关键功能详情工作原理Amazon RDSAmazon RDS CustomAmazon RDS on Amazon Outposts 实践创建并连接到 MySQL 数据库实例一、创建 EC2 实例二、创建…

大型Web应用的模块化与组织实践:Flask Blueprints深入解析

目录 一、引言 二、Flask Blueprints概述 三、Flask Blueprints的使用 创建Blueprint对象 定义路由和视图函数 注册Blueprint 使用Blueprints组织代码 四、案例分析 创建模块目录结构 创建Blueprint对象 注册Blueprint 五、代码示例与最佳实践 1. 代码示例 …

成为AIGC人才,是职场人当下的必修课?

随着科技的飞速进步&#xff0c;人工智能和机器学习技术正逐渐渗透到我们生活的每一个角落&#xff0c;其中&#xff0c;人工智能生成内容&#xff08;AIGC&#xff09;更是以其独特的魅力和广泛的应用前景&#xff0c;成为当下科技领域的热门话题。在这样的背景下&#xff0c;…

数字乡村:绘就乡村振兴的智慧新画卷

在乡村振兴战略的宏伟蓝图下&#xff0c;“数字乡村”作为新时代农村现代化的重要抓手&#xff0c;正悄然改变着中国乡村的面貌。本文旨在深度剖析数字乡村建设的核心价值、关键技术、成功案例以及未来展望&#xff0c;为乡村振兴战略提供前瞻性的思考与启示。 数字乡村的核心价…

java.io.eofexception:ssl peer shut down incorrectly

可能是因为 1)https设置 2&#xff09;超时设置 FeignConfig.java package zwf.service;import java.io.IOException; import java.io.InputStream; import java.security.KeyStore;import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory;import org.apac…

用电子表单替代纸质表格,签到报名、出入登记更轻松

用纸质表格收集信息时&#xff0c;常常会出现数据丢失、不易统计等问题。我们可以搭建电子表单来代替线下纸质表格&#xff0c;进行信息收集、记录数据。 这些数据会保存在账号下&#xff0c;可以导出Excel或PDF进行存档&#xff1b;也可以根据企业要求自定义PDF导出格式。 并…

com.lowagie:itext:jar:2.1.7.js9 was not found

1 在 https://jaspersoft.jfrog.io/ui/native/third-party-ce-artifacts/com/lowagie/itext/2.1.7.js9/下载com/lowagie/itext/2.1.7.js9/itext-2.1.7.js9.jar的包&#xff0c; 2 在本地maven仓库com.lowagie.itext.2.1.7的目录下&#xff0c;将itext-2.1.7.js9.jar复制更名为…

摄像头劫持——保护自己免受窥探

今天为您带来当今科技界的最新趋势及探索方法。本周&#xff0c;我们将为您提供五个防止黑客在您不知情的情况下访问您的网络摄像头的建议。 网络摄像头 一、摄像头劫持 你是否曾经怀疑过&#xff0c;即使你没有主动使用网络摄像头&#xff0c;也可能有人正在通过它窥视你&am…