azkaban-tools 项目介绍

news2025/1/12 16:17:33

本文背景

应一个用户的好心和好奇心,在最近水深火热的百忙之中抽时间写完了一个简短的项目介绍,其实就是几个azkaban的批量操作脚本,但在大数据集群的“运维生涯”中,还是帮了自己不少忙,也算是为了它做一个简单的回顾吧

项目背景

azkaban 是一个大数据领域通用的任务管理服务,它的运行模式和其他任务管理服务类似,都是将任务下发到执行器,定期执行,它的优势主要在于可定义任务流,同个项目下不同任务可引用同个模板,大数据领域的任务正好比较具有复用性,因此在 azkaban 诞生的时代(第一个release在2014年),它还是成为了当时比较流行的开源任务调度服务

azkaban 的操作方式比较容易上手,通过界面即可完成所有的操作,包括上传项目、执行项目中定义的job、查看job日志、给任务配置调度时间等,操作并不复杂。但如果需要批量做一些操作,在界面一个个点就不太方便了

之前没有做这个项目的时候,隔三差五用户就要来找我“能不能帮忙…”(具体对话参考下面),终于有一天没忍住,本项目就此诞生…

项目主要是参考了 azkaban api 文档,通过若干脚本实现了一些常见的批量操作,项目地址: azkaban-tools

目前实现的批量操作场景如下:

批量操作① 启动任务

每年都会有个一两次的真实对话

在这里插入图片描述

批量操作② 启动任务

azkaban 默认不允许同时执行同一个任务,因此如果任务在上个周期执行一直没结束,到下个周期也不会被触发

于是偶尔也会有以下的对话

在这里插入图片描述

批量操作③ 设置调度

对离线抽数任务来说,一般都会在每天一个固定的时间调度一次,抽数完后执行上层的任务,一般就是这种场景

在这里插入图片描述

批量操作④ 设置调度

背景同上,调度周期如果要修改,比如从早上8点改成9点,需要先把原来的调度删除后再创建新的调度

如何使用

本地部署 azkaban

azkaban 提供两种部署模式: solo (单节点)和 webserver+executor集群模式,生产环境肯定是采用后者,本地测试可以通过 solo 模式快速部署

# 第三方镜像
docker run -d -p 8081:8081 --name azkaban-srv -e TZ='Asia/Shanghai' haxqer/azkaban

# 下载代码
git clone https://github.com/azkaban/azkaban
# 注意: 官方仓库久未维护,编译会有报错,也可以拉取笔者 fork 后修复的仓库
git clone https://github.com/smiecj/azkaban -b b_3_90_extend

# 编译
./gradlew build installDist

# 本地solo模式启动
cd azkaban-solo-server/build/install/azkaban-solo-server
./bin/start-solo.sh

访问刚启动的 solo 服务器: http://localhost:8081 ,默认用户名密码都是 azkaban

首次登录,首页还没有项目

上传项目

这里我们创建一个示例项目 examples, 并以 azkaban_examples 作为项目代码,压缩成zip包后上传

git clone https://github.com/joeharris76/azkaban_examples
zip -r azkaban_examples.zip azkaban_examples

上传任务压缩包后azkaban会解析出任务结构
azkaban 项目下的任务结构一般是 flow->job->template,flow 是父任务,job 为子任务。flow 可以通过串行或并行定义一组job的关系,job 可以直接定义任务行为,也可以引用template,在template中定义具体行为

以笔者实际维护的离线抽数作为例子,就是一个比较标准的azkaban任务格式:

在这里插入图片描述

所有抽数任务都在每天早上7点执行,每个抽数任务都对应一张mysql表,具体指令都是通过sqoop指令将数据从mysql导入hive表,区别只是执行sqoop的参数(即库表名)。这种任务我们就可以在 template 中编写sqoop的执行逻辑,每个表的同步任务作为一个job,都引用 template ,在job配置中定义表明即可

具体的任务定义示例:

# template/sqoop.job
command=sqoop import -Dmapreduce.job.user.classpath.first=true --connect jdbc:mysql://mysql_host:mysql_port/${mysql_db} --username mysql_user --password mysql_pwd --table ${mysql_table} -m 1 --target-dir /import/stg/${mysql_table} --as-avrodatafile

# stg/table1.job
mysql_db=element
mysql_table=orders

command=echo "import mysql table"

type=flow
flow.name=sqoop

examples 项目结构也比较简单,basic_flow 和 workflow 是最上层的 flow,basic_flow 引用的8个job直接定义了执行echo的行为;workflow引用的4个job则都属于同一个template,同样都是echo操作,只是最后打印的内容不同

