海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件

news2025/2/1 12:03:29

💡 本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。
祝开卷有益。
大数据学习指南

大家好,我是小陶,DolphinScheduler 运行一段时间之后,会积累大量的历史运行记录,这些记录主要包括:工作流实例记录(MySQL)、任务实例记录(MySQL)、任务日志(本地磁盘),其中 MySQL 的记录越来越多,会影响页面分页查询的速度,进而影响用户使用体验和 MySQL 服务。

所以,需要清理以上历史记录,保证页面影响速度和 MySQL 服务。

本文的内容也比较简单,先是说明 API 的逻辑、存在的bug和修复方法,最后再介绍如何使用一个 Python 脚本来调用 API 删除历史实例。

1.API 逻辑介绍

DolphinScheduler 本身提供了批量删除工作流实例的接口,**process-instances/batch-delete,**接口逻辑这里简单描述一下就是,找到工作流下面的任务实例,依次删除任务日志和 Mysql 记录。

在这里插入图片描述

2.API bug说明和修复

但是这里需要注意的是,海豚调度 3.2.0(不包含)以前的版本,这里有一个 bug,在查询工作流实例下面的任务实例的时候,只查询了 flag =1 的任务实例,所以就导致了在清理日志和记录的时候,漏掉了一部分。

ProcessServiceImpl.java 中的 removeTaskLogFile 方法,在查询任务实例集合的时候,引用了 findValidTaskListByProcessId(processInstanceId); 而 findValidTaskListByProcessId 中仅查询了 Flag.YES 也就是 flag = 1 的记录。如下图所示:

在这里插入图片描述

这里解释一下 flag = 1 是标识该任务的最新的运行记录,表示任务多次重试之后,最新的运行记录。如果任务第一次失败了,第二次重试之后成功了,那么这个任务就会有两条运行记录,flag = 0 和 falg = 1,flag =1 的则标识最新的运行记录。

所以,如果你在使用海豚调度 3.2.0(不包含)以前的版本的时候,需要自行修复一下,或者升级到 3.2.0 。

修复的方式,也比较简单,新增 findAllTaskListByProcessId 方法,把工作流实例所有的运行实例都拿出来,不要加 flag 这个过滤条件。


3.使用 Python 脚本调用API

Python脚本的逻辑比较简单,使用了三个API,按照顺序是:

1.获取项目列表
2.获取工作流列表
3.批量删除工作流实例

入参是:日期

具体的代码如下:

#!/usr/bin/python
# -*- coding: utf8 -*-
## 定时清理调度工作流记录,入参是日期

import io
import subprocess
import requests
import json
import time
import datetime
from optparse import OptionParser
from optparse import OptionGroup

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(module)s : %(message)s', level=logging.INFO,
                    stream=sys.stdout)
logger = logging.getLogger(__name__)

# 配置信息: ip 端口 token自行修改
base_url = 'http://IP:端口'
token = 'xxxxxxxxxxxxx'

# get args
def get_option_parser(params):
    usage = "usage: %prog [options] json-url"
    parser = OptionParser(usage=usage)
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    for k in params:
        prodEnvOptionGroup.add_option("--" + k, metavar="<" + k + ">", dest=k, action="store", default="",
                                      help="" + params[k])

    parser.add_option_group(prodEnvOptionGroup)
    return parser
  
# 获取项目列表
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    totalList = response_data['data']['totalList']
    return totalList

def get_page_detail(code,dt):
    url = "{base_url}/dolphinscheduler/projects/{code}/process-instances?searchVal=&pageSize=50&pageNo=1&host=&stateType=&startDate=2000-01-01 00:00:00&endDate={dt} 23:59:59&executorName=".format(code=code,dt=dt,base_url=base_url)
    payload={}
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    page = response_data['data']['totalList']
    page_del = 'processInstanceIds='
    if len(page) == 0:
        print('列表为空,退出程序')
        return '0'
    for p in page:
        page_del = page_del + str(p['id']) + ','
    # print(page_del)
    return page_del