basic_flow

workflow

手动执行两个flow,可以看到它们的执行速度都非常快,因为到最后的子任务都只是执行 echo

在这里插入图片描述

为了方便接下来的测试,我们可以给basic_flow下的job添加sleep以增加执行时间

# 每个job增加睡眠时间,序号越大睡眠时间越长
for i in {1..8}
do
  echo "command.1=sleep ${i}" >> basic_flow/basic_step_${i}.cmd.job
done

登录配置

在正式使用 azkaban-tools 之前,我们需要先修改 env.sh 中和服务相关的配置

# search_env_name: 查询操作的环境
search_env_name=produce

# exec_env_name: 执行、修改等操作的环境
# 注: 一开始开发这个项目是为了将一个老集群部署的azkaban上的所有任务调度都迁移到新集群服务上。为防止误操作生产环境,区分了进行查询和执行操作时使用不同的环境
exec_env_name=test

## produce env 连接信息
produce_azkaban_address=localhost:8081
produce_azkaban_user=azkaban
produce_azkaban_password=azkaban

## test env 连接信息
## 这里故意将测试环境地址配置成和生产环境不同(虽然都是本地地址),是为了接下来测试批量停止任务时,不会因为脚本中判断“执行环境地址和生产环境地址完全一致”而停止执行
test_azkaban_address=127.0.0.1:8081
test_azkaban_user=azkaban
test_azkaban_password=azkaban

jq 工具

项目通过 jq 工具来解析 azkaban 接口返回的json数据

jq 可以通过 yum/apt 安装,或者直接下载可执行文件

wget https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64
mv jq-linux64 /usr/local/bin/jq
chmod +x jq

jq 作为命令行工具,解析json还是非常好用的,常用指令可以参考后面的章节

批量启动任务

如果需要把examples的两个flow都调度起来,可以这么配置:

# 需要执行的项目名
execute_project_name="examples"
# 禁止执行的任务名
execute_block_flow_names="(^_template$)"
# 允许执行的任务名
execute_allow_flow_names=""

配置完成后直接执行 make run,即可同时拉起两个任务

在这里插入图片描述

批量停止任务

配置和启动任务一样,执行 make kill,即可把项目下所有执行中的任务停止

批量创建调度

# 操作项目名
target_project_name="examples"

# 需要设置调度的白名单,不配置默认会设置项目下的所有任务
flow_name_allowlist="(^workflow$|^basic_flow$)"

# 设置的调度周期,java cron 格式
fix_schedule_cron="0 0 18 * * ?"

注: 已经配有调度的任务不会覆盖

在这里插入图片描述

批量删除调度

配置和创建调度相同,执行 make clean_cron即可删除任务调度

部分脚本逻辑

登录

# 登录方法需要返回azkaban地址和登录成功得到的session id 两个信息
login() {
    local env_name=$1
    eval $(get_azkaban_env_info_by_envname $env_name)
    local login_ret=`curl -X POST --data "action=$command_login&username=$tmp_azkaban_user&password=$tmp_azkaban_password" http://$tmp_azkaban_address 2>/dev/null`
    local tmp_session_id=`echo "$login_ret" | jq '.["session.id"] // empty' | tr -d '"'`
    echo "tmp_azkaban_address=$tmp_azkaban_address"
    echo "tmp_session_id=$tmp_session_id"
}

# eval $(login $exec_env_name)
# session_id=$tmp_session_id
# azkaban_address=$tmp_azkaban_address

shell脚本中返回多个值的方法参考,eval: 将传入的字符串当成指令执行

获取任务下的所有 flow

get_project_flow() {
    local project_name=$1
    local session_id=$2
    local azkaban_address=$3

    local get_project_ret=`curl "http://$azkaban_address/manager?ajax=$command_get_project_flows&project=$project_name&session.id=$session_id" 2>/dev/null`

    local tmp_project_id=`echo "$get_project_ret" | jq '.projectId // empty'`
    local tmp_flow_count=`echo "$get_project_ret" | jq '.flows | length'`
    local tmp_flow_name=`echo "$get_project_ret" | sed "s/ //g" | jq -r '[.flows[].flowId] | join(",")'`
    echo "project_id=$tmp_project_id"
    echo "flow_name_join_str=$tmp_flow_name"
    echo "flow_count=$tmp_flow_count"
}

jq 基本用法

参考了这篇文档和官方文档

# 获取某个key-value
echo '{"hello": "world"}' | jq '.hello' # world

# 获取数组
echo '{"arr": [1,2,3]}' | jq '.arr' # [1,2,3]

# 数组所有元素用逗号组合
# 注意: 如果数组元素类型是数字,还要先通过tostring转成字符串
# -r: 直接打印,不再组装成json
echo '{"arr": [1,2,3]}' | jq -r '[.arr[] | tostring] | join(",")' # 1,2,3

# 判断
echo '{"hello": "world"}' | jq 'has("world")' # false

# 选择器
echo '{"arr": [1,2,3,4,5]}' | jq -r '[.arr[] | select(.>2) | tostring] | join(",")' # 3,4,5

# 选择器和映射
echo '{"arr": [1,2,3,4,5]}' | jq -r '.arr | map(select(.>2))' # [3,4,5]

# 排序
echo '{"arr": [5,4,3,2,1]}' | jq -r '.arr | sort' # [1,2,3,4,5]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1669112.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot集成jxls2实现复杂(多表格)excel导出

核心依赖 需求 导出多个表格,包含图片,类似商品标签 1.配置模板 创建一个xlsx的模板文件,配置如下 该模板进行遍历了两次,因为我想要导出的数据分为两列展示,左右布局,一个循环实现不了,所以采…

重学JavaScript高阶知识点(三)—— 详解Js中的内存管理

详解Js中的内存管理 1. 简介2. 内存生命周期3. JavaScript 的内存分配4. 垃圾回收 1. 简介 很多底层语言一般都有底层的内存管理接口,比如 C语言,可以调用对应的API去创建和释放内存空间。意思是需要手动去创建和释放内存空间,很明显&#x…

TCP服务器实现将客服端发送的信息广播发送(使用内核链表管理客户端信息)

目录 1.服务器端实现思路 2.服务器端代码 3.客户端代码 4.内核链表代码 5.运行格式 一、服务器端 二、客户端 6.效果 1.服务器端实现思路 Tcp广播服务初始化 等待客户端连接 广播发送 2.服务器端代码 #include "list.h" #include <signal.h> #def…

如何解决IntelliJ IDEA中pom.xml依赖项引发的安全漏洞黄线警告问题

背景 在开发过程中&#xff0c;当我们在pom.xml文件中添加依赖项时&#xff0c;经常会发现IntelliJ IDEA报出黄色警告线条&#xff0c;提示存在潜在的安全漏洞。警告的具体展现形式如下&#xff1a; 解决方案 首先&#xff0c;打开设置菜单界面&#xff0c;接着选择编辑器选…

神经网络复习--神经网络算法模型及BP算法

文章目录 神经网络模型的构成BP神经网络 神经网络模型的构成 三种表示方式&#xff1a; 神经网络的三要素&#xff1a; 具有突触或连接&#xff0c;用权重表示神经元的连接强度具有时空整合功能的输入信号累加器激励函数用于限制神经网络的输出 感知神经网络 BP神经网络 …

[嵌入式系统-75]:RT-Thread-快速上手:正点原子探索者 STM32F407示例

目录 正点原子探索者 STM32F407 上手指南 1. 简介 2. 准备工作 3. 运行第一个示例程序 3.1 编译下载 3.2 运行 继续学习 正点原子探索者 STM32F407 上手指南 1. 简介 探索者 STM32F407 是正点原子推出的一款基于 ARM Cortex-M4 内核的开发板&#xff0c;最高主频为 16…

机器人学导论实验2-差速驱动机器人的运动学与控制BJTU

目录 机器人导论实验-差速驱动机器人的运动学与控制 1 实验目的 2 任务一&#xff1a;前馈控制 2.1 内容分析 2.2 过程分析 2.3 结果分析 3 任务二&#xff1a;闭环控制 3.2 过程分析 3.3 结果分析 4 任务三&#xff1a;闭环控制&#xff08;改进&#xff09; 4.1 内容分…

Kotlin: ‘return‘ is not allowed here

报错&#xff1a;以下函数的内部函数return语句报错 Kotlin: return is not allowed here fun testReturn(summary: (String) -> String): String {var msg summary("summary收到参数")println("test内部调用参数&#xff1a;>结果是 &#xff1a;${msg…

(四十)第 6 章 树和二叉树(树的双亲表存储)

1. 背景说明 2. 示例代码 1) errorRecord.h // 记录错误宏定义头文件#ifndef ERROR_RECORD_H #define ERROR_RECORD_H#include <stdio.h> #include <string.h> #include <stdint.h>// 从文件路径中提取文件名 #define FILE_NAME(X) strrchr(X, \\) ? strrch…