def delete(project,ids):
    print('即将删除如下工作流实例:')
    print(project)
    print(ids)
    url = "{base_url}/dolphinscheduler/projects/{project}/process-instances/batch-delete".format(base_url=base_url,project = project)
    # 'processInstanceIds=89767'
    payload= ids
    headers = {
      'Connection': 'keep-alive',
      'Accept': 'application/json, text/plain, */*',
      'language': 'zh_CN',
      'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
      'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
      'Content-Type': 'application/x-www-form-urlencoded',
      'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
      'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
      'token':token
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    print('执行结果如下:')
    print(response.text)

if __name__ == '__main__':
    #获取请求参数()
    params = {"dt": "dt"};
    parser = get_option_parser(params)
    options, args = parser.parse_args(sys.argv[1:])
    logger.info('开始执行删除任务实例...' + " ".join(sys.argv))
    # 清理的日期
    dt = options.dt
    if dt == '' or len(dt) == 0:
        logger.error('调度系统-运维任务:日期为空,请输入日期')
        sys.exit(1)

    today_91 = (datetime.datetime.now()+datetime.timedelta(days=-61)).strftime("%Y-%m-%d")

    short_dt = dt.replace('-','')
    short_today_91 = today_91.replace('-','')
    if int(short_dt) > int(short_today_91):
        logger.error('调度系统-运维任务:不能删除最近90天之内的任务实例')
        sys.exit(1)
    # # 需要处理的项目
    projects = get_project_list()
    # 依次处理项目
    for project in projects:
        code = project['code']
        print('正在处理:'+ str(code))
        while True:
            page_del = get_page_detail(code,dt)
            if page_del == '0':
                break
            delete(code,page_del)
            time.sleep(1)

使用示例:dolphin_clean_process.py 是上面的脚本。

python  dolphin_clean_process.py 2024-01-01

**脚本在 GitHub 也维护了一份,欢迎 star **
https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_process.py

4.注意事项

1.token 获取的方式

在这里插入图片描述

2.可以删除的工作流的状态是一定要是完成状态的。否则,接口就会报错,非完成状态的工作流是不可以删除的。可以通过下面的SQL查看某个日期之前是否存在非完成状态的工作流实例。

SELECT *
FROM t_ds_process_instance
where state not in (7 ,13 ,6 ,8 ,5 ,9 ,3)
and start_time < '2024-01-01'

以上就使用 API 轻松清理历史工作流实例以及日志文件的全部内容,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。


大数据学习指南 专注于大数据技术分享与交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1815381.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

橡胶:神奇的天然材料

你是否知道&#xff0c;橡胶其实是一个充满神秘与奇妙的世界&#xff1f; 从原始的乳胶&#xff0c;到现代工业中的合成橡胶&#xff0c;它的变化与运用令人叹为观止。橡胶&#xff0c;不仅是轮胎、胶鞋的原材料&#xff0c;更是众多工业产品不可或缺的组成部分。它的弹性、耐磨…

Win快速删除node_modules

在Windows系统上删除 node_modules 文件夹通常是一个缓慢且耗时的过程。这主要是由于几个关键因素导致的&#xff1a; 主要原因 文件数量多且嵌套深&#xff1a; node_modules 文件夹通常包含成千上万的子文件夹和文件。由于其结构复杂&#xff0c;文件和文件夹往往嵌套得非常…

商家转账到零钱申请内幕最详细解说

商家转账到零钱开通过程中&#xff0c;微信支付官方提供了多达十一种不同的转账场景&#xff0c;这些繁杂的选项经常让商家感到迷茫&#xff0c;难以选择最适合的场景。尤其是申请被拒后&#xff0c;一些商家会试图通过更换场景来碰运气。 不过根据我们上万例的开通经验来看&a…

STM32存储左右互搏 模拟U盘桥接SPI总线FATS读写FLASH W25QXX

STM32存储左右互搏 模拟U盘桥接SPI总线FATS读写FLASH W25QXX STM32的USB接口可以模拟成为U盘&#xff0c;通过FATS文件系统对连接的存储单元进行U盘方式的读写。 这里介绍STM32CUBEIDE开发平台HAL库模拟U盘桥接SPI总线FATS读写W25Q各型号FLASH的例程。 FLASH是常用的一种非易失…

自己用pip下载好模块啦,但是在pycharm里面不显示?

问题&#xff1a; 今天在cmd里面用pip命令安装第三方模块&#xff0c;最后用pip list 命令发现已经成功安装&#xff0c;但是在pycharm里面用该模块的时候&#xff0c;还是爆红&#xff0c;显示没有该库 。 解决方法&#xff1a; 第一种&#xff08;项目刚创建&#xff09;&am…

Qt5/6使用SqlServer用户连接操作SqlServer数据库

网上下载SQLServer2022express版数据库,这里没啥可说的,随你喜欢,也可以下载Develop版本。安装完后,我们可以直接连接尝试, 不过一般来说,还是下载SQLServer管理工具来连接数据更加方便。 所以直接下载ssms, 我在用的时候,一开始只能用Windows身份登录。 所以首先,我…

html的网页制作代码分享

<!-- prj_8_2.html --> <!DOCTYPE html> <html lang "EN"><head><meta charset"utf-8" /><title>页面布局设计</title><style type "text/css">*{padding: 0px;margin:0px;}#header{back…

代码签名证书有哪些不同类型?

在数字化时代&#xff0c;软件安全已成为企业和个人用户关注的焦点。代码签名证书作为一种数字证书&#xff0c;对软件进行数字签名和加密&#xff0c;确保了软件在传输过程中的安全性和可靠性。本文将详细介绍代码签名证书的类型及其选择方法。 代码签名证书的类型 代码签名…

易基因:表观基因组分析揭示转录因子结合区DNA甲基化表征其功能和进化背景 | 研究速递

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 DNA甲基化是一种重要的表观遗传修饰&#xff0c;对调控基因组功能有多种作用。其水平在整个基因组中具有空间相关性&#xff0c;通常在被抑制区域较高&#xff0c;在转录因子&#xff08…

shell脚本条件测试

条件测试 $? 返回码 判断命令或者脚本是否执行成功&#xff08;最近的一条&#xff09; 0 ture 为真就是成功 成立 非0 false 失败或者异常 test命令 可以进行条件测试 然后根据的是返回值来判断条件是否成立 test -e 测试目录和文件是否存在 exist test -d 只能测试目…

PS系统教程19

渐变与照片调色 增加色彩背景新建图层选好渐变拉选图片渐变 与图层模式结合 也可以变换颜色 看起来比较自然&#xff0c;因为是与人物结合起来 也可以选择系统里面的一些色调 可以进行多次调试

m4a怎么转换成m4r格式?

之前给大家分享过“m4a转mp3”教程文章&#xff0c;今天继续分享有关m4a的格式转换方法。想必大家对m4a格式已经了解的差不多了&#xff0c;苹果手机的专属格式。那么大家知道m4r格式吗? 从名字就可以看出来&#xff0c;它和m4a格式肯定有着密切的联系和相似点。m4r是iPhone铃…

知识付费平台功能模块详解

知识付费平台作为一种新兴的在线教育模式&#xff0c;以其用户需求导向的设计理念和便捷高效的学习方式&#xff0c;受到了广泛欢迎。这类平台汇集了职业技能、生活兴趣和人文社科等多领域的专业知识&#xff0c;并通过视频播放、在线问答、作业批改等工具和服务&#xff0c;助…

python的%time 、%%time 、%timeit、%%timeit的区别

%time 、%timeit 要在ipython下才可以使用。(所以说Jupyter Notebook当然是可以用的,pycharm里的python环境也是jupyter Notebook的) %time可以测量一行代码执行的时间 %timeit可以测量一行代码多次执行的时间 网上有说法说,%timeit是测量一行代码100000次循环内,3次最快速…

【沈阳航空航天大学主办|往届已实现EI检索】第二届智能通信与网络国际学术会议(ICN 2024)

第二届智能通信与网络国际学术会议&#xff08;ICN 2024&#xff09; The 2nd International Conference on Intelligent Communication and Networking 一、重要信息 大会官网&#xff1a;www.ic-icn.org (点击参会/投稿/了解会议详情&#xff09; 大会时间&#xff1a;2…

【扫码点餐系统】制作搭建部署

前言&#xff1a; 餐饮类做一个扫码点餐的工具可以提升用户体验、扩大市场份额、提高运营效率以及适应数字化趋势等方面。 一、企业开发小程序原因 企业开发小程序具有多方面的优势&#xff0c;可以帮助企业提升用户体验、扩大市场份额、提高运营效率以及适应数字化趋势等。…

如何导出数据库中数据表或查询结果的数据:支持大数据量(以MySQL和SQLynx为例)

MySQL的数据导出是一个操作非常频繁的任务&#xff0c;也是数据的存储和传输比较重要的一种方式&#xff0c;本文主要以SQLynx为例来介绍MySQL的数据如何导出。 目录 1 操作步骤 步骤 1&#xff1a;登录SQLynx 步骤 2&#xff1a;选择数据库和表 步骤 3&#xff1a;执行查询…

如何设置EVM/IMA密钥即如何生成签名密钥和证书

为生成和使用签名密钥来配置 IMA&#xff0c;以下是详细步骤&#xff0c;包括如何生成签名密钥和证书。 1. 生成签名密钥和证书 首先&#xff0c;我们需要一个私钥和一个自签名证书。我们将使用 OpenSSL 来生成这些密钥和证书。 生成私钥 openssl genpkey -algorithm RSA -…

RabbitMQ 见解一

1.初识MQ 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得到响应&#xff0c;…

各省环保税税额一览表数据

各省环保税税额一览表数据 1、指标&#xff1a;地区、税目、税率&#xff08;元/污染当量&#xff09;、依据 2、范围&#xff1a;27个省&#xff08;不含西藏、贵州、青海、陕西&#xff09; 3、来源&#xff1a;主要基于人大常委会的会议决定&#xff0c;涵盖了各类污染税…