项目管理-计算题公式-补充【复习】

1.EMV决策树 定义&#xff1a;用决策树在若干备选行动方案中选择一个最佳方案。在决策树 中&#xff0c;用不同的分支代表不同的决策或事件&#xff0c;即项目的备选路径。每个决策或事件 都有相关的成本和单个项目风险(包括威胁和机会)。决策树分支的终点表示沿特 定路径发展的…

C++ | Leetcode C++题解之第86题分隔链表

题目&#xff1a; 题解&#xff1a; class Solution { public:ListNode* partition(ListNode* head, int x) {ListNode* small new ListNode(0);ListNode* smallHead small;ListNode* large new ListNode(0);ListNode* largeHead large;while (head ! nullptr) {if (head-…

C语言实现猜数字小游戏

1.随机数生成 要想实现猜数字小游戏&#xff0c;依赖于随机数的生成 1.1 rand()函数 这个函数是用来生成随机数的&#xff0c;返回值是正整数&#xff0c;他的值的范围是0到rand_max之间的&#xff0c;rand_max的值在大多数编译器上面是32767&#xff0c;rand()函数的使用必…

高校课程评价|基于SSM+vue的高校课程评价系统的设计与实现(源码+数据库+文档)

高校课程评价系统 目录 基于SSM&#xff0b;vue的高校课程评价系统的设计与实现 一、前言 二、系统设计 三、系统功能设计 1管理员功能模块 2学生功能 3教师功能 4专家功能 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&…

Node.js基础:从入门到实战

初识 Node.js 与内置模块 &#xff08;初识&#xff09; 1、知道什么是node.js 2、知道node.js可以做什么 3、node.js 中js的组成部分 &#xff08;内置模块&#xff09; 4、用 fs 模块读写操作文件 5、使用 path 模块处理路径 6、使用http 模块写一个基本的web服务器 初识 N…

高精度原理介绍及代码实现

目录 高精度 引入 使用场景 实现原理 高精度加法 数据存储 加法实现 总代码 高精度减法 与加法的不同点&#xff1a; 总代码 高精度乘法 总代码 高精度除法 总结 总注意点 减法注意点 高精度 引入 所谓高精度并不是很高级难懂的东西&#xff0c;只是对传统的…

改进YOLOv5,YOLOv5+CBAM注意力机制

目录 1. 目标检测模型 2. YOLOv5s 3. YOLOv5s融合注意力机制 4. 修改yolov5.yaml文件 5. ChannelAttentionModule.py 6. 修改yolo.py 1. 目标检测模型 目标检测算法现在已经在实际中广泛应用&#xff0c;其目的是找出图像中感兴趣的对象&#xff0c;并确定对象的类别和位…

一文读懂设计模式-单例模式

单例模式&#xff08;Singleton Pattern&#xff09;提供了一种创建对象的最佳方式 单例模式涉及到一个单一的类&#xff0c;该类负责创建自己的对象&#xff0c;同时确保只有单个对象被创建&#xff0c;这个类提供了一种访问其唯一的对象的方式&#xff0c;可以直接访问&…

Redis—图文详解高可用原因

本文不会讲解Redis的用途&#xff0c;关于用途会发另一片文章讲解&#xff0c;本文主要讲的是高可用的原理。 Redis高可用主要有以下三个原因&#xff1a;主从模式(上一篇讲Kafka的文章里有涉及到)&#xff0c;哨兵模式&#xff0c;Redis-Cluster(Redis集群)。 什么是主从模式…

mysql集群NDBcluster引擎在写入数据时报错 (1114, “The table ‘ads‘ is full“)

问题描述&#xff1a;mysql集群在写入数据时&#xff0c;出现上述报错 问题原因&#xff1a;表数据已满&#xff0c;一般是在集群的管理节点设置里面datamemory的值太小&#xff0c;当数据量超过该值时就会出现该问题 解决方案&#xff1a; 修改集群管理节点的config.ini里面…

【Linux 网络】网络编程套接字 -- 详解

⚪ 预备知识 1、理解源 IP 地址和目的 IP 地址 举例理解&#xff1a;&#xff08;唐僧西天取经&#xff09; 在 IP 数据包头部中 有两个 IP 地址&#xff0c; 分别叫做源 IP 地址 和目的 IP 地址。 如果我们的台式机或者笔记本没有 IP 地址就无法上网&#xff0c;而因为